diff --git a/Cargo.lock b/Cargo.lock index c4f80f63c9..d9ac167042 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10,9 +10,9 @@ checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5" [[package]] name = "addr2line" -version = "0.21.0" +version = "0.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" dependencies = [ "gimli", ] @@ -23,6 +23,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + [[package]] name = "ahash" version = "0.8.11" @@ -871,17 +877,17 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.69" +version = "0.3.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" dependencies = [ "addr2line", - "cc", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.8.0", "object", "rustc-demangle", + "windows-targets 0.52.6", ] [[package]] @@ -1127,7 +1133,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.4", + "windows-targets 0.52.6", ] [[package]] @@ -2107,7 +2113,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743" dependencies = [ "crc32fast", - "miniz_oxide", + "miniz_oxide 0.7.1", ] [[package]] @@ -2308,9 +2314,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.28.1" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" [[package]] name = "git-version" @@ -3404,6 +3410,15 @@ dependencies = [ "adler", ] +[[package]] +name = "miniz_oxide" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +dependencies = [ + "adler2", +] + [[package]] name = "mio" version = "0.8.11" @@ -3638,9 +3653,9 @@ dependencies = [ [[package]] name = "object" -version = "0.32.2" +version = "0.36.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" dependencies = [ "memchr", ] @@ -4401,11 +4416,13 @@ dependencies = [ "bindgen", "bytes", "crc32c", + "criterion", "env_logger", "log", "memoffset 0.9.0", "once_cell", "postgres", + "pprof", "regex", "serde", "thiserror", @@ -5062,6 +5079,7 @@ dependencies = [ "once_cell", "pin-project-lite", "rand 0.8.5", + "reqwest", "scopeguard", "serde", "serde_json", @@ -5320,9 +5338,9 @@ dependencies = [ [[package]] name = "rustc-demangle" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "rustc-hash" @@ -5535,6 +5553,7 @@ dependencies = [ "remote_storage", "reqwest", "safekeeper_api", + "safekeeper_client", "scopeguard", "sd-notify", "serde", @@ -5572,6 +5591,18 @@ dependencies = [ "utils", ] +[[package]] +name = "safekeeper_client" +version = "0.1.0" +dependencies = [ + "reqwest", + "safekeeper_api", + "serde", + "thiserror", + "utils", + "workspace_hack", +] + [[package]] name = "same-file" version = "1.0.6" @@ -7203,6 +7234,7 @@ dependencies = [ "anyhow", "arc-swap", "async-compression", + "backtrace", "bincode", "byteorder", "bytes", @@ -7213,12 +7245,14 @@ dependencies = [ "criterion", "diatomic-waker", "fail", + "flate2", "futures", "git-version", "hex", "hex-literal", "humantime", "hyper 0.14.30", + "itertools 0.10.5", "jemalloc_pprof", "jsonwebtoken", "metrics", @@ -7575,7 +7609,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ "windows-core", - "windows-targets 0.52.4", + "windows-targets 0.52.6", ] [[package]] @@ -7584,7 +7618,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.4", + "windows-targets 0.52.6", ] [[package]] @@ -7602,7 +7636,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.4", + "windows-targets 0.52.6", ] [[package]] @@ -7622,17 +7656,18 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm 0.52.4", - "windows_aarch64_msvc 0.52.4", - "windows_i686_gnu 0.52.4", - "windows_i686_msvc 0.52.4", - "windows_x86_64_gnu 0.52.4", - "windows_x86_64_gnullvm 0.52.4", - "windows_x86_64_msvc 0.52.4", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] [[package]] @@ -7643,9 +7678,9 @@ checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" [[package]] name = "windows_aarch64_msvc" @@ -7655,9 +7690,9 @@ checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" [[package]] name = "windows_aarch64_msvc" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" [[package]] name = "windows_i686_gnu" @@ -7667,9 +7702,15 @@ checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" [[package]] name = "windows_i686_gnu" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" [[package]] name = "windows_i686_msvc" @@ -7679,9 +7720,9 @@ checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" [[package]] name = "windows_i686_msvc" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" [[package]] name = "windows_x86_64_gnu" @@ -7691,9 +7732,9 @@ checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" [[package]] name = "windows_x86_64_gnu" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" [[package]] name = "windows_x86_64_gnullvm" @@ -7703,9 +7744,9 @@ checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" [[package]] name = "windows_x86_64_msvc" @@ -7715,9 +7756,9 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" [[package]] name = "windows_x86_64_msvc" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winnow" diff --git a/Cargo.toml b/Cargo.toml index 0654c25a3d..885f02ba81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "pageserver/pagebench", "proxy", "safekeeper", + "safekeeper/client", "storage_broker", "storage_controller", "storage_controller/client", @@ -51,6 +52,7 @@ anyhow = { version = "1.0", features = ["backtrace"] } arc-swap = "1.6" async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] } atomic-take = "1.1.0" +backtrace = "0.3.74" flate2 = "1.0.26" async-stream = "0.3" async-trait = "0.1" @@ -233,6 +235,7 @@ postgres_initdb = { path = "./libs/postgres_initdb" } pq_proto = { version = "0.1", path = "./libs/pq_proto/" } remote_storage = { version = "0.1", path = "./libs/remote_storage/" } safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" } +safekeeper_client = { path = "./safekeeper/client" } desim = { version = "0.1", path = "./libs/desim" } storage_broker = { version = "0.1", path = "./storage_broker/" } # Note: main broker code is inside the binary crate, so linking with the library shouldn't be heavy. storage_controller_client = { path = "./storage_controller/client" } diff --git a/compute/compute-node.Dockerfile b/compute/compute-node.Dockerfile index 33d2a10285..9f1f3b7343 100644 --- a/compute/compute-node.Dockerfile +++ b/compute/compute-node.Dockerfile @@ -35,10 +35,12 @@ RUN case $DEBIAN_VERSION in \ ;; \ esac && \ apt update && \ - apt install --no-install-recommends -y git autoconf automake libtool build-essential bison flex libreadline-dev \ + apt install --no-install-recommends --no-install-suggests -y \ + ninja-build git autoconf automake libtool build-essential bison flex libreadline-dev \ zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget ca-certificates pkg-config libssl-dev \ libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd \ - $VERSION_INSTALLS + $VERSION_INSTALLS \ + && apt clean && rm -rf /var/lib/apt/lists/* ######################################################################################### # @@ -113,10 +115,12 @@ ARG DEBIAN_VERSION ARG PG_VERSION COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/ RUN apt update && \ - apt install --no-install-recommends -y gdal-bin libboost-dev libboost-thread-dev libboost-filesystem-dev \ + apt install --no-install-recommends --no-install-suggests -y \ + gdal-bin libboost-dev libboost-thread-dev libboost-filesystem-dev \ libboost-system-dev libboost-iostreams-dev libboost-program-options-dev libboost-timer-dev \ libcgal-dev libgdal-dev libgmp-dev libmpfr-dev libopenscenegraph-dev libprotobuf-c-dev \ - protobuf-c-compiler xsltproc + protobuf-c-compiler xsltproc \ + && apt clean && rm -rf /var/lib/apt/lists/* # Postgis 3.5.0 requires SFCGAL 1.4+ @@ -143,9 +147,9 @@ RUN case "${DEBIAN_VERSION}" in \ wget https://gitlab.com/sfcgal/SFCGAL/-/archive/v${SFCGAL_VERSION}/SFCGAL-v${SFCGAL_VERSION}.tar.gz -O SFCGAL.tar.gz && \ echo "${SFCGAL_CHECKSUM} SFCGAL.tar.gz" | sha256sum --check && \ mkdir sfcgal-src && cd sfcgal-src && tar xzf ../SFCGAL.tar.gz --strip-components=1 -C . && \ - cmake -DCMAKE_BUILD_TYPE=Release . && make -j $(getconf _NPROCESSORS_ONLN) && \ - DESTDIR=/sfcgal make install -j $(getconf _NPROCESSORS_ONLN) && \ - make clean && cp -R /sfcgal/* / + cmake -DCMAKE_BUILD_TYPE=Release -GNinja . && ninja -j $(getconf _NPROCESSORS_ONLN) && \ + DESTDIR=/sfcgal ninja install -j $(getconf _NPROCESSORS_ONLN) && \ + ninja clean && cp -R /sfcgal/* / ENV PATH="/usr/local/pgsql/bin:$PATH" @@ -213,9 +217,9 @@ RUN case "${PG_VERSION}" in \ echo "${PGROUTING_CHECKSUM} pgrouting.tar.gz" | sha256sum --check && \ mkdir pgrouting-src && cd pgrouting-src && tar xzf ../pgrouting.tar.gz --strip-components=1 -C . && \ mkdir build && cd build && \ - cmake -DCMAKE_BUILD_TYPE=Release .. && \ - make -j $(getconf _NPROCESSORS_ONLN) && \ - make -j $(getconf _NPROCESSORS_ONLN) install && \ + cmake -GNinja -DCMAKE_BUILD_TYPE=Release .. && \ + ninja -j $(getconf _NPROCESSORS_ONLN) && \ + ninja -j $(getconf _NPROCESSORS_ONLN) install && \ echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrouting.control && \ find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /after.txt &&\ cp /usr/local/pgsql/share/extension/pgrouting.control /extensions/postgis && \ @@ -235,7 +239,9 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/ COPY compute/patches/plv8-3.1.10.patch /plv8-3.1.10.patch RUN apt update && \ - apt install --no-install-recommends -y ninja-build python3-dev libncurses5 binutils clang + apt install --no-install-recommends --no-install-suggests -y \ + ninja-build python3-dev libncurses5 binutils clang \ + && apt clean && rm -rf /var/lib/apt/lists/* # plv8 3.2.3 supports v17 # last release v3.2.3 - Sep 7, 2024 @@ -301,9 +307,10 @@ RUN mkdir -p /h3/usr/ && \ echo "ec99f1f5974846bde64f4513cf8d2ea1b8d172d2218ab41803bf6a63532272bc h3.tar.gz" | sha256sum --check && \ mkdir h3-src && cd h3-src && tar xzf ../h3.tar.gz --strip-components=1 -C . && \ mkdir build && cd build && \ - cmake .. -DCMAKE_BUILD_TYPE=Release && \ - make -j $(getconf _NPROCESSORS_ONLN) && \ - DESTDIR=/h3 make install && \ + cmake .. -GNinja -DBUILD_BENCHMARKS=0 -DCMAKE_BUILD_TYPE=Release \ + -DBUILD_FUZZERS=0 -DBUILD_FILTERS=0 -DBUILD_GENERATORS=0 -DBUILD_TESTING=0 \ + && ninja -j $(getconf _NPROCESSORS_ONLN) && \ + DESTDIR=/h3 ninja install && \ cp -R /h3/usr / && \ rm -rf build @@ -650,14 +657,15 @@ FROM build-deps AS rdkit-pg-build ARG PG_VERSION COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/ -RUN apt-get update && \ - apt-get install --no-install-recommends -y \ +RUN apt update && \ + apt install --no-install-recommends --no-install-suggests -y \ libboost-iostreams1.74-dev \ libboost-regex1.74-dev \ libboost-serialization1.74-dev \ libboost-system1.74-dev \ libeigen3-dev \ - libboost-all-dev + libboost-all-dev \ + && apt clean && rm -rf /var/lib/apt/lists/* # rdkit Release_2024_09_1 supports v17 # last release Release_2024_09_1 - Sep 27, 2024 @@ -693,6 +701,8 @@ RUN case "${PG_VERSION}" in \ -D RDK_BUILD_MOLINTERCHANGE_SUPPORT=OFF \ -D RDK_BUILD_YAEHMOP_SUPPORT=OFF \ -D RDK_BUILD_STRUCTCHECKER_SUPPORT=OFF \ + -D RDK_TEST_MULTITHREADED=OFF \ + -D RDK_BUILD_CPP_TESTS=OFF \ -D RDK_USE_URF=OFF \ -D RDK_BUILD_PGSQL=ON \ -D RDK_PGSQL_STATIC=ON \ @@ -704,9 +714,10 @@ RUN case "${PG_VERSION}" in \ -D RDK_INSTALL_COMIC_FONTS=OFF \ -D RDK_BUILD_FREETYPE_SUPPORT=OFF \ -D CMAKE_BUILD_TYPE=Release \ + -GNinja \ . && \ - make -j $(getconf _NPROCESSORS_ONLN) && \ - make -j $(getconf _NPROCESSORS_ONLN) install && \ + ninja -j $(getconf _NPROCESSORS_ONLN) && \ + ninja -j $(getconf _NPROCESSORS_ONLN) install && \ echo 'trusted = true' >> /usr/local/pgsql/share/extension/rdkit.control ######################################################################################### @@ -849,8 +860,9 @@ FROM build-deps AS rust-extensions-build ARG PG_VERSION COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/ -RUN apt-get update && \ - apt-get install --no-install-recommends -y curl libclang-dev && \ +RUN apt update && \ + apt install --no-install-recommends --no-install-suggests -y curl libclang-dev && \ + apt clean && rm -rf /var/lib/apt/lists/* && \ useradd -ms /bin/bash nonroot -b /home ENV HOME=/home/nonroot @@ -885,8 +897,9 @@ FROM build-deps AS rust-extensions-build-pgrx12 ARG PG_VERSION COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/ -RUN apt-get update && \ - apt-get install --no-install-recommends -y curl libclang-dev && \ +RUN apt update && \ + apt install --no-install-recommends --no-install-suggests -y curl libclang-dev && \ + apt clean && rm -rf /var/lib/apt/lists/* && \ useradd -ms /bin/bash nonroot -b /home ENV HOME=/home/nonroot @@ -914,18 +927,22 @@ FROM rust-extensions-build-pgrx12 AS pg-onnx-build # cmake 3.26 or higher is required, so installing it using pip (bullseye-backports has cmake 3.25). # Install it using virtual environment, because Python 3.11 (the default version on Debian 12 (Bookworm)) complains otherwise -RUN apt-get update && apt-get install -y python3 python3-pip python3-venv && \ +RUN apt update && apt install --no-install-recommends --no-install-suggests -y \ + python3 python3-pip python3-venv && \ + apt clean && rm -rf /var/lib/apt/lists/* && \ python3 -m venv venv && \ . venv/bin/activate && \ python3 -m pip install cmake==3.30.5 && \ wget https://github.com/microsoft/onnxruntime/archive/refs/tags/v1.18.1.tar.gz -O onnxruntime.tar.gz && \ mkdir onnxruntime-src && cd onnxruntime-src && tar xzf ../onnxruntime.tar.gz --strip-components=1 -C . && \ - ./build.sh --config Release --parallel --skip_submodule_sync --skip_tests --allow_running_as_root + ./build.sh --config Release --parallel --cmake_generator Ninja \ + --skip_submodule_sync --skip_tests --allow_running_as_root FROM pg-onnx-build AS pgrag-pg-build -RUN apt-get install -y protobuf-compiler && \ +RUN apt update && apt install --no-install-recommends --no-install-suggests -y protobuf-compiler \ + && apt clean && rm -rf /var/lib/apt/lists/* && \ wget https://github.com/neondatabase-labs/pgrag/archive/refs/tags/v0.0.0.tar.gz -O pgrag.tar.gz && \ echo "2cbe394c1e74fc8bcad9b52d5fbbfb783aef834ca3ce44626cfd770573700bb4 pgrag.tar.gz" | sha256sum --check && \ mkdir pgrag-src && cd pgrag-src && tar xzf ../pgrag.tar.gz --strip-components=1 -C . && \ @@ -1168,6 +1185,25 @@ RUN case "${PG_VERSION}" in \ make BUILD_TYPE=release -j $(getconf _NPROCESSORS_ONLN) install && \ echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_mooncake.control +######################################################################################### +# +# Layer "pg_repack" +# compile pg_repack extension +# +######################################################################################### + +FROM build-deps AS pg-repack-build +ARG PG_VERSION +COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/ + +ENV PATH="/usr/local/pgsql/bin/:$PATH" + +RUN wget https://github.com/reorg/pg_repack/archive/refs/tags/ver_1.5.2.tar.gz -O pg_repack.tar.gz && \ + echo '4516cad42251ed3ad53ff619733004db47d5755acac83f75924cd94d1c4fb681 pg_repack.tar.gz' | sha256sum --check && \ + mkdir pg_repack-src && cd pg_repack-src && tar xzf ../pg_repack.tar.gz --strip-components=1 -C . && \ + make -j $(getconf _NPROCESSORS_ONLN) && \ + make -j $(getconf _NPROCESSORS_ONLN) install + ######################################################################################### # # Layer "neon-pg-ext-build" @@ -1213,6 +1249,7 @@ COPY --from=pg-anon-pg-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pg-ivm-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pg-partman-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pg-mooncake-build /usr/local/pgsql/ /usr/local/pgsql/ +COPY --from=pg-repack-build /usr/local/pgsql/ /usr/local/pgsql/ COPY pgxn/ pgxn/ RUN make -j $(getconf _NPROCESSORS_ONLN) \ @@ -1279,8 +1316,8 @@ COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/fast_ FROM debian:$DEBIAN_FLAVOR AS pgbouncer RUN set -e \ - && apt-get update \ - && apt-get install --no-install-recommends -y \ + && apt update \ + && apt install --no-install-suggests --no-install-recommends -y \ build-essential \ git \ ca-certificates \ @@ -1288,7 +1325,8 @@ RUN set -e \ automake \ libevent-dev \ libtool \ - pkg-config + pkg-config \ + && apt clean && rm -rf /var/lib/apt/lists/* # Use `dist_man_MANS=` to skip manpage generation (which requires python3/pandoc) ENV PGBOUNCER_TAG=pgbouncer_1_22_1 @@ -1519,7 +1557,7 @@ RUN apt update && \ procps \ ca-certificates \ $VERSION_INSTALLS && \ - rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \ + apt clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \ localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8 # s5cmd 2.2.2 from https://github.com/peak/s5cmd/releases/tag/v2.2.2 diff --git a/compute/etc/neon_collector.jsonnet b/compute/etc/neon_collector.jsonnet index aa6cc1cfc8..f8f4cab63b 100644 --- a/compute/etc/neon_collector.jsonnet +++ b/compute/etc/neon_collector.jsonnet @@ -3,7 +3,7 @@ metrics: [ import 'sql_exporter/checkpoints_req.libsonnet', import 'sql_exporter/checkpoints_timed.libsonnet', - import 'sql_exporter/compute_backpressure_throttling_seconds.libsonnet', + import 'sql_exporter/compute_backpressure_throttling_seconds_total.libsonnet', import 'sql_exporter/compute_current_lsn.libsonnet', import 'sql_exporter/compute_logical_snapshot_files.libsonnet', import 'sql_exporter/compute_logical_snapshots_bytes.libsonnet', diff --git a/compute/etc/sql_exporter/compute_backpressure_throttling_seconds.libsonnet b/compute/etc/sql_exporter/compute_backpressure_throttling_seconds_total.libsonnet similarity index 61% rename from compute/etc/sql_exporter/compute_backpressure_throttling_seconds.libsonnet rename to compute/etc/sql_exporter/compute_backpressure_throttling_seconds_total.libsonnet index 02c803cfa6..31725bd179 100644 --- a/compute/etc/sql_exporter/compute_backpressure_throttling_seconds.libsonnet +++ b/compute/etc/sql_exporter/compute_backpressure_throttling_seconds_total.libsonnet @@ -1,10 +1,10 @@ { - metric_name: 'compute_backpressure_throttling_seconds', - type: 'gauge', + metric_name: 'compute_backpressure_throttling_seconds_total', + type: 'counter', help: 'Time compute has spent throttled', key_labels: null, values: [ 'throttled', ], - query: importstr 'sql_exporter/compute_backpressure_throttling_seconds.sql', + query: importstr 'sql_exporter/compute_backpressure_throttling_seconds_total.sql', } diff --git a/compute/etc/sql_exporter/compute_backpressure_throttling_seconds.sql b/compute/etc/sql_exporter/compute_backpressure_throttling_seconds_total.sql similarity index 100% rename from compute/etc/sql_exporter/compute_backpressure_throttling_seconds.sql rename to compute/etc/sql_exporter/compute_backpressure_throttling_seconds_total.sql diff --git a/docker-compose/compute_wrapper/var/db/postgres/specs/spec.json b/docker-compose/compute_wrapper/var/db/postgres/specs/spec.json index 8e582e74e1..0308cab451 100644 --- a/docker-compose/compute_wrapper/var/db/postgres/specs/spec.json +++ b/docker-compose/compute_wrapper/var/db/postgres/specs/spec.json @@ -132,11 +132,6 @@ "name": "cron.database", "value": "postgres", "vartype": "string" - }, - { - "name": "session_preload_libraries", - "value": "anon", - "vartype": "string" } ] }, diff --git a/docker-compose/docker_compose_test.sh b/docker-compose/docker_compose_test.sh index c97dfaa901..063664d0c6 100755 --- a/docker-compose/docker_compose_test.sh +++ b/docker-compose/docker_compose_test.sh @@ -35,11 +35,11 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do echo "clean up containers if exists" cleanup PG_TEST_VERSION=$((pg_version < 16 ? 16 : pg_version)) - # The support of pg_anon not yet added to PG17, so we have to remove the corresponding option - if [ $pg_version -eq 17 ]; then + # The support of pg_anon not yet added to PG17, so we have to add the corresponding option for other PG versions + if [ "${pg_version}" -ne 17 ]; then SPEC_PATH="compute_wrapper/var/db/postgres/specs" mv $SPEC_PATH/spec.json $SPEC_PATH/spec.bak - jq 'del(.cluster.settings[] | select (.name == "session_preload_libraries"))' $SPEC_PATH/spec.bak > $SPEC_PATH/spec.json + jq '.cluster.settings += [{"name": "session_preload_libraries","value": "anon","vartype": "string"}]' "${SPEC_PATH}/spec.bak" > "${SPEC_PATH}/spec.json" fi PG_VERSION=$pg_version PG_TEST_VERSION=$PG_TEST_VERSION docker compose --profile test-extensions -f $COMPOSE_FILE up --build -d @@ -106,8 +106,8 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do fi fi cleanup - # The support of pg_anon not yet added to PG17, so we have to remove the corresponding option - if [ $pg_version -eq 17 ]; then - mv $SPEC_PATH/spec.bak $SPEC_PATH/spec.json + # Restore the original spec.json + if [ "$pg_version" -ne 17 ]; then + mv "$SPEC_PATH/spec.bak" "$SPEC_PATH/spec.json" fi done diff --git a/libs/desim/src/time.rs b/libs/desim/src/time.rs index 7bb71db95c..7ce605bda8 100644 --- a/libs/desim/src/time.rs +++ b/libs/desim/src/time.rs @@ -91,7 +91,7 @@ impl Timing { /// Return true if there is a ready event. fn is_event_ready(&self, queue: &mut BinaryHeap) -> bool { - queue.peek().map_or(false, |x| x.time <= self.now()) + queue.peek().is_some_and(|x| x.time <= self.now()) } /// Clear all pending events. diff --git a/libs/postgres_ffi/Cargo.toml b/libs/postgres_ffi/Cargo.toml index e1f5443cbe..b7a376841d 100644 --- a/libs/postgres_ffi/Cargo.toml +++ b/libs/postgres_ffi/Cargo.toml @@ -9,9 +9,11 @@ regex.workspace = true bytes.workspace = true anyhow.workspace = true crc32c.workspace = true +criterion.workspace = true once_cell.workspace = true log.workspace = true memoffset.workspace = true +pprof.workspace = true thiserror.workspace = true serde.workspace = true utils.workspace = true @@ -24,3 +26,7 @@ postgres.workspace = true [build-dependencies] anyhow.workspace = true bindgen.workspace = true + +[[bench]] +name = "waldecoder" +harness = false diff --git a/libs/postgres_ffi/benches/README.md b/libs/postgres_ffi/benches/README.md new file mode 100644 index 0000000000..00a8980174 --- /dev/null +++ b/libs/postgres_ffi/benches/README.md @@ -0,0 +1,26 @@ +## Benchmarks + +To run benchmarks: + +```sh +# All benchmarks. +cargo bench --package postgres_ffi + +# Specific file. +cargo bench --package postgres_ffi --bench waldecoder + +# Specific benchmark. +cargo bench --package postgres_ffi --bench waldecoder complete_record/size=1024 + +# List available benchmarks. +cargo bench --package postgres_ffi --benches -- --list + +# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds. +# Output in target/criterion/*/profile/flamegraph.svg. +cargo bench --package postgres_ffi --bench waldecoder complete_record/size=1024 -- --profile-time 10 +``` + +Additional charts and statistics are available in `target/criterion/report/index.html`. + +Benchmarks are automatically compared against the previous run. To compare against other runs, see +`--baseline` and `--save-baseline`. \ No newline at end of file diff --git a/libs/postgres_ffi/benches/waldecoder.rs b/libs/postgres_ffi/benches/waldecoder.rs new file mode 100644 index 0000000000..c8cf0d322a --- /dev/null +++ b/libs/postgres_ffi/benches/waldecoder.rs @@ -0,0 +1,49 @@ +use std::ffi::CStr; + +use criterion::{criterion_group, criterion_main, Bencher, Criterion}; +use postgres_ffi::v17::wal_generator::LogicalMessageGenerator; +use postgres_ffi::v17::waldecoder_handler::WalStreamDecoderHandler; +use postgres_ffi::waldecoder::WalStreamDecoder; +use pprof::criterion::{Output, PProfProfiler}; +use utils::lsn::Lsn; + +const KB: usize = 1024; + +// Register benchmarks with Criterion. +criterion_group!( + name = benches; + config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = bench_complete_record, +); +criterion_main!(benches); + +/// Benchmarks WalStreamDecoder::complete_record() for a logical message of varying size. +fn bench_complete_record(c: &mut Criterion) { + let mut g = c.benchmark_group("complete_record"); + for size in [64, KB, 8 * KB, 128 * KB] { + // Kind of weird to change the group throughput per benchmark, but it's the only way + // to vary it per benchmark. It works. + g.throughput(criterion::Throughput::Bytes(size as u64)); + g.bench_function(format!("size={size}"), |b| run_bench(b, size).unwrap()); + } + + fn run_bench(b: &mut Bencher, size: usize) -> anyhow::Result<()> { + const PREFIX: &CStr = c""; + let value_size = LogicalMessageGenerator::make_value_size(size, PREFIX); + let value = vec![1; value_size]; + + let mut decoder = WalStreamDecoder::new(Lsn(0), 170000); + let msg = LogicalMessageGenerator::new(PREFIX, &value) + .next() + .unwrap() + .encode(Lsn(0)); + assert_eq!(msg.len(), size); + + b.iter(|| { + let msg = msg.clone(); // Bytes::clone() is cheap + decoder.complete_record(msg).unwrap(); + }); + + Ok(()) + } +} diff --git a/libs/postgres_ffi/src/wal_generator.rs b/libs/postgres_ffi/src/wal_generator.rs index dc679eea33..69cc4b771f 100644 --- a/libs/postgres_ffi/src/wal_generator.rs +++ b/libs/postgres_ffi/src/wal_generator.rs @@ -231,6 +231,22 @@ impl LogicalMessageGenerator { }; [&header.encode(), prefix, message].concat().into() } + + /// Computes how large a value must be to get a record of the given size. Convenience method to + /// construct records of pre-determined size. Panics if the record size is too small. + pub fn make_value_size(record_size: usize, prefix: &CStr) -> usize { + let xlog_header_size = XLOG_SIZE_OF_XLOG_RECORD; + let lm_header_size = size_of::(); + let prefix_size = prefix.to_bytes_with_nul().len(); + let data_header_size = match record_size - xlog_header_size - 2 { + 0..=255 => 2, + 256..=258 => panic!("impossible record_size {record_size}"), + 259.. => 5, + }; + record_size + .checked_sub(xlog_header_size + lm_header_size + prefix_size + data_header_size) + .expect("record_size too small") + } } impl Iterator for LogicalMessageGenerator { diff --git a/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs b/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs index 9eb3f0e95a..4a33dbe25b 100644 --- a/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs +++ b/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs @@ -81,7 +81,7 @@ fn test_end_of_wal(test_name: &str) { continue; } let mut f = File::options().write(true).open(file.path()).unwrap(); - const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE]; + static ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE]; f.write_all( &ZEROS[0..min( WAL_SEGMENT_SIZE, diff --git a/libs/proxy/tokio-postgres2/src/to_statement.rs b/libs/proxy/tokio-postgres2/src/to_statement.rs index 427f77dd79..7e12992728 100644 --- a/libs/proxy/tokio-postgres2/src/to_statement.rs +++ b/libs/proxy/tokio-postgres2/src/to_statement.rs @@ -11,7 +11,7 @@ mod private { Query(&'a str), } - impl<'a> ToStatementType<'a> { + impl ToStatementType<'_> { pub async fn into_statement(self, client: &Client) -> Result { match self { ToStatementType::Statement(s) => Ok(s.clone()), diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 1816825bda..33fa6e89f5 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -18,6 +18,7 @@ camino = { workspace = true, features = ["serde1"] } humantime-serde.workspace = true hyper = { workspace = true, features = ["client"] } futures.workspace = true +reqwest.workspace = true serde.workspace = true serde_json.workspace = true tokio = { workspace = true, features = ["sync", "fs", "io-util"] } diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index 32c51bc2ad..c89f50ef2b 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -8,6 +8,7 @@ use std::io; use std::num::NonZeroU32; use std::pin::Pin; use std::str::FromStr; +use std::sync::Arc; use std::time::Duration; use std::time::SystemTime; @@ -15,6 +16,8 @@ use super::REMOTE_STORAGE_PREFIX_SEPARATOR; use anyhow::Context; use anyhow::Result; use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range}; +use azure_core::HttpClient; +use azure_core::TransportOptions; use azure_core::{Continuable, RetryOptions}; use azure_storage::StorageCredentials; use azure_storage_blobs::blob::CopyStatus; @@ -80,8 +83,13 @@ impl AzureBlobStorage { StorageCredentials::token_credential(token_credential) }; - // we have an outer retry - let builder = ClientBuilder::new(account, credentials).retry(RetryOptions::none()); + let builder = ClientBuilder::new(account, credentials) + // we have an outer retry + .retry(RetryOptions::none()) + // Customize transport to configure conneciton pooling + .transport(TransportOptions::new(Self::reqwest_client( + azure_config.conn_pool_size, + ))); let client = builder.container_client(azure_config.container_name.to_owned()); @@ -106,6 +114,14 @@ impl AzureBlobStorage { }) } + fn reqwest_client(conn_pool_size: usize) -> Arc { + let client = reqwest::ClientBuilder::new() + .pool_max_idle_per_host(conn_pool_size) + .build() + .expect("failed to build `reqwest` client"); + Arc::new(client) + } + pub fn relative_path_to_name(&self, path: &RemotePath) -> String { assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR); let path_string = path.get_path().as_str(); @@ -544,9 +560,9 @@ impl RemoteStorage for AzureBlobStorage { .await } - async fn delete_objects<'a>( + async fn delete_objects( &self, - paths: &'a [RemotePath], + paths: &[RemotePath], cancel: &CancellationToken, ) -> anyhow::Result<()> { let kind = RequestKind::Delete; diff --git a/libs/remote_storage/src/config.rs b/libs/remote_storage/src/config.rs index f6ef31077c..dd49d4d5e7 100644 --- a/libs/remote_storage/src/config.rs +++ b/libs/remote_storage/src/config.rs @@ -114,6 +114,16 @@ fn default_max_keys_per_list_response() -> Option { DEFAULT_MAX_KEYS_PER_LIST_RESPONSE } +fn default_azure_conn_pool_size() -> usize { + // Conservative default: no connection pooling. At time of writing this is the Azure + // SDK's default as well, due to historic reports of hard-to-reproduce issues + // (https://github.com/hyperium/hyper/issues/2312) + // + // However, using connection pooling is important to avoid exhausting client ports when + // doing huge numbers of requests (https://github.com/neondatabase/cloud/issues/20971) + 0 +} + impl Debug for S3Config { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("S3Config") @@ -146,6 +156,8 @@ pub struct AzureConfig { pub concurrency_limit: NonZeroUsize, #[serde(default = "default_max_keys_per_list_response")] pub max_keys_per_list_response: Option, + #[serde(default = "default_azure_conn_pool_size")] + pub conn_pool_size: usize, } fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize { @@ -302,6 +314,7 @@ timeout = '5s'"; container_region = 'westeurope' upload_storage_class = 'INTELLIGENT_TIERING' timeout = '7s' + conn_pool_size = 8 "; let config = parse(toml).unwrap(); @@ -316,6 +329,7 @@ timeout = '5s'"; prefix_in_container: None, concurrency_limit: default_remote_storage_azure_concurrency_limit(), max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, + conn_pool_size: 8, }), timeout: Duration::from_secs(7), small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 2a3468f986..7a864151ec 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -341,9 +341,9 @@ pub trait RemoteStorage: Send + Sync + 'static { /// If the operation fails because of timeout or cancellation, the root cause of the error will be /// set to `TimeoutOrCancel`. In such situation it is unknown which deletions, if any, went /// through. - async fn delete_objects<'a>( + async fn delete_objects( &self, - paths: &'a [RemotePath], + paths: &[RemotePath], cancel: &CancellationToken, ) -> anyhow::Result<()>; diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 1a2d421c66..a8b00173ba 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -562,9 +562,9 @@ impl RemoteStorage for LocalFs { } } - async fn delete_objects<'a>( + async fn delete_objects( &self, - paths: &'a [RemotePath], + paths: &[RemotePath], cancel: &CancellationToken, ) -> anyhow::Result<()> { for path in paths { diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 2891f92d07..d3f19f0b11 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -813,9 +813,9 @@ impl RemoteStorage for S3Bucket { .await } - async fn delete_objects<'a>( + async fn delete_objects( &self, - paths: &'a [RemotePath], + paths: &[RemotePath], cancel: &CancellationToken, ) -> anyhow::Result<()> { let kind = RequestKind::Delete; diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 51833c1fe6..63c24beb51 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -181,9 +181,9 @@ impl RemoteStorage for UnreliableWrapper { self.delete_inner(path, true, cancel).await } - async fn delete_objects<'a>( + async fn delete_objects( &self, - paths: &'a [RemotePath], + paths: &[RemotePath], cancel: &CancellationToken, ) -> anyhow::Result<()> { self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?; diff --git a/libs/remote_storage/tests/test_real_azure.rs b/libs/remote_storage/tests/test_real_azure.rs index 92d579fec8..15004dbf83 100644 --- a/libs/remote_storage/tests/test_real_azure.rs +++ b/libs/remote_storage/tests/test_real_azure.rs @@ -218,6 +218,7 @@ async fn create_azure_client( prefix_in_container: Some(format!("test_{millis}_{random:08x}/")), concurrency_limit: NonZeroUsize::new(100).unwrap(), max_keys_per_list_response, + conn_pool_size: 8, }), timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT, diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 66500fb141..02bf77760a 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -15,17 +15,20 @@ arc-swap.workspace = true sentry.workspace = true async-compression.workspace = true anyhow.workspace = true +backtrace.workspace = true bincode.workspace = true bytes.workspace = true camino.workspace = true chrono.workspace = true diatomic-waker.workspace = true +flate2.workspace = true git-version.workspace = true hex = { workspace = true, features = ["serde"] } humantime.workspace = true hyper0 = { workspace = true, features = ["full"] } +itertools.workspace = true fail.workspace = true -futures = { workspace = true} +futures = { workspace = true } jemalloc_pprof.workspace = true jsonwebtoken.workspace = true nix.workspace = true diff --git a/libs/utils/src/http/endpoint.rs b/libs/utils/src/http/endpoint.rs index d975b63677..9b37b69939 100644 --- a/libs/utils/src/http/endpoint.rs +++ b/libs/utils/src/http/endpoint.rs @@ -1,15 +1,22 @@ use crate::auth::{AuthError, Claims, SwappableJwtAuth}; use crate::http::error::{api_error_handler, route_error_handler, ApiError}; use crate::http::request::{get_query_param, parse_query_param}; +use crate::pprof; +use ::pprof::protos::Message as _; +use ::pprof::ProfilerGuardBuilder; use anyhow::{anyhow, Context}; +use bytes::{Bytes, BytesMut}; use hyper::header::{HeaderName, AUTHORIZATION, CONTENT_DISPOSITION}; use hyper::http::HeaderValue; use hyper::Method; use hyper::{header::CONTENT_TYPE, Body, Request, Response}; use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder}; use once_cell::sync::Lazy; +use regex::Regex; use routerify::ext::RequestExt; use routerify::{Middleware, RequestInfo, Router, RouterBuilder}; +use tokio::sync::{mpsc, Mutex}; +use tokio_stream::wrappers::ReceiverStream; use tokio_util::io::ReaderStream; use tracing::{debug, info, info_span, warn, Instrument}; @@ -18,11 +25,6 @@ use std::io::Write as _; use std::str::FromStr; use std::time::Duration; -use bytes::{Bytes, BytesMut}; -use pprof::protos::Message as _; -use tokio::sync::{mpsc, Mutex}; -use tokio_stream::wrappers::ReceiverStream; - static SERVE_METRICS_COUNT: Lazy = Lazy::new(|| { register_int_counter!( "libmetrics_metric_handler_requests_total", @@ -365,7 +367,7 @@ pub async fn profile_cpu_handler(req: Request) -> Result, A // Take the profile. let report = tokio::task::spawn_blocking(move || { - let guard = pprof::ProfilerGuardBuilder::default() + let guard = ProfilerGuardBuilder::default() .frequency(frequency_hz) .blocklist(&["libc", "libgcc", "pthread", "vdso"]) .build()?; @@ -457,10 +459,34 @@ pub async fn profile_heap_handler(req: Request) -> Result, } Format::Pprof => { - let data = tokio::task::spawn_blocking(move || prof_ctl.dump_pprof()) - .await - .map_err(|join_err| ApiError::InternalServerError(join_err.into()))? - .map_err(ApiError::InternalServerError)?; + let data = tokio::task::spawn_blocking(move || { + let bytes = prof_ctl.dump_pprof()?; + // Symbolize the profile. + // TODO: consider moving this upstream to jemalloc_pprof and avoiding the + // serialization roundtrip. + static STRIP_FUNCTIONS: Lazy> = Lazy::new(|| { + // Functions to strip from profiles. If true, also remove child frames. + vec![ + (Regex::new("^__rust").unwrap(), false), + (Regex::new("^_start$").unwrap(), false), + (Regex::new("^irallocx_prof").unwrap(), true), + (Regex::new("^prof_alloc_prep").unwrap(), true), + (Regex::new("^std::rt::lang_start").unwrap(), false), + (Regex::new("^std::sys::backtrace::__rust").unwrap(), false), + ] + }); + let profile = pprof::decode(&bytes)?; + let profile = pprof::symbolize(profile)?; + let profile = pprof::strip_locations( + profile, + &["libc", "libgcc", "pthread", "vdso"], + &STRIP_FUNCTIONS, + ); + pprof::encode(&profile) + }) + .await + .map_err(|join_err| ApiError::InternalServerError(join_err.into()))? + .map_err(ApiError::InternalServerError)?; Response::builder() .status(200) .header(CONTENT_TYPE, "application/octet-stream") diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index bccd0e0488..2c56dd750f 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -96,6 +96,8 @@ pub mod circuit_breaker; pub mod try_rcu; +pub mod pprof; + // Re-export used in macro. Avoids adding git-version as dep in target crates. #[doc(hidden)] pub use git_version; diff --git a/libs/utils/src/pprof.rs b/libs/utils/src/pprof.rs new file mode 100644 index 0000000000..90910897bf --- /dev/null +++ b/libs/utils/src/pprof.rs @@ -0,0 +1,190 @@ +use flate2::write::{GzDecoder, GzEncoder}; +use flate2::Compression; +use itertools::Itertools as _; +use once_cell::sync::Lazy; +use pprof::protos::{Function, Line, Message as _, Profile}; +use regex::Regex; + +use std::borrow::Cow; +use std::collections::{HashMap, HashSet}; +use std::ffi::c_void; +use std::io::Write as _; + +/// Decodes a gzip-compressed Protobuf-encoded pprof profile. +pub fn decode(bytes: &[u8]) -> anyhow::Result { + let mut gz = GzDecoder::new(Vec::new()); + gz.write_all(bytes)?; + Ok(Profile::parse_from_bytes(&gz.finish()?)?) +} + +/// Encodes a pprof profile as gzip-compressed Protobuf. +pub fn encode(profile: &Profile) -> anyhow::Result> { + let mut gz = GzEncoder::new(Vec::new(), Compression::default()); + profile.write_to_writer(&mut gz)?; + Ok(gz.finish()?) +} + +/// Symbolizes a pprof profile using the current binary. +pub fn symbolize(mut profile: Profile) -> anyhow::Result { + if !profile.function.is_empty() { + return Ok(profile); // already symbolized + } + + // Collect function names. + let mut functions: HashMap = HashMap::new(); + let mut strings: HashMap = profile + .string_table + .into_iter() + .enumerate() + .map(|(i, s)| (s, i as i64)) + .collect(); + + // Helper to look up or register a string. + let mut string_id = |s: &str| -> i64 { + // Don't use .entry() to avoid unnecessary allocations. + if let Some(id) = strings.get(s) { + return *id; + } + let id = strings.len() as i64; + strings.insert(s.to_string(), id); + id + }; + + for loc in &mut profile.location { + if !loc.line.is_empty() { + continue; + } + + // Resolve the line and function for each location. + backtrace::resolve(loc.address as *mut c_void, |symbol| { + let Some(symname) = symbol.name() else { + return; + }; + let mut name = symname.to_string(); + + // Strip the Rust monomorphization suffix from the symbol name. + static SUFFIX_REGEX: Lazy = + Lazy::new(|| Regex::new("::h[0-9a-f]{16}$").expect("invalid regex")); + if let Some(m) = SUFFIX_REGEX.find(&name) { + name.truncate(m.start()); + } + + let function_id = match functions.get(&name) { + Some(function) => function.id, + None => { + let id = functions.len() as u64 + 1; + let system_name = String::from_utf8_lossy(symname.as_bytes()); + let filename = symbol + .filename() + .map(|path| path.to_string_lossy()) + .unwrap_or(Cow::Borrowed("")); + let function = Function { + id, + name: string_id(&name), + system_name: string_id(&system_name), + filename: string_id(&filename), + ..Default::default() + }; + functions.insert(name, function); + id + } + }; + loc.line.push(Line { + function_id, + line: symbol.lineno().unwrap_or(0) as i64, + ..Default::default() + }); + }); + } + + // Store the resolved functions, and mark the mapping as resolved. + profile.function = functions.into_values().sorted_by_key(|f| f.id).collect(); + profile.string_table = strings + .into_iter() + .sorted_by_key(|(_, i)| *i) + .map(|(s, _)| s) + .collect(); + + for mapping in &mut profile.mapping { + mapping.has_functions = true; + mapping.has_filenames = true; + } + + Ok(profile) +} + +/// Strips locations (stack frames) matching the given mappings (substring) or function names +/// (regex). The function bool specifies whether child frames should be stripped as well. +/// +/// The string definitions are left behind in the profile for simplicity, to avoid rewriting all +/// string references. +pub fn strip_locations( + mut profile: Profile, + mappings: &[&str], + functions: &[(Regex, bool)], +) -> Profile { + // Strip mappings. + let mut strip_mappings: HashSet = HashSet::new(); + + profile.mapping.retain(|mapping| { + let Some(name) = profile.string_table.get(mapping.filename as usize) else { + return true; + }; + if mappings.iter().any(|substr| name.contains(substr)) { + strip_mappings.insert(mapping.id); + return false; + } + true + }); + + // Strip functions. + let mut strip_functions: HashMap = HashMap::new(); + + profile.function.retain(|function| { + let Some(name) = profile.string_table.get(function.name as usize) else { + return true; + }; + for (regex, strip_children) in functions { + if regex.is_match(name) { + strip_functions.insert(function.id, *strip_children); + return false; + } + } + true + }); + + // Strip locations. The bool specifies whether child frames should be stripped too. + let mut strip_locations: HashMap = HashMap::new(); + + profile.location.retain(|location| { + for line in &location.line { + if let Some(strip_children) = strip_functions.get(&line.function_id) { + strip_locations.insert(location.id, *strip_children); + return false; + } + } + if strip_mappings.contains(&location.mapping_id) { + strip_locations.insert(location.id, false); + return false; + } + true + }); + + // Strip sample locations. + for sample in &mut profile.sample { + // First, find the uppermost function with child removal and truncate the stack. + if let Some(truncate) = sample + .location_id + .iter() + .rposition(|id| strip_locations.get(id) == Some(&true)) + { + sample.location_id.drain(..=truncate); + } + // Next, strip any individual frames without child removal. + sample + .location_id + .retain(|id| !strip_locations.contains_key(id)); + } + + profile +} diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs index 20f88868f9..7779ffaf8b 100644 --- a/pageserver/compaction/src/compact_tiered.rs +++ b/pageserver/compaction/src/compact_tiered.rs @@ -272,7 +272,7 @@ struct CompactionJob { completed: bool, } -impl<'a, E> LevelCompactionState<'a, E> +impl LevelCompactionState<'_, E> where E: CompactionJobExecutor, { diff --git a/pageserver/compaction/src/identify_levels.rs b/pageserver/compaction/src/identify_levels.rs index 1853afffdd..e04bd15396 100644 --- a/pageserver/compaction/src/identify_levels.rs +++ b/pageserver/compaction/src/identify_levels.rs @@ -224,9 +224,8 @@ impl Level { } // recalculate depth if this was the last event at this point - let more_events_at_this_key = events_iter - .peek() - .map_or(false, |next_e| next_e.key == e.key); + let more_events_at_this_key = + events_iter.peek().is_some_and(|next_e| next_e.key == e.key); if !more_events_at_this_key { let mut active_depth = 0; for (_end_lsn, is_image, _idx) in active_set.iter().rev() { diff --git a/pageserver/compaction/src/interface.rs b/pageserver/compaction/src/interface.rs index 5bc9b5ca1d..8ed393a645 100644 --- a/pageserver/compaction/src/interface.rs +++ b/pageserver/compaction/src/interface.rs @@ -148,7 +148,7 @@ pub trait CompactionDeltaLayer: CompactionLay Self: 'a; /// Return all keys in this delta layer. - fn load_keys<'a>( + fn load_keys( &self, ctx: &E::RequestContext, ) -> impl Future>>> + Send; diff --git a/pageserver/compaction/src/simulator.rs b/pageserver/compaction/src/simulator.rs index 776c537d03..673b80c313 100644 --- a/pageserver/compaction/src/simulator.rs +++ b/pageserver/compaction/src/simulator.rs @@ -143,7 +143,7 @@ impl interface::CompactionLayer for Arc { impl interface::CompactionDeltaLayer for Arc { type DeltaEntry<'a> = MockRecord; - async fn load_keys<'a>(&self, _ctx: &MockRequestContext) -> anyhow::Result> { + async fn load_keys(&self, _ctx: &MockRequestContext) -> anyhow::Result> { Ok(self.records.clone()) } } diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index cae0ffb980..e1b5676f46 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -248,7 +248,7 @@ where } } -impl<'a, W> Basebackup<'a, W> +impl Basebackup<'_, W> where W: AsyncWrite + Send + Sync + Unpin, { diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 3eaecd3a08..14c7e0d2f8 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1242,7 +1242,7 @@ pub struct DatadirModification<'a> { pending_metadata_bytes: usize, } -impl<'a> DatadirModification<'a> { +impl DatadirModification<'_> { // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed. @@ -1263,7 +1263,7 @@ impl<'a> DatadirModification<'a> { pub(crate) fn has_dirty_data(&self) -> bool { self.pending_data_batch .as_ref() - .map_or(false, |b| b.has_data()) + .is_some_and(|b| b.has_data()) } /// Set the current lsn @@ -2230,7 +2230,7 @@ impl<'a> DatadirModification<'a> { assert!(!self .pending_data_batch .as_ref() - .map_or(false, |b| b.updates_key(&key))); + .is_some_and(|b| b.updates_key(&key))); } } @@ -2299,7 +2299,7 @@ pub enum Version<'a> { Modified(&'a DatadirModification<'a>), } -impl<'a> Version<'a> { +impl Version<'_> { async fn get( &self, timeline: &Timeline, diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index dd70f6bbff..7b55df52a5 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -35,7 +35,7 @@ pub struct CompressionInfo { pub compressed_size: Option, } -impl<'a> BlockCursor<'a> { +impl BlockCursor<'_> { /// Read a blob into a new buffer. pub async fn read_blob( &self, diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 2bd7f2d619..990211f80a 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -89,7 +89,7 @@ pub(crate) enum BlockReaderRef<'a> { VirtualFile(&'a VirtualFile), } -impl<'a> BlockReaderRef<'a> { +impl BlockReaderRef<'_> { #[inline(always)] async fn read_blk( &self, diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs index b302cbc975..c77342b144 100644 --- a/pageserver/src/tenant/disk_btree.rs +++ b/pageserver/src/tenant/disk_btree.rs @@ -532,7 +532,7 @@ pub struct DiskBtreeIterator<'a> { >, } -impl<'a> DiskBtreeIterator<'a> { +impl DiskBtreeIterator<'_> { pub async fn next(&mut self) -> Option, u64), DiskBtreeError>> { self.stream.next().await } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index aaec8a4c31..ba79672bc7 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -174,11 +174,11 @@ impl EphemeralFile { } impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile { - async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>( - &'b self, + async fn read_exact_at_eof_ok( + &self, start: u64, dst: tokio_epoll_uring::Slice, - ctx: &'a RequestContext, + ctx: &RequestContext, ) -> std::io::Result<(tokio_epoll_uring::Slice, usize)> { let submitted_offset = self.buffered_writer.bytes_submitted(); diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 7f15baed10..1b6924425c 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -392,8 +392,8 @@ impl LayerMap { image_layer: Option>, end_lsn: Lsn, ) -> Option { - assert!(delta_layer.as_ref().map_or(true, |l| l.is_delta())); - assert!(image_layer.as_ref().map_or(true, |l| !l.is_delta())); + assert!(delta_layer.as_ref().is_none_or(|l| l.is_delta())); + assert!(image_layer.as_ref().is_none_or(|l| !l.is_delta())); match (delta_layer, image_layer) { (None, None) => None, diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 20e0536a00..fee11bc742 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -749,7 +749,7 @@ impl RemoteTimelineClient { // ahead of what's _actually_ on the remote during index upload. upload_queue.dirty.metadata = metadata.clone(); - self.schedule_index_upload(upload_queue)?; + self.schedule_index_upload(upload_queue); Ok(()) } @@ -770,7 +770,7 @@ impl RemoteTimelineClient { upload_queue.dirty.metadata.apply(update); - self.schedule_index_upload(upload_queue)?; + self.schedule_index_upload(upload_queue); Ok(()) } @@ -809,7 +809,7 @@ impl RemoteTimelineClient { if let Some(archived_at_set) = need_upload_scheduled { let intended_archived_at = archived_at_set.then(|| Utc::now().naive_utc()); upload_queue.dirty.archived_at = intended_archived_at; - self.schedule_index_upload(upload_queue)?; + self.schedule_index_upload(upload_queue); } let need_wait = need_change(&upload_queue.clean.0.archived_at, state).is_some(); @@ -824,7 +824,7 @@ impl RemoteTimelineClient { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; upload_queue.dirty.import_pgdata = state; - self.schedule_index_upload(upload_queue)?; + self.schedule_index_upload(upload_queue); Ok(()) } @@ -843,17 +843,14 @@ impl RemoteTimelineClient { let upload_queue = guard.initialized_mut()?; if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 { - self.schedule_index_upload(upload_queue)?; + self.schedule_index_upload(upload_queue); } Ok(()) } /// Launch an index-file upload operation in the background (internal function) - fn schedule_index_upload( - self: &Arc, - upload_queue: &mut UploadQueueInitialized, - ) -> Result<(), NotInitialized> { + fn schedule_index_upload(self: &Arc, upload_queue: &mut UploadQueueInitialized) { let disk_consistent_lsn = upload_queue.dirty.metadata.disk_consistent_lsn(); // fix up the duplicated field upload_queue.dirty.disk_consistent_lsn = disk_consistent_lsn; @@ -880,7 +877,6 @@ impl RemoteTimelineClient { // Launch the task immediately, if possible self.launch_queued_tasks(upload_queue); - Ok(()) } /// Reparent this timeline to a new parent. @@ -909,7 +905,7 @@ impl RemoteTimelineClient { upload_queue.dirty.metadata.reparent(new_parent); upload_queue.dirty.lineage.record_previous_ancestor(&prev); - self.schedule_index_upload(upload_queue)?; + self.schedule_index_upload(upload_queue); Some(self.schedule_barrier0(upload_queue)) } @@ -948,7 +944,7 @@ impl RemoteTimelineClient { assert!(prev.is_none(), "copied layer existed already {layer}"); } - self.schedule_index_upload(upload_queue)?; + self.schedule_index_upload(upload_queue); Some(self.schedule_barrier0(upload_queue)) } @@ -1004,7 +1000,7 @@ impl RemoteTimelineClient { upload_queue.dirty.gc_blocking = current .map(|x| x.with_reason(reason)) .or_else(|| Some(index::GcBlocking::started_now_for(reason))); - self.schedule_index_upload(upload_queue)?; + self.schedule_index_upload(upload_queue); Some(self.schedule_barrier0(upload_queue)) } } @@ -1057,8 +1053,7 @@ impl RemoteTimelineClient { upload_queue.dirty.gc_blocking = current.as_ref().and_then(|x| x.without_reason(reason)); assert!(wanted(upload_queue.dirty.gc_blocking.as_ref())); - // FIXME: bogus ? - self.schedule_index_upload(upload_queue)?; + self.schedule_index_upload(upload_queue); Some(self.schedule_barrier0(upload_queue)) } } @@ -1125,8 +1120,8 @@ impl RemoteTimelineClient { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; - let with_metadata = self - .schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned())?; + let with_metadata = + self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned()); self.schedule_deletion_of_unlinked0(upload_queue, with_metadata); @@ -1153,7 +1148,7 @@ impl RemoteTimelineClient { let names = gc_layers.iter().map(|x| x.layer_desc().layer_name()); - self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names)?; + self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names); self.launch_queued_tasks(upload_queue); @@ -1166,7 +1161,7 @@ impl RemoteTimelineClient { self: &Arc, upload_queue: &mut UploadQueueInitialized, names: I, - ) -> Result, NotInitialized> + ) -> Vec<(LayerName, LayerFileMetadata)> where I: IntoIterator, { @@ -1208,10 +1203,10 @@ impl RemoteTimelineClient { // index_part update, because that needs to be uploaded before we can actually delete the // files. if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 { - self.schedule_index_upload(upload_queue)?; + self.schedule_index_upload(upload_queue); } - Ok(with_metadata) + with_metadata } /// Schedules deletion for layer files which have previously been unlinked from the @@ -1302,7 +1297,7 @@ impl RemoteTimelineClient { let names = compacted_from.iter().map(|x| x.layer_desc().layer_name()); - self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names)?; + self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names); self.launch_queued_tasks(upload_queue); Ok(()) diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index d15f161fb6..b4d45dca75 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -145,8 +145,8 @@ pub async fn download_layer_file<'a>( /// /// If Err() is returned, there was some error. The file at `dst_path` has been unlinked. /// The unlinking has _not_ been made durable. -async fn download_object<'a>( - storage: &'a GenericRemoteStorage, +async fn download_object( + storage: &GenericRemoteStorage, src_path: &RemotePath, dst_path: &Utf8PathBuf, #[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate, diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index 0cd5d05aa2..e434d24e5f 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -25,8 +25,8 @@ use utils::id::{TenantId, TimelineId}; use tracing::info; /// Serializes and uploads the given index part data to the remote storage. -pub(crate) async fn upload_index_part<'a>( - storage: &'a GenericRemoteStorage, +pub(crate) async fn upload_index_part( + storage: &GenericRemoteStorage, tenant_shard_id: &TenantShardId, timeline_id: &TimelineId, generation: Generation, diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 9e3a25cbbc..b8206fca5a 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -345,10 +345,7 @@ impl LayerFringe { } pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range)> { - let read_desc = match self.planned_visits_by_lsn.pop() { - Some(desc) => desc, - None => return None, - }; + let read_desc = self.planned_visits_by_lsn.pop()?; let removed = self.visit_reads.remove_entry(&read_desc.layer_to_visit_id); diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index fec8a0a16c..ade1b794c6 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -1486,7 +1486,7 @@ pub struct ValueRef<'a> { layer: &'a DeltaLayerInner, } -impl<'a> ValueRef<'a> { +impl ValueRef<'_> { /// Loads the value from disk pub async fn load(&self, ctx: &RequestContext) -> Result { let buf = self.load_raw(ctx).await?; @@ -1543,7 +1543,7 @@ pub struct DeltaLayerIterator<'a> { is_end: bool, } -impl<'a> DeltaLayerIterator<'a> { +impl DeltaLayerIterator<'_> { pub(crate) fn layer_dbg_info(&self) -> String { self.delta_layer.layer_dbg_info() } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 834d1931d0..0d3c9d5a44 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -1052,7 +1052,7 @@ pub struct ImageLayerIterator<'a> { is_end: bool, } -impl<'a> ImageLayerIterator<'a> { +impl ImageLayerIterator<'_> { pub(crate) fn layer_dbg_info(&self) -> String { self.image_layer.layer_dbg_info() } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs index a4bb3a6bfc..1d86015fab 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs @@ -25,11 +25,11 @@ pub trait File: Send { /// [`std::io::ErrorKind::UnexpectedEof`] error if the file is shorter than `start+dst.len()`. /// /// No guarantees are made about the remaining bytes in `dst` in case of a short read. - async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>( - &'b self, + async fn read_exact_at_eof_ok( + &self, start: u64, dst: Slice, - ctx: &'a RequestContext, + ctx: &RequestContext, ) -> std::io::Result<(Slice, usize)>; } @@ -479,11 +479,11 @@ mod tests { } impl File for InMemoryFile { - async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>( - &'b self, + async fn read_exact_at_eof_ok( + &self, start: u64, mut dst: Slice, - _ctx: &'a RequestContext, + _ctx: &RequestContext, ) -> std::io::Result<(Slice, usize)> { let dst_slice: &mut [u8] = dst.as_mut_rust_slice_full_zeroed(); let nread = { @@ -609,12 +609,12 @@ mod tests { } } - impl<'x> File for RecorderFile<'x> { - async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>( - &'b self, + impl File for RecorderFile<'_> { + async fn read_exact_at_eof_ok( + &self, start: u64, dst: Slice, - ctx: &'a RequestContext, + ctx: &RequestContext, ) -> std::io::Result<(Slice, usize)> { let (dst, nread) = self.file.read_exact_at_eof_ok(start, dst, ctx).await?; self.recorded.borrow_mut().push(RecordedRead { @@ -740,11 +740,11 @@ mod tests { } impl File for MockFile { - async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>( - &'b self, + async fn read_exact_at_eof_ok( + &self, start: u64, mut dst: Slice, - _ctx: &'a RequestContext, + _ctx: &RequestContext, ) -> std::io::Result<(Slice, usize)> { let ExpectedRead { expect_pos, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 0416953c1f..87f5a03382 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5842,7 +5842,7 @@ enum OpenLayerAction { None, } -impl<'a> TimelineWriter<'a> { +impl TimelineWriter<'_> { async fn handle_open_layer_action( &mut self, at: Lsn, diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 5e6290729c..8b6cc8ed84 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1110,7 +1110,7 @@ impl Timeline { return Err(CompactionError::ShuttingDown); } - let same_key = prev_key.map_or(false, |prev_key| prev_key == key); + let same_key = prev_key == Some(key); // We need to check key boundaries once we reach next key or end of layer with the same key if !same_key || lsn == dup_end_lsn { let mut next_key_size = 0u64; @@ -2904,7 +2904,7 @@ impl CompactionLayer for ResidentDeltaLayer { impl CompactionDeltaLayer for ResidentDeltaLayer { type DeltaEntry<'a> = DeltaEntry<'a>; - async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result>> { + async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result>> { self.0.get_as_delta(ctx).await?.index_entries(ctx).await } } diff --git a/proxy/src/auth/backend/console_redirect.rs b/proxy/src/auth/backend/console_redirect.rs index 575d60be85..c3de77b352 100644 --- a/proxy/src/auth/backend/console_redirect.rs +++ b/proxy/src/auth/backend/console_redirect.rs @@ -187,7 +187,6 @@ async fn authenticate( NodeInfo { config, aux: db_info.aux, - allow_self_signed_compute: false, // caller may override }, db_info.allowed_ips, )) diff --git a/proxy/src/auth/backend/jwt.rs b/proxy/src/auth/backend/jwt.rs index a258090b15..df716f8455 100644 --- a/proxy/src/auth/backend/jwt.rs +++ b/proxy/src/auth/backend/jwt.rs @@ -776,6 +776,7 @@ impl From<&jose_jwk::Key> for KeyType { } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use std::future::IntoFuture; use std::net::SocketAddr; diff --git a/proxy/src/auth/backend/local.rs b/proxy/src/auth/backend/local.rs index d4273fb521..d10f0e82b2 100644 --- a/proxy/src/auth/backend/local.rs +++ b/proxy/src/auth/backend/local.rs @@ -37,7 +37,6 @@ impl LocalBackend { branch_id: BranchIdTag::get_interner().get_or_intern("local"), cold_start_info: ColdStartInfo::WarmCached, }, - allow_self_signed_compute: false, }, } } diff --git a/proxy/src/auth/backend/mod.rs b/proxy/src/auth/backend/mod.rs index f38ecf715f..50cb94bfa0 100644 --- a/proxy/src/auth/backend/mod.rs +++ b/proxy/src/auth/backend/mod.rs @@ -463,6 +463,8 @@ impl ComputeConnectBackend for Backend<'_, ComputeCredentials> { #[cfg(test)] mod tests { + #![allow(clippy::unimplemented, clippy::unwrap_used)] + use std::net::IpAddr; use std::sync::Arc; use std::time::Duration; diff --git a/proxy/src/auth/credentials.rs b/proxy/src/auth/credentials.rs index f6bce9f2d8..eff49a402a 100644 --- a/proxy/src/auth/credentials.rs +++ b/proxy/src/auth/credentials.rs @@ -250,6 +250,7 @@ fn project_name_valid(name: &str) -> bool { } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use serde_json::json; use ComputeUserInfoParseError::*; diff --git a/proxy/src/bin/pg_sni_router.rs b/proxy/src/bin/pg_sni_router.rs index 623a0fd3b2..9538384b9e 100644 --- a/proxy/src/bin/pg_sni_router.rs +++ b/proxy/src/bin/pg_sni_router.rs @@ -229,7 +229,7 @@ async fn ssl_handshake( let (raw, read_buf) = stream.into_inner(); // TODO: Normally, client doesn't send any data before - // server says TLS handshake is ok and read_buf is empy. + // server says TLS handshake is ok and read_buf is empty. // However, you could imagine pipelining of postgres // SSLRequest + TLS ClientHello in one hunk similar to // pipelining in our node js driver. We should probably diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index 97c4037009..e90555e250 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -105,6 +105,9 @@ struct ProxyCliArgs { /// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir #[clap(short = 'c', long, alias = "ssl-cert")] tls_cert: Option, + /// Allow writing TLS session keys to the given file pointed to by the environment variable `SSLKEYLOGFILE`. + #[clap(long, alias = "allow-ssl-keylogfile")] + allow_tls_keylogfile: bool, /// path to directory with TLS certificates for client postgres connections #[clap(long)] certs_dir: Option, @@ -555,6 +558,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { key_path, cert_path, args.certs_dir.as_ref(), + args.allow_tls_keylogfile, )?), (None, None) => None, _ => bail!("either both or neither tls-key and tls-cert must be specified"), diff --git a/proxy/src/cache/endpoints.rs b/proxy/src/cache/endpoints.rs index 20db1fbb14..0136446d6d 100644 --- a/proxy/src/cache/endpoints.rs +++ b/proxy/src/cache/endpoints.rs @@ -12,6 +12,7 @@ use tracing::info; use crate::config::EndpointCacheConfig; use crate::context::RequestContext; +use crate::ext::LockExt; use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt}; use crate::metrics::{Metrics, RedisErrors, RedisEventsCount}; use crate::rate_limiter::GlobalRateLimiter; @@ -96,7 +97,7 @@ impl EndpointsCache { // If the limiter allows, we can pretend like it's valid // (incase it is, due to redis channel lag). - if self.limiter.lock().unwrap().check() { + if self.limiter.lock_propagate_poison().check() { return true; } @@ -258,6 +259,7 @@ impl EndpointsCache { } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use super::*; diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs index 84430dc812..cab0b8b905 100644 --- a/proxy/src/cache/project_info.rs +++ b/proxy/src/cache/project_info.rs @@ -365,6 +365,7 @@ impl Cache for ProjectInfoCacheImpl { } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use super::*; use crate::scram::ServerSecret; diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index ed717507ee..a58e3961da 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -3,8 +3,10 @@ use std::sync::Arc; use dashmap::DashMap; use ipnet::{IpNet, Ipv4Net, Ipv6Net}; -use postgres_client::{CancelToken, NoTls}; +use once_cell::sync::OnceCell; +use postgres_client::{tls::MakeTlsConnect, CancelToken}; use pq_proto::CancelKeyData; +use rustls::crypto::ring; use thiserror::Error; use tokio::net::TcpStream; use tokio::sync::Mutex; @@ -13,12 +15,16 @@ use uuid::Uuid; use crate::auth::{check_peer_addr_is_in_list, IpPattern}; use crate::error::ReportableError; +use crate::ext::LockExt; use crate::metrics::{CancellationRequest, CancellationSource, Metrics}; use crate::rate_limiter::LeakyBucketRateLimiter; use crate::redis::cancellation_publisher::{ CancellationPublisher, CancellationPublisherMut, RedisPublisherClient, }; +use crate::compute::{load_certs, AcceptEverythingVerifier}; +use crate::postgres_rustls::MakeRustlsConnect; + pub type CancelMap = Arc>>; pub type CancellationHandlerMain = CancellationHandler>>>; pub(crate) type CancellationHandlerMainInternal = Option>>; @@ -114,7 +120,7 @@ impl CancellationHandler

{ IpAddr::V4(ip) => IpNet::V4(Ipv4Net::new_assert(ip, 24).trunc()), // use defaut mask here IpAddr::V6(ip) => IpNet::V6(Ipv6Net::new_assert(ip, 64).trunc()), }; - if !self.limiter.lock().unwrap().check(subnet_key, 1) { + if !self.limiter.lock_propagate_poison().check(subnet_key, 1) { // log only the subnet part of the IP address to know which subnet is rate limited tracing::warn!("Rate limit exceeded. Skipping cancellation message, {subnet_key}"); Metrics::get() @@ -173,7 +179,10 @@ impl CancellationHandler

{ source: self.from, kind: crate::metrics::CancellationOutcome::Found, }); - info!("cancelling query per user's request using key {key}"); + info!( + "cancelling query per user's request using key {key}, hostname {}, address: {}", + cancel_closure.hostname, cancel_closure.socket_addr + ); cancel_closure.try_cancel_query().await } @@ -220,6 +229,8 @@ impl CancellationHandler>>> { } } +static TLS_ROOTS: OnceCell> = OnceCell::new(); + /// This should've been a [`std::future::Future`], but /// it's impossible to name a type of an unboxed future /// (we'd need something like `#![feature(type_alias_impl_trait)]`). @@ -228,6 +239,8 @@ pub struct CancelClosure { socket_addr: SocketAddr, cancel_token: CancelToken, ip_allowlist: Vec, + hostname: String, // for pg_sni router + allow_self_signed_compute: bool, } impl CancelClosure { @@ -235,17 +248,60 @@ impl CancelClosure { socket_addr: SocketAddr, cancel_token: CancelToken, ip_allowlist: Vec, + hostname: String, + allow_self_signed_compute: bool, ) -> Self { Self { socket_addr, cancel_token, ip_allowlist, + hostname, + allow_self_signed_compute, } } /// Cancels the query running on user's compute node. pub(crate) async fn try_cancel_query(self) -> Result<(), CancelError> { let socket = TcpStream::connect(self.socket_addr).await?; - self.cancel_token.cancel_query_raw(socket, NoTls).await?; + + let client_config = if self.allow_self_signed_compute { + // Allow all certificates for creating the connection. Used only for tests + let verifier = Arc::new(AcceptEverythingVerifier); + rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider())) + .with_safe_default_protocol_versions() + .expect("ring should support the default protocol versions") + .dangerous() + .with_custom_certificate_verifier(verifier) + } else { + let root_store = TLS_ROOTS + .get_or_try_init(load_certs) + .map_err(|_e| { + CancelError::IO(std::io::Error::new( + std::io::ErrorKind::Other, + "TLS root store initialization failed".to_string(), + )) + })? + .clone(); + rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider())) + .with_safe_default_protocol_versions() + .expect("ring should support the default protocol versions") + .with_root_certificates(root_store) + }; + + let client_config = client_config.with_no_client_auth(); + + let mut mk_tls = crate::postgres_rustls::MakeRustlsConnect::new(client_config); + let tls = >::make_tls_connect( + &mut mk_tls, + &self.hostname, + ) + .map_err(|e| { + CancelError::IO(std::io::Error::new( + std::io::ErrorKind::Other, + e.to_string(), + )) + })?; + + self.cancel_token.cancel_query_raw(socket, tls).await?; debug!("query was cancelled"); Ok(()) } @@ -283,6 +339,7 @@ impl

Drop for Session

{ } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use super::*; diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 4113b5bb80..42df5ff5e3 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -319,6 +319,8 @@ impl ConnCfg { secret_key, }, vec![], + host.to_string(), + allow_self_signed_compute, ); let connection = PostgresConnection { @@ -350,7 +352,7 @@ fn filtered_options(options: &str) -> Option { Some(options) } -fn load_certs() -> Result, Vec> { +pub(crate) fn load_certs() -> Result, Vec> { let der_certs = rustls_native_certs::load_native_certs(); if !der_certs.errors.is_empty() { @@ -364,7 +366,7 @@ fn load_certs() -> Result, Vec> = OnceCell::new(); #[derive(Debug)] -struct AcceptEverythingVerifier; +pub(crate) struct AcceptEverythingVerifier; impl ServerCertVerifier for AcceptEverythingVerifier { fn supported_verify_schemes(&self) -> Vec { use rustls::SignatureScheme; diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 8bc8e3f96f..debd77ac32 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -95,6 +95,7 @@ pub fn configure_tls( key_path: &str, cert_path: &str, certs_dir: Option<&String>, + allow_tls_keylogfile: bool, ) -> anyhow::Result { let mut cert_resolver = CertResolver::new(); @@ -135,6 +136,11 @@ pub fn configure_tls( config.alpn_protocols = vec![PG_ALPN_PROTOCOL.to_vec()]; + if allow_tls_keylogfile { + // KeyLogFile will check for the SSLKEYLOGFILE environment variable. + config.key_log = Arc::new(rustls::KeyLogFile::new()); + } + Ok(TlsConfig { config: Arc::new(config), common_names, @@ -221,15 +227,10 @@ impl CertResolver { ) -> anyhow::Result<()> { let priv_key = { let key_bytes = std::fs::read(key_path) - .context(format!("Failed to read TLS keys at '{key_path}'"))?; - let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..]).collect_vec(); - - ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len()); - PrivateKeyDer::Pkcs8( - keys.pop() - .unwrap() - .context(format!("Failed to parse TLS keys at '{key_path}'"))?, - ) + .with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?; + rustls_pemfile::private_key(&mut &key_bytes[..]) + .with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))? + .with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))? }; let cert_chain_bytes = std::fs::read(cert_path) diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index 65702e0e4c..02398fb777 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -213,9 +213,9 @@ pub(crate) async fn handle_client( params_compat: true, params: ¶ms, locks: &config.connect_compute_locks, + allow_self_signed_compute: config.allow_self_signed_compute, }, &user_info, - config.allow_self_signed_compute, config.wake_compute_retry_config, config.connect_to_compute_retry_config, ) diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index 3105d08526..5f65b17374 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -23,6 +23,7 @@ use utils::backoff; use super::{RequestContextInner, LOG_CHAN}; use crate::config::remote_storage_from_toml; use crate::context::LOG_CHAN_DISCONNECT; +use crate::ext::TaskExt; #[derive(clap::Args, Clone, Debug)] pub struct ParquetUploadArgs { @@ -171,7 +172,9 @@ pub async fn worker( }; let (tx, mut rx) = mpsc::unbounded_channel(); - LOG_CHAN.set(tx.downgrade()).unwrap(); + LOG_CHAN + .set(tx.downgrade()) + .expect("only one worker should set the channel"); // setup row stream that will close on cancellation let cancellation_token2 = cancellation_token.clone(); @@ -207,7 +210,9 @@ pub async fn worker( config.parquet_upload_disconnect_events_remote_storage { let (tx_disconnect, mut rx_disconnect) = mpsc::unbounded_channel(); - LOG_CHAN_DISCONNECT.set(tx_disconnect.downgrade()).unwrap(); + LOG_CHAN_DISCONNECT + .set(tx_disconnect.downgrade()) + .expect("only one worker should set the channel"); // setup row stream that will close on cancellation tokio::spawn(async move { @@ -326,7 +331,7 @@ where Ok::<_, parquet::errors::ParquetError>((rows, w, rg_meta)) }) .await - .unwrap()?; + .propagate_task_panic()?; rows.clear(); Ok((rows, w, rg_meta)) @@ -352,7 +357,7 @@ async fn upload_parquet( Ok((buffer, metadata)) }) .await - .unwrap()?; + .propagate_task_panic()?; let data = buffer.split().freeze(); @@ -409,6 +414,7 @@ async fn upload_parquet( } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use std::net::Ipv4Addr; use std::num::NonZeroUsize; diff --git a/proxy/src/control_plane/client/cplane_proxy_v1.rs b/proxy/src/control_plane/client/cplane_proxy_v1.rs index e33a37f643..00038a6ac6 100644 --- a/proxy/src/control_plane/client/cplane_proxy_v1.rs +++ b/proxy/src/control_plane/client/cplane_proxy_v1.rs @@ -250,7 +250,6 @@ impl NeonControlPlaneClient { let node = NodeInfo { config, aux: body.aux, - allow_self_signed_compute: false, }; Ok(node) diff --git a/proxy/src/control_plane/client/mock.rs b/proxy/src/control_plane/client/mock.rs index eaf692ab27..5f8bda0f35 100644 --- a/proxy/src/control_plane/client/mock.rs +++ b/proxy/src/control_plane/client/mock.rs @@ -102,7 +102,9 @@ impl MockControlPlane { Some(s) => { info!("got allowed_ips: {s}"); s.split(',') - .map(|s| IpPattern::from_str(s).unwrap()) + .map(|s| { + IpPattern::from_str(s).expect("mocked ip pattern should be correct") + }) .collect() } None => vec![], @@ -174,7 +176,6 @@ impl MockControlPlane { branch_id: (&BranchId::from("branch")).into(), cold_start_info: crate::control_plane::messages::ColdStartInfo::Warm, }, - allow_self_signed_compute: false, }; Ok(node) diff --git a/proxy/src/control_plane/mod.rs b/proxy/src/control_plane/mod.rs index 41972e4e44..c0718920b4 100644 --- a/proxy/src/control_plane/mod.rs +++ b/proxy/src/control_plane/mod.rs @@ -67,28 +67,21 @@ pub(crate) struct NodeInfo { /// Labels for proxy's metrics. pub(crate) aux: MetricsAuxInfo, - - /// Whether we should accept self-signed certificates (for testing) - pub(crate) allow_self_signed_compute: bool, } impl NodeInfo { pub(crate) async fn connect( &self, ctx: &RequestContext, + allow_self_signed_compute: bool, timeout: Duration, ) -> Result { self.config - .connect( - ctx, - self.allow_self_signed_compute, - self.aux.clone(), - timeout, - ) + .connect(ctx, allow_self_signed_compute, self.aux.clone(), timeout) .await } + pub(crate) fn reuse_settings(&mut self, other: Self) { - self.allow_self_signed_compute = other.allow_self_signed_compute; self.config.reuse_password(other.config); } diff --git a/proxy/src/ext.rs b/proxy/src/ext.rs new file mode 100644 index 0000000000..8d00afbf51 --- /dev/null +++ b/proxy/src/ext.rs @@ -0,0 +1,41 @@ +use std::panic::resume_unwind; +use std::sync::{Mutex, MutexGuard}; + +use tokio::task::JoinError; + +pub(crate) trait LockExt { + fn lock_propagate_poison(&self) -> MutexGuard<'_, T>; +} + +impl LockExt for Mutex { + /// Lock the mutex and panic if the mutex was poisoned. + #[track_caller] + fn lock_propagate_poison(&self) -> MutexGuard<'_, T> { + match self.lock() { + Ok(guard) => guard, + // poison occurs when another thread panicked while holding the lock guard. + // since panicking is often unrecoverable, propagating the poison panic is reasonable. + Err(poison) => panic!("{poison}"), + } + } +} + +pub(crate) trait TaskExt { + fn propagate_task_panic(self) -> T; +} + +impl TaskExt for Result { + /// Unwrap the result and panic if the inner task panicked. + /// Also panics if the task was cancelled + #[track_caller] + fn propagate_task_panic(self) -> T { + match self { + Ok(t) => t, + // Using resume_unwind prevents the panic hook being called twice. + // Since we use this for structured concurrency, there is only + // 1 logical panic, so this is more correct. + Err(e) if e.is_panic() => resume_unwind(e.into_panic()), + Err(e) => panic!("unexpected task error: {e}"), + } + } +} diff --git a/proxy/src/http/health_server.rs b/proxy/src/http/health_server.rs index 978ad9f761..6ca091feb7 100644 --- a/proxy/src/http/health_server.rs +++ b/proxy/src/http/health_server.rs @@ -14,6 +14,7 @@ use utils::http::error::ApiError; use utils::http::json::json_response; use utils::http::{RouterBuilder, RouterService}; +use crate::ext::{LockExt, TaskExt}; use crate::jemalloc; async fn status_handler(_: Request) -> Result, ApiError> { @@ -76,7 +77,7 @@ async fn prometheus_metrics_handler( let body = tokio::task::spawn_blocking(move || { let _span = span.entered(); - let mut state = state.lock().unwrap(); + let mut state = state.lock_propagate_poison(); let PrometheusHandler { encoder, metrics } = &mut *state; metrics @@ -94,13 +95,13 @@ async fn prometheus_metrics_handler( body }) .await - .unwrap(); + .propagate_task_panic(); let response = Response::builder() .status(200) .header(CONTENT_TYPE, "text/plain; version=0.0.4") .body(Body::from(body)) - .unwrap(); + .expect("response headers should be valid"); Ok(response) } diff --git a/proxy/src/intern.rs b/proxy/src/intern.rs index f56d92a6b3..79c6020302 100644 --- a/proxy/src/intern.rs +++ b/proxy/src/intern.rs @@ -83,7 +83,7 @@ impl StringInterner { pub(crate) fn new() -> Self { StringInterner { inner: ThreadedRodeo::with_capacity_memory_limits_and_hasher( - Capacity::new(2500, NonZeroUsize::new(1 << 16).unwrap()), + Capacity::new(2500, NonZeroUsize::new(1 << 16).expect("value is nonzero")), // unbounded MemoryLimits::for_memory_usage(usize::MAX), BuildHasherDefault::::default(), @@ -207,6 +207,7 @@ impl From for ProjectIdInt { } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use std::sync::OnceLock; diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index ba69f9cf2d..a5a72f26d9 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -22,8 +22,8 @@ clippy::string_add, clippy::string_to_string, clippy::todo, - // TODO: consider clippy::unimplemented - // TODO: consider clippy::unwrap_used + clippy::unimplemented, + clippy::unwrap_used, )] // List of permanently allowed lints. #![allow( @@ -82,6 +82,7 @@ pub mod console_redirect_proxy; pub mod context; pub mod control_plane; pub mod error; +mod ext; pub mod http; pub mod intern; pub mod jemalloc; diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index 74d2b9a1d0..41f10f052f 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -18,8 +18,16 @@ pub async fn init() -> anyhow::Result { let env_filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) .from_env_lossy() - .add_directive("aws_config=info".parse().unwrap()) - .add_directive("azure_core::policies::transport=off".parse().unwrap()); + .add_directive( + "aws_config=info" + .parse() + .expect("this should be a valid filter directive"), + ) + .add_directive( + "azure_core::policies::transport=off" + .parse() + .expect("this should be a valid filter directive"), + ); let fmt_layer = tracing_subscriber::fmt::layer() .with_ansi(false) diff --git a/proxy/src/parse.rs b/proxy/src/parse.rs index 8c0f251066..095d6278cc 100644 --- a/proxy/src/parse.rs +++ b/proxy/src/parse.rs @@ -8,14 +8,6 @@ pub(crate) fn split_cstr(bytes: &[u8]) -> Option<(&CStr, &[u8])> { Some((cstr, other)) } -/// See . -pub(crate) fn split_at_const(bytes: &[u8]) -> Option<(&[u8; N], &[u8])> { - (bytes.len() >= N).then(|| { - let (head, tail) = bytes.split_at(N); - (head.try_into().unwrap(), tail) - }) -} - #[cfg(test)] mod tests { use super::*; @@ -33,11 +25,4 @@ mod tests { assert_eq!(cstr.to_bytes(), b"foo"); assert_eq!(rest, b"bar"); } - - #[test] - fn test_split_at_const() { - assert!(split_at_const::<0>(b"").is_some()); - assert!(split_at_const::<1>(b"").is_none()); - assert!(matches!(split_at_const::<1>(b"ok"), Some((b"o", b"k")))); - } } diff --git a/proxy/src/protocol2.rs b/proxy/src/protocol2.rs index 33a5eb5e1e..0dc97b7097 100644 --- a/proxy/src/protocol2.rs +++ b/proxy/src/protocol2.rs @@ -396,6 +396,7 @@ impl NetworkEndianIpv6 { } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use tokio::io::AsyncReadExt; diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index a3027abd7c..6da4c90a53 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -73,6 +73,9 @@ pub(crate) struct TcpMechanism<'a> { /// connect_to_compute concurrency lock pub(crate) locks: &'static ApiLocks, + + /// Whether we should accept self-signed certificates (for testing) + pub(crate) allow_self_signed_compute: bool, } #[async_trait] @@ -90,7 +93,11 @@ impl ConnectMechanism for TcpMechanism<'_> { ) -> Result { let host = node_info.config.get_host(); let permit = self.locks.get_permit(&host).await?; - permit.release_result(node_info.connect(ctx, timeout).await) + permit.release_result( + node_info + .connect(ctx, self.allow_self_signed_compute, timeout) + .await, + ) } fn update_connect_config(&self, config: &mut compute::ConnCfg) { @@ -104,7 +111,6 @@ pub(crate) async fn connect_to_compute Result @@ -117,7 +123,6 @@ where wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?; node_info.set_keys(user_info.get_keys()); - node_info.allow_self_signed_compute = allow_self_signed_compute; mechanism.update_connect_config(&mut node_info.config); // try once diff --git a/proxy/src/proxy/copy_bidirectional.rs b/proxy/src/proxy/copy_bidirectional.rs index 4e4af88634..3336a9556a 100644 --- a/proxy/src/proxy/copy_bidirectional.rs +++ b/proxy/src/proxy/copy_bidirectional.rs @@ -257,6 +257,7 @@ impl CopyBuffer { } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use tokio::io::AsyncWriteExt; diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index cc04bc5e5c..4e5ecda237 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -191,13 +191,6 @@ impl ClientMode { } } - pub(crate) fn allow_self_signed_compute(&self, config: &ProxyConfig) -> bool { - match self { - ClientMode::Tcp => config.allow_self_signed_compute, - ClientMode::Websockets { .. } => false, - } - } - fn hostname<'a, S>(&'a self, s: &'a Stream) -> Option<&'a str> { match self { ClientMode::Tcp => s.sni_hostname(), @@ -355,9 +348,10 @@ pub(crate) async fn handle_client( params_compat, params: ¶ms, locks: &config.connect_compute_locks, + // only used for console redirect testing. + allow_self_signed_compute: false, }, &user_info, - mode.allow_self_signed_compute(config), config.wake_compute_retry_config, config.connect_to_compute_retry_config, ) @@ -494,7 +488,7 @@ impl NeonOptions { pub(crate) fn neon_option(bytes: &str) -> Option<(&str, &str)> { static RE: OnceCell = OnceCell::new(); - let re = RE.get_or_init(|| Regex::new(r"^neon_(\w+):(.+)").unwrap()); + let re = RE.get_or_init(|| Regex::new(r"^neon_(\w+):(.+)").expect("regex should be correct")); let cap = re.captures(bytes)?; let (_, [k, v]) = cap.extract(); diff --git a/proxy/src/proxy/tests/mod.rs b/proxy/src/proxy/tests/mod.rs index 911b349416..95c518fed9 100644 --- a/proxy/src/proxy/tests/mod.rs +++ b/proxy/src/proxy/tests/mod.rs @@ -1,4 +1,5 @@ //! A group of high-level tests for connection establishing logic and auth. +#![allow(clippy::unimplemented, clippy::unwrap_used)] mod mitm; @@ -553,7 +554,6 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn branch_id: (&BranchId::from("branch")).into(), cold_start_info: crate::control_plane::messages::ColdStartInfo::Warm, }, - allow_self_signed_compute: false, }; let (_, node2) = cache.insert_unit("key".into(), Ok(node.clone())); node2.map(|()| node) @@ -588,7 +588,7 @@ async fn connect_to_compute_success() { max_retries: 5, backoff_factor: 2.0, }; - connect_to_compute(&ctx, &mechanism, &user_info, false, config, config) + connect_to_compute(&ctx, &mechanism, &user_info, config, config) .await .unwrap(); mechanism.verify(); @@ -606,7 +606,7 @@ async fn connect_to_compute_retry() { max_retries: 5, backoff_factor: 2.0, }; - connect_to_compute(&ctx, &mechanism, &user_info, false, config, config) + connect_to_compute(&ctx, &mechanism, &user_info, config, config) .await .unwrap(); mechanism.verify(); @@ -625,7 +625,7 @@ async fn connect_to_compute_non_retry_1() { max_retries: 5, backoff_factor: 2.0, }; - connect_to_compute(&ctx, &mechanism, &user_info, false, config, config) + connect_to_compute(&ctx, &mechanism, &user_info, config, config) .await .unwrap_err(); mechanism.verify(); @@ -644,7 +644,7 @@ async fn connect_to_compute_non_retry_2() { max_retries: 5, backoff_factor: 2.0, }; - connect_to_compute(&ctx, &mechanism, &user_info, false, config, config) + connect_to_compute(&ctx, &mechanism, &user_info, config, config) .await .unwrap(); mechanism.verify(); @@ -674,7 +674,6 @@ async fn connect_to_compute_non_retry_3() { &ctx, &mechanism, &user_info, - false, wake_compute_retry_config, connect_to_compute_retry_config, ) @@ -696,7 +695,7 @@ async fn wake_retry() { max_retries: 5, backoff_factor: 2.0, }; - connect_to_compute(&ctx, &mechanism, &user_info, false, config, config) + connect_to_compute(&ctx, &mechanism, &user_info, config, config) .await .unwrap(); mechanism.verify(); @@ -715,7 +714,7 @@ async fn wake_non_retry() { max_retries: 5, backoff_factor: 2.0, }; - connect_to_compute(&ctx, &mechanism, &user_info, false, config, config) + connect_to_compute(&ctx, &mechanism, &user_info, config, config) .await .unwrap_err(); mechanism.verify(); diff --git a/proxy/src/rate_limiter/leaky_bucket.rs b/proxy/src/rate_limiter/leaky_bucket.rs index 45f9630dde..bff800f0a2 100644 --- a/proxy/src/rate_limiter/leaky_bucket.rs +++ b/proxy/src/rate_limiter/leaky_bucket.rs @@ -83,7 +83,7 @@ impl From for utils::leaky_bucket::LeakyBucketConfig { } #[cfg(test)] -#[allow(clippy::float_cmp)] +#[allow(clippy::float_cmp, clippy::unwrap_used)] mod tests { use std::time::Duration; diff --git a/proxy/src/rate_limiter/limit_algorithm/aimd.rs b/proxy/src/rate_limiter/limit_algorithm/aimd.rs index 3000cc4c2a..04e136b6d5 100644 --- a/proxy/src/rate_limiter/limit_algorithm/aimd.rs +++ b/proxy/src/rate_limiter/limit_algorithm/aimd.rs @@ -63,6 +63,7 @@ impl LimitAlgorithm for Aimd { } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use std::time::Duration; diff --git a/proxy/src/rate_limiter/limiter.rs b/proxy/src/rate_limiter/limiter.rs index a048721e77..6f6a8c9d47 100644 --- a/proxy/src/rate_limiter/limiter.rs +++ b/proxy/src/rate_limiter/limiter.rs @@ -12,6 +12,7 @@ use rand::{Rng, SeedableRng}; use tokio::time::{Duration, Instant}; use tracing::info; +use crate::ext::LockExt; use crate::intern::EndpointIdInt; pub struct GlobalRateLimiter { @@ -246,12 +247,13 @@ impl BucketRateLimiter { let n = self.map.shards().len(); // this lock is ok as the periodic cycle of do_gc makes this very unlikely to collide // (impossible, infact, unless we have 2048 threads) - let shard = self.rand.lock().unwrap().gen_range(0..n); + let shard = self.rand.lock_propagate_poison().gen_range(0..n); self.map.shards()[shard].write().clear(); } } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use std::hash::BuildHasherDefault; use std::time::Duration; diff --git a/proxy/src/redis/connection_with_credentials_provider.rs b/proxy/src/redis/connection_with_credentials_provider.rs index 82139ea1d5..0f6e765b02 100644 --- a/proxy/src/redis/connection_with_credentials_provider.rs +++ b/proxy/src/redis/connection_with_credentials_provider.rs @@ -69,7 +69,11 @@ impl ConnectionWithCredentialsProvider { pub fn new_with_static_credentials(params: T) -> Self { Self { - credentials: Credentials::Static(params.into_connection_info().unwrap()), + credentials: Credentials::Static( + params + .into_connection_info() + .expect("static configured redis credentials should be a valid format"), + ), con: None, refresh_token_task: None, mutex: tokio::sync::Mutex::new(()), diff --git a/proxy/src/redis/notifications.rs b/proxy/src/redis/notifications.rs index f3aa97c032..d18dfd2465 100644 --- a/proxy/src/redis/notifications.rs +++ b/proxy/src/redis/notifications.rs @@ -6,6 +6,7 @@ use pq_proto::CancelKeyData; use redis::aio::PubSub; use serde::{Deserialize, Serialize}; use tokio_util::sync::CancellationToken; +use tracing::Instrument; use uuid::Uuid; use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider; @@ -13,7 +14,6 @@ use crate::cache::project_info::ProjectInfoCache; use crate::cancellation::{CancelMap, CancellationHandler}; use crate::intern::{ProjectIdInt, RoleNameInt}; use crate::metrics::{Metrics, RedisErrors, RedisEventsCount}; -use tracing::Instrument; const CPLANE_CHANNEL_NAME: &str = "neondb-proxy-ws-updates"; pub(crate) const PROXY_CHANNEL_NAME: &str = "neondb-proxy-to-proxy-updates"; diff --git a/proxy/src/sasl/messages.rs b/proxy/src/sasl/messages.rs index 1373dfba3d..4922ece615 100644 --- a/proxy/src/sasl/messages.rs +++ b/proxy/src/sasl/messages.rs @@ -2,7 +2,7 @@ use pq_proto::{BeAuthenticationSaslMessage, BeMessage}; -use crate::parse::{split_at_const, split_cstr}; +use crate::parse::split_cstr; /// SASL-specific payload of [`PasswordMessage`](pq_proto::FeMessage::PasswordMessage). #[derive(Debug)] @@ -19,7 +19,7 @@ impl<'a> FirstMessage<'a> { let (method_cstr, tail) = split_cstr(bytes)?; let method = method_cstr.to_str().ok()?; - let (len_bytes, bytes) = split_at_const(tail)?; + let (len_bytes, bytes) = tail.split_first_chunk()?; let len = u32::from_be_bytes(*len_bytes) as usize; if len != bytes.len() { return None; @@ -51,6 +51,7 @@ impl<'a> ServerMessage<&'a str> { } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use super::*; diff --git a/proxy/src/scram/messages.rs b/proxy/src/scram/messages.rs index 5ee3a51352..0e54e7ded9 100644 --- a/proxy/src/scram/messages.rs +++ b/proxy/src/scram/messages.rs @@ -185,6 +185,7 @@ impl fmt::Debug for OwnedServerFirstMessage { } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use super::*; diff --git a/proxy/src/scram/mod.rs b/proxy/src/scram/mod.rs index 718445f61d..b49a9f32ee 100644 --- a/proxy/src/scram/mod.rs +++ b/proxy/src/scram/mod.rs @@ -57,6 +57,7 @@ fn sha256<'a>(parts: impl IntoIterator) -> [u8; 32] { } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use super::threadpool::ThreadPool; use super::{Exchange, ServerSecret}; diff --git a/proxy/src/scram/secret.rs b/proxy/src/scram/secret.rs index 8c6a08d432..eb21b26ab4 100644 --- a/proxy/src/scram/secret.rs +++ b/proxy/src/scram/secret.rs @@ -72,6 +72,7 @@ impl ServerSecret { } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use super::*; diff --git a/proxy/src/scram/threadpool.rs b/proxy/src/scram/threadpool.rs index ebc6dd2a3c..8f1684c75b 100644 --- a/proxy/src/scram/threadpool.rs +++ b/proxy/src/scram/threadpool.rs @@ -33,14 +33,11 @@ thread_local! { } impl ThreadPool { - pub fn new(n_workers: u8) -> Arc { + pub fn new(mut n_workers: u8) -> Arc { // rayon would be nice here, but yielding in rayon does not work well afaict. if n_workers == 0 { - return Arc::new(Self { - runtime: None, - metrics: Arc::new(ThreadPoolMetrics::new(n_workers as usize)), - }); + n_workers = 1; } Arc::new_cyclic(|pool| { @@ -66,7 +63,7 @@ impl ThreadPool { }); }) .build() - .unwrap(); + .expect("password threadpool runtime should be configured correctly"); Self { runtime: Some(runtime), @@ -79,7 +76,7 @@ impl ThreadPool { JobHandle( self.runtime .as_ref() - .unwrap() + .expect("runtime is always set") .spawn(JobSpec { pbkdf2, endpoint }), ) } @@ -87,7 +84,10 @@ impl ThreadPool { impl Drop for ThreadPool { fn drop(&mut self) { - self.runtime.take().unwrap().shutdown_background(); + self.runtime + .take() + .expect("runtime is always set") + .shutdown_background(); } } diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index 251aa47084..449d50b6e7 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -195,7 +195,6 @@ impl PoolingBackend { locks: &self.config.connect_compute_locks, }, &backend, - false, // do not allow self signed compute for http flow self.config.wake_compute_retry_config, self.config.connect_to_compute_retry_config, ) @@ -237,7 +236,6 @@ impl PoolingBackend { locks: &self.config.connect_compute_locks, }, &backend, - false, // do not allow self signed compute for http flow self.config.wake_compute_retry_config, self.config.connect_to_compute_retry_config, ) @@ -270,7 +268,11 @@ impl PoolingBackend { if !self.local_pool.initialized(&conn_info) { // only install and grant usage one at a time. - let _permit = local_backend.initialize.acquire().await.unwrap(); + let _permit = local_backend + .initialize + .acquire() + .await + .expect("semaphore should never be closed"); // check again for race if !self.local_pool.initialized(&conn_info) { diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index cac5a173cb..447103edce 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -186,8 +186,8 @@ impl ClientDataRemote { } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { - use std::mem; use std::sync::atomic::AtomicBool; use super::*; @@ -269,39 +269,33 @@ mod tests { assert_eq!(0, pool.get_global_connections_count()); } { - let mut client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone()); - client.do_drop().unwrap()(); - mem::forget(client); // drop the client + let client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone()); + drop(client); assert_eq!(1, pool.get_global_connections_count()); } { - let mut closed_client = Client::new( + let closed_client = Client::new( create_inner_with(MockClient::new(true)), conn_info.clone(), ep_pool.clone(), ); - closed_client.do_drop().unwrap()(); - mem::forget(closed_client); // drop the client - // The closed client shouldn't be added to the pool. + drop(closed_client); assert_eq!(1, pool.get_global_connections_count()); } let is_closed: Arc = Arc::new(false.into()); { - let mut client = Client::new( + let client = Client::new( create_inner_with(MockClient(is_closed.clone())), conn_info.clone(), ep_pool.clone(), ); - client.do_drop().unwrap()(); - mem::forget(client); // drop the client - + drop(client); // The client should be added to the pool. assert_eq!(2, pool.get_global_connections_count()); } { - let mut client = Client::new(create_inner(), conn_info, ep_pool); - client.do_drop().unwrap()(); - mem::forget(client); // drop the client + let client = Client::new(create_inner(), conn_info, ep_pool); + drop(client); // The client shouldn't be added to the pool. Because the ep-pool is full. assert_eq!(2, pool.get_global_connections_count()); @@ -319,15 +313,13 @@ mod tests { &pool.get_or_create_endpoint_pool(&conn_info.endpoint_cache_key().unwrap()), ); { - let mut client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone()); - client.do_drop().unwrap()(); - mem::forget(client); // drop the client + let client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone()); + drop(client); assert_eq!(3, pool.get_global_connections_count()); } { - let mut client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone()); - client.do_drop().unwrap()(); - mem::forget(client); // drop the client + let client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone()); + drop(client); // The client shouldn't be added to the pool. Because the global pool is full. assert_eq!(3, pool.get_global_connections_count()); diff --git a/proxy/src/serverless/conn_pool_lib.rs b/proxy/src/serverless/conn_pool_lib.rs index 2a46c8f9c5..44eac77e8f 100644 --- a/proxy/src/serverless/conn_pool_lib.rs +++ b/proxy/src/serverless/conn_pool_lib.rs @@ -187,19 +187,22 @@ impl EndpointConnPool { pub(crate) fn put(pool: &RwLock, conn_info: &ConnInfo, client: ClientInnerCommon) { let conn_id = client.get_conn_id(); - let pool_name = pool.read().get_name().to_string(); + let (max_conn, conn_count, pool_name) = { + let pool = pool.read(); + ( + pool.global_pool_size_max_conns, + pool.global_connections_count + .load(atomic::Ordering::Relaxed), + pool.get_name().to_string(), + ) + }; + if client.inner.is_closed() { info!(%conn_id, "{}: throwing away connection '{conn_info}' because connection is closed", pool_name); return; } - let global_max_conn = pool.read().global_pool_size_max_conns; - if pool - .read() - .global_connections_count - .load(atomic::Ordering::Relaxed) - >= global_max_conn - { + if conn_count >= max_conn { info!(%conn_id, "{}: throwing away connection '{conn_info}' because pool is full", pool_name); return; } @@ -633,35 +636,29 @@ impl Client { } pub(crate) fn metrics(&self) -> Arc { - let aux = &self.inner.as_ref().unwrap().aux; + let aux = &self + .inner + .as_ref() + .expect("client inner should not be removed") + .aux; USAGE_METRICS.register(Ids { endpoint_id: aux.endpoint_id, branch_id: aux.branch_id, }) } +} - pub(crate) fn do_drop(&mut self) -> Option> { +impl Drop for Client { + fn drop(&mut self) { let conn_info = self.conn_info.clone(); let client = self .inner .take() .expect("client inner should not be removed"); if let Some(conn_pool) = std::mem::take(&mut self.pool).upgrade() { - let current_span = self.span.clone(); + let _current_span = self.span.enter(); // return connection to the pool - return Some(move || { - let _span = current_span.enter(); - EndpointConnPool::put(&conn_pool, &conn_info, client); - }); - } - None - } -} - -impl Drop for Client { - fn drop(&mut self) { - if let Some(drop) = self.do_drop() { - tokio::task::spawn_blocking(drop); + EndpointConnPool::put(&conn_pool, &conn_info, client); } } } diff --git a/proxy/src/serverless/http_util.rs b/proxy/src/serverless/http_util.rs index c0208d4f68..d5c948777c 100644 --- a/proxy/src/serverless/http_util.rs +++ b/proxy/src/serverless/http_util.rs @@ -81,11 +81,14 @@ impl HttpErrorBody { .header(http::header::CONTENT_TYPE, "application/json") // we do not have nested maps with non string keys so serialization shouldn't fail .body( - Full::new(Bytes::from(serde_json::to_string(self).unwrap())) - .map_err(|x| match x {}) - .boxed(), + Full::new(Bytes::from( + serde_json::to_string(self) + .expect("serialising HttpErrorBody should never fail"), + )) + .map_err(|x| match x {}) + .boxed(), ) - .unwrap() + .expect("content-type header should be valid") } } diff --git a/proxy/src/serverless/json.rs b/proxy/src/serverless/json.rs index 25b25c66d3..ab012bd020 100644 --- a/proxy/src/serverless/json.rs +++ b/proxy/src/serverless/json.rs @@ -204,7 +204,10 @@ fn pg_array_parse_inner( if c == '\\' { escaped = true; - (i, c) = pg_array_chr.next().unwrap(); + let Some(x) = pg_array_chr.next() else { + return Err(JsonConversionError::UnbalancedArray); + }; + (i, c) = x; } match c { @@ -253,6 +256,7 @@ fn pg_array_parse_inner( } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use serde_json::json; diff --git a/proxy/src/serverless/local_conn_pool.rs b/proxy/src/serverless/local_conn_pool.rs index b84cde9e25..c51a2bc9ba 100644 --- a/proxy/src/serverless/local_conn_pool.rs +++ b/proxy/src/serverless/local_conn_pool.rs @@ -179,7 +179,6 @@ pub(crate) fn poll_client( info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection"); }); let pool = Arc::downgrade(&global_pool); - let pool_clone = pool.clone(); let db_user = conn_info.db_and_user(); let idle = global_pool.get_idle_timeout(); @@ -273,11 +272,7 @@ pub(crate) fn poll_client( }), }; - Client::new( - inner, - conn_info, - Arc::downgrade(&pool_clone.upgrade().unwrap().global_pool), - ) + Client::new(inner, conn_info, Arc::downgrade(&global_pool.global_pool)) } impl ClientInnerCommon { @@ -321,7 +316,8 @@ fn resign_jwt(sk: &SigningKey, payload: &[u8], jti: u64) -> Result(buffer.format(jti)).unwrap(); + let jti = serde_json::from_str::<&RawValue>(buffer.format(jti)) + .expect("itoa formatted integer should be guaranteed valid json"); // update the jti in-place let payload = @@ -368,6 +364,7 @@ fn sign_jwt(sk: &SigningKey, payload: &[u8]) -> String { } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use p256::ecdsa::SigningKey; use typed_json::json; diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index 80b42f9e55..c2623e0eca 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -46,6 +46,7 @@ use utils::http::error::ApiError; use crate::cancellation::CancellationHandlerMain; use crate::config::{ProxyConfig, ProxyProtocolV2}; use crate::context::RequestContext; +use crate::ext::TaskExt; use crate::metrics::Metrics; use crate::protocol2::{read_proxy_protocol, ChainRW, ConnectHeader, ConnectionInfo}; use crate::proxy::run_until_cancelled; @@ -84,7 +85,7 @@ pub async fn task_main( cancellation_token.cancelled().await; tokio::task::spawn_blocking(move || conn_pool.shutdown()) .await - .unwrap(); + .propagate_task_panic(); } }); @@ -104,7 +105,7 @@ pub async fn task_main( cancellation_token.cancelled().await; tokio::task::spawn_blocking(move || http_conn_pool.shutdown()) .await - .unwrap(); + .propagate_task_panic(); } }); diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 5e85f5ec40..3e42787a09 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -1110,6 +1110,7 @@ impl Discard<'_> { } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use super::*; diff --git a/proxy/src/serverless/websocket.rs b/proxy/src/serverless/websocket.rs index bdb83fe6be..812fedaf04 100644 --- a/proxy/src/serverless/websocket.rs +++ b/proxy/src/serverless/websocket.rs @@ -178,6 +178,7 @@ pub(crate) async fn serve_websocket( } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use std::pin::pin; diff --git a/proxy/src/url.rs b/proxy/src/url.rs index 270cd7c24d..d73a84057a 100644 --- a/proxy/src/url.rs +++ b/proxy/src/url.rs @@ -50,6 +50,7 @@ impl std::fmt::Display for ApiUrl { } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use super::*; diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs index 65e74466f2..487504d709 100644 --- a/proxy/src/usage_metrics.rs +++ b/proxy/src/usage_metrics.rs @@ -407,6 +407,7 @@ async fn upload_backup_events( } #[cfg(test)] +#[expect(clippy::unwrap_used)] mod tests { use std::fs; use std::io::BufReader; diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 0422c46ab1..086407603f 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -55,6 +55,7 @@ postgres_ffi.workspace = true pq_proto.workspace = true remote_storage.workspace = true safekeeper_api.workspace = true +safekeeper_client.workspace = true sha2.workspace = true sd-notify.workspace = true storage_broker.workspace = true diff --git a/safekeeper/client/Cargo.toml b/safekeeper/client/Cargo.toml new file mode 100644 index 0000000000..6c5a52de3a --- /dev/null +++ b/safekeeper/client/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "safekeeper_client" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +safekeeper_api.workspace = true +thiserror.workspace = true +reqwest = { workspace = true, features = [ "stream" ] } +serde.workspace = true +utils.workspace = true +workspace_hack = { version = "0.1", path = "../../workspace_hack" } diff --git a/safekeeper/client/src/lib.rs b/safekeeper/client/src/lib.rs new file mode 100644 index 0000000000..3963fd466c --- /dev/null +++ b/safekeeper/client/src/lib.rs @@ -0,0 +1 @@ +pub mod mgmt_api; diff --git a/safekeeper/src/http/client.rs b/safekeeper/client/src/mgmt_api.rs similarity index 96% rename from safekeeper/src/http/client.rs rename to safekeeper/client/src/mgmt_api.rs index 669a9c0ce9..f78745043a 100644 --- a/safekeeper/src/http/client.rs +++ b/safekeeper/client/src/mgmt_api.rs @@ -2,10 +2,6 @@ //! //! Partially copied from pageserver client; some parts might be better to be //! united. -//! -//! It would be also good to move it out to separate crate, but this needs -//! duplication of internal-but-reported structs like WalSenderState, ServerInfo -//! etc. use reqwest::{IntoUrl, Method, StatusCode}; use safekeeper_api::models::TimelineStatus; diff --git a/safekeeper/src/http/mod.rs b/safekeeper/src/http/mod.rs index 7229ccb739..d82a713f8a 100644 --- a/safekeeper/src/http/mod.rs +++ b/safekeeper/src/http/mod.rs @@ -1,4 +1,3 @@ -pub mod client; pub mod routes; pub use routes::make_router; diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 00777273cb..f2d8e4c85f 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -5,6 +5,8 @@ use chrono::{DateTime, Utc}; use futures::{SinkExt, StreamExt, TryStreamExt}; use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI}; use safekeeper_api::{models::TimelineStatus, Term}; +use safekeeper_client::mgmt_api; +use safekeeper_client::mgmt_api::Client; use serde::{Deserialize, Serialize}; use std::{ cmp::min, @@ -22,7 +24,6 @@ use tracing::{error, info, instrument}; use crate::{ control_file::CONTROL_FILE_NAME, debug_dump, - http::client::{self, Client}, state::{EvictionState, TimelinePersistentState}, timeline::{Timeline, WalResidentTimeline}, timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline}, @@ -419,7 +420,7 @@ pub async fn handle_request( let http_hosts = request.http_hosts.clone(); // Figure out statuses of potential donors. - let responses: Vec> = + let responses: Vec> = futures::future::join_all(http_hosts.iter().map(|url| async { let cclient = Client::new(url.clone(), sk_auth_token.clone()); let info = cclient diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 08371177cd..3e9ce1da8e 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -318,7 +318,7 @@ struct NetworkReader<'a, IO> { global_timelines: Arc, } -impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> { +impl NetworkReader<'_, IO> { async fn read_first_message( &mut self, ) -> Result<(WalResidentTimeline, ProposerAcceptorMessage), CopyStreamHandlerEnd> { diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index ccd7940c72..6ceaf325b0 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -125,10 +125,7 @@ impl TermHistory { ); last_common_idx = Some(i); } - let last_common_idx = match last_common_idx { - None => return None, // no common point - Some(lci) => lci, - }; + let last_common_idx = last_common_idx?; // Now find where it ends at both prop and sk and take min. End of // (common) term is the start of the next except it is the last one; // there it is flush_lsn in case of safekeeper or, in case of proposer diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 746177c089..a89e4741f6 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -6873,10 +6873,7 @@ impl Service { let mut plan = Vec::new(); for (node_id, attached) in nodes_by_load { - let available = locked - .nodes - .get(&node_id) - .map_or(false, |n| n.is_available()); + let available = locked.nodes.get(&node_id).is_some_and(|n| n.is_available()); if !available { continue; } diff --git a/test_runner/regress/test_timeline_archive.py b/test_runner/regress/test_timeline_archive.py index addf702893..87579f9e92 100644 --- a/test_runner/regress/test_timeline_archive.py +++ b/test_runner/regress/test_timeline_archive.py @@ -426,6 +426,7 @@ def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder): [ ".*removing local file.*because it has unexpected length.*", ".*__temp.*", + ".*method=POST path=\\S+/timeline .*: Not activating a Stopping timeline.*", # FIXME: there are still anyhow::Error paths in timeline creation/deletion which # generate 500 results when called during shutdown (https://github.com/neondatabase/neon/issues/9768) ".*InternalServerError.*", diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 23d4f23cdb..0a8900b351 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -1090,6 +1090,62 @@ def test_restart_endpoint_after_switch_wal(neon_env_builder: NeonEnvBuilder): endpoint.safe_psql("SELECT 'works'") +# Test restarting compute at WAL page boundary. +def test_restart_endpoint_wal_page_boundary(neon_env_builder: NeonEnvBuilder): + env = neon_env_builder.init_start() + + ep = env.endpoints.create_start("main") + ep.safe_psql("create table t (i int)") + + with ep.cursor() as cur: + # measure how much space logical message takes. Sometimes first attempt + # creates huge message and then it stabilizes, have no idea why. + for _ in range(3): + lsn_before = Lsn(query_scalar(cur, "select pg_current_wal_lsn()")) + log.info(f"current_lsn={lsn_before}") + # Non-transactional logical message doesn't write WAL, only XLogInsert's + # it, so use transactional. Which is a bit problematic as transactional + # necessitates commit record. Alternatively we can do smth like + # select neon_xlogflush(pg_current_wal_insert_lsn()); + # but isn't much better + that particular call complains on 'xlog flush + # request 0/282C018 is not satisfied' as pg_current_wal_insert_lsn skips + # page headers. + payload = "blahblah" + cur.execute(f"select pg_logical_emit_message(true, 'pref', '{payload}')") + lsn_after_by_curr_wal_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()")) + lsn_diff = lsn_after_by_curr_wal_lsn - lsn_before + logical_message_base = lsn_after_by_curr_wal_lsn - lsn_before - len(payload) + log.info( + f"before {lsn_before}, after {lsn_after_by_curr_wal_lsn}, lsn diff is {lsn_diff}, base {logical_message_base}" + ) + + # and write logical message spanning exactly as we want + lsn_before = Lsn(query_scalar(cur, "select pg_current_wal_lsn()")) + log.info(f"current_lsn={lsn_before}") + curr_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()")) + offs = int(curr_lsn) % 8192 + till_page = 8192 - offs + target_lsn = curr_lsn + till_page + payload_len = ( + till_page - logical_message_base - 8 + ) # not sure why 8 is here, it is deduced from experiments + log.info( + f"current_lsn={curr_lsn}, offs {offs}, till_page {till_page}, target_lsn {target_lsn}" + ) + + cur.execute(f"select pg_logical_emit_message(true, 'pref', 'f{'a' * payload_len}')") + supposedly_contrecord_end = Lsn(query_scalar(cur, "select pg_current_wal_lsn()")) + log.info(f"supposedly_page_boundary={supposedly_contrecord_end}") + # The calculations to hit the page boundary are very fuzzy, so just + # ignore test if we fail to reach it. + if not (int(supposedly_contrecord_end) % 8192 == 0): + pytest.skip(f"missed page boundary, bad luck: lsn is {supposedly_contrecord_end}") + + ep.stop(mode="immediate") + ep = env.endpoints.create_start("main") + ep.safe_psql("insert into t values (42)") # should be ok + + # Context manager which logs passed time on exit. class DurationLogger: def __init__(self, desc):