mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
Merge branch 'main' into jcsp/no-coverage
This commit is contained in:
121
Cargo.lock
generated
121
Cargo.lock
generated
@@ -10,9 +10,9 @@ checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5"
|
||||
|
||||
[[package]]
|
||||
name = "addr2line"
|
||||
version = "0.21.0"
|
||||
version = "0.24.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb"
|
||||
checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1"
|
||||
dependencies = [
|
||||
"gimli",
|
||||
]
|
||||
@@ -23,6 +23,12 @@ version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
|
||||
|
||||
[[package]]
|
||||
name = "adler2"
|
||||
version = "2.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
|
||||
|
||||
[[package]]
|
||||
name = "ahash"
|
||||
version = "0.8.11"
|
||||
@@ -871,17 +877,17 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "backtrace"
|
||||
version = "0.3.69"
|
||||
version = "0.3.74"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837"
|
||||
checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a"
|
||||
dependencies = [
|
||||
"addr2line",
|
||||
"cc",
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"miniz_oxide",
|
||||
"miniz_oxide 0.8.0",
|
||||
"object",
|
||||
"rustc-demangle",
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1127,7 +1133,7 @@ dependencies = [
|
||||
"num-traits",
|
||||
"serde",
|
||||
"wasm-bindgen",
|
||||
"windows-targets 0.52.4",
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2107,7 +2113,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743"
|
||||
dependencies = [
|
||||
"crc32fast",
|
||||
"miniz_oxide",
|
||||
"miniz_oxide 0.7.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2308,9 +2314,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "gimli"
|
||||
version = "0.28.1"
|
||||
version = "0.31.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253"
|
||||
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
|
||||
|
||||
[[package]]
|
||||
name = "git-version"
|
||||
@@ -3404,6 +3410,15 @@ dependencies = [
|
||||
"adler",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1"
|
||||
dependencies = [
|
||||
"adler2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mio"
|
||||
version = "0.8.11"
|
||||
@@ -3638,9 +3653,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "object"
|
||||
version = "0.32.2"
|
||||
version = "0.36.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441"
|
||||
checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
@@ -4401,11 +4416,13 @@ dependencies = [
|
||||
"bindgen",
|
||||
"bytes",
|
||||
"crc32c",
|
||||
"criterion",
|
||||
"env_logger",
|
||||
"log",
|
||||
"memoffset 0.9.0",
|
||||
"once_cell",
|
||||
"postgres",
|
||||
"pprof",
|
||||
"regex",
|
||||
"serde",
|
||||
"thiserror",
|
||||
@@ -5062,6 +5079,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"rand 0.8.5",
|
||||
"reqwest",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -5320,9 +5338,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustc-demangle"
|
||||
version = "0.1.23"
|
||||
version = "0.1.24"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
|
||||
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
|
||||
|
||||
[[package]]
|
||||
name = "rustc-hash"
|
||||
@@ -5535,6 +5553,7 @@ dependencies = [
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
"safekeeper_api",
|
||||
"safekeeper_client",
|
||||
"scopeguard",
|
||||
"sd-notify",
|
||||
"serde",
|
||||
@@ -5572,6 +5591,18 @@ dependencies = [
|
||||
"utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "safekeeper_client"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"reqwest",
|
||||
"safekeeper_api",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "same-file"
|
||||
version = "1.0.6"
|
||||
@@ -7203,6 +7234,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
"async-compression",
|
||||
"backtrace",
|
||||
"bincode",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
@@ -7213,12 +7245,14 @@ dependencies = [
|
||||
"criterion",
|
||||
"diatomic-waker",
|
||||
"fail",
|
||||
"flate2",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hex",
|
||||
"hex-literal",
|
||||
"humantime",
|
||||
"hyper 0.14.30",
|
||||
"itertools 0.10.5",
|
||||
"jemalloc_pprof",
|
||||
"jsonwebtoken",
|
||||
"metrics",
|
||||
@@ -7575,7 +7609,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be"
|
||||
dependencies = [
|
||||
"windows-core",
|
||||
"windows-targets 0.52.4",
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -7584,7 +7618,7 @@ version = "0.52.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
|
||||
dependencies = [
|
||||
"windows-targets 0.52.4",
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -7602,7 +7636,7 @@ version = "0.52.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
|
||||
dependencies = [
|
||||
"windows-targets 0.52.4",
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -7622,17 +7656,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "windows-targets"
|
||||
version = "0.52.4"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b"
|
||||
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
|
||||
dependencies = [
|
||||
"windows_aarch64_gnullvm 0.52.4",
|
||||
"windows_aarch64_msvc 0.52.4",
|
||||
"windows_i686_gnu 0.52.4",
|
||||
"windows_i686_msvc 0.52.4",
|
||||
"windows_x86_64_gnu 0.52.4",
|
||||
"windows_x86_64_gnullvm 0.52.4",
|
||||
"windows_x86_64_msvc 0.52.4",
|
||||
"windows_aarch64_gnullvm 0.52.6",
|
||||
"windows_aarch64_msvc 0.52.6",
|
||||
"windows_i686_gnu 0.52.6",
|
||||
"windows_i686_gnullvm",
|
||||
"windows_i686_msvc 0.52.6",
|
||||
"windows_x86_64_gnu 0.52.6",
|
||||
"windows_x86_64_gnullvm 0.52.6",
|
||||
"windows_x86_64_msvc 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -7643,9 +7678,9 @@ checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_gnullvm"
|
||||
version = "0.52.4"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9"
|
||||
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_msvc"
|
||||
@@ -7655,9 +7690,9 @@ checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_msvc"
|
||||
version = "0.52.4"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675"
|
||||
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_gnu"
|
||||
@@ -7667,9 +7702,15 @@ checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_gnu"
|
||||
version = "0.52.4"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3"
|
||||
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_gnullvm"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_msvc"
|
||||
@@ -7679,9 +7720,9 @@ checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_msvc"
|
||||
version = "0.52.4"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02"
|
||||
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnu"
|
||||
@@ -7691,9 +7732,9 @@ checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnu"
|
||||
version = "0.52.4"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03"
|
||||
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnullvm"
|
||||
@@ -7703,9 +7744,9 @@ checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnullvm"
|
||||
version = "0.52.4"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177"
|
||||
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_msvc"
|
||||
@@ -7715,9 +7756,9 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_msvc"
|
||||
version = "0.52.4"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8"
|
||||
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
|
||||
|
||||
[[package]]
|
||||
name = "winnow"
|
||||
|
||||
@@ -11,6 +11,7 @@ members = [
|
||||
"pageserver/pagebench",
|
||||
"proxy",
|
||||
"safekeeper",
|
||||
"safekeeper/client",
|
||||
"storage_broker",
|
||||
"storage_controller",
|
||||
"storage_controller/client",
|
||||
@@ -51,6 +52,7 @@ anyhow = { version = "1.0", features = ["backtrace"] }
|
||||
arc-swap = "1.6"
|
||||
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
|
||||
atomic-take = "1.1.0"
|
||||
backtrace = "0.3.74"
|
||||
flate2 = "1.0.26"
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
@@ -233,6 +235,7 @@ postgres_initdb = { path = "./libs/postgres_initdb" }
|
||||
pq_proto = { version = "0.1", path = "./libs/pq_proto/" }
|
||||
remote_storage = { version = "0.1", path = "./libs/remote_storage/" }
|
||||
safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }
|
||||
safekeeper_client = { path = "./safekeeper/client" }
|
||||
desim = { version = "0.1", path = "./libs/desim" }
|
||||
storage_broker = { version = "0.1", path = "./storage_broker/" } # Note: main broker code is inside the binary crate, so linking with the library shouldn't be heavy.
|
||||
storage_controller_client = { path = "./storage_controller/client" }
|
||||
|
||||
@@ -35,10 +35,12 @@ RUN case $DEBIAN_VERSION in \
|
||||
;; \
|
||||
esac && \
|
||||
apt update && \
|
||||
apt install --no-install-recommends -y git autoconf automake libtool build-essential bison flex libreadline-dev \
|
||||
apt install --no-install-recommends --no-install-suggests -y \
|
||||
ninja-build git autoconf automake libtool build-essential bison flex libreadline-dev \
|
||||
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget ca-certificates pkg-config libssl-dev \
|
||||
libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd \
|
||||
$VERSION_INSTALLS
|
||||
$VERSION_INSTALLS \
|
||||
&& apt clean && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -113,10 +115,12 @@ ARG DEBIAN_VERSION
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
RUN apt update && \
|
||||
apt install --no-install-recommends -y gdal-bin libboost-dev libboost-thread-dev libboost-filesystem-dev \
|
||||
apt install --no-install-recommends --no-install-suggests -y \
|
||||
gdal-bin libboost-dev libboost-thread-dev libboost-filesystem-dev \
|
||||
libboost-system-dev libboost-iostreams-dev libboost-program-options-dev libboost-timer-dev \
|
||||
libcgal-dev libgdal-dev libgmp-dev libmpfr-dev libopenscenegraph-dev libprotobuf-c-dev \
|
||||
protobuf-c-compiler xsltproc
|
||||
protobuf-c-compiler xsltproc \
|
||||
&& apt clean && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
|
||||
# Postgis 3.5.0 requires SFCGAL 1.4+
|
||||
@@ -143,9 +147,9 @@ RUN case "${DEBIAN_VERSION}" in \
|
||||
wget https://gitlab.com/sfcgal/SFCGAL/-/archive/v${SFCGAL_VERSION}/SFCGAL-v${SFCGAL_VERSION}.tar.gz -O SFCGAL.tar.gz && \
|
||||
echo "${SFCGAL_CHECKSUM} SFCGAL.tar.gz" | sha256sum --check && \
|
||||
mkdir sfcgal-src && cd sfcgal-src && tar xzf ../SFCGAL.tar.gz --strip-components=1 -C . && \
|
||||
cmake -DCMAKE_BUILD_TYPE=Release . && make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
DESTDIR=/sfcgal make install -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make clean && cp -R /sfcgal/* /
|
||||
cmake -DCMAKE_BUILD_TYPE=Release -GNinja . && ninja -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
DESTDIR=/sfcgal ninja install -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
ninja clean && cp -R /sfcgal/* /
|
||||
|
||||
ENV PATH="/usr/local/pgsql/bin:$PATH"
|
||||
|
||||
@@ -213,9 +217,9 @@ RUN case "${PG_VERSION}" in \
|
||||
echo "${PGROUTING_CHECKSUM} pgrouting.tar.gz" | sha256sum --check && \
|
||||
mkdir pgrouting-src && cd pgrouting-src && tar xzf ../pgrouting.tar.gz --strip-components=1 -C . && \
|
||||
mkdir build && cd build && \
|
||||
cmake -DCMAKE_BUILD_TYPE=Release .. && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
cmake -GNinja -DCMAKE_BUILD_TYPE=Release .. && \
|
||||
ninja -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
ninja -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrouting.control && \
|
||||
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /after.txt &&\
|
||||
cp /usr/local/pgsql/share/extension/pgrouting.control /extensions/postgis && \
|
||||
@@ -235,7 +239,9 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY compute/patches/plv8-3.1.10.patch /plv8-3.1.10.patch
|
||||
|
||||
RUN apt update && \
|
||||
apt install --no-install-recommends -y ninja-build python3-dev libncurses5 binutils clang
|
||||
apt install --no-install-recommends --no-install-suggests -y \
|
||||
ninja-build python3-dev libncurses5 binutils clang \
|
||||
&& apt clean && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# plv8 3.2.3 supports v17
|
||||
# last release v3.2.3 - Sep 7, 2024
|
||||
@@ -301,9 +307,10 @@ RUN mkdir -p /h3/usr/ && \
|
||||
echo "ec99f1f5974846bde64f4513cf8d2ea1b8d172d2218ab41803bf6a63532272bc h3.tar.gz" | sha256sum --check && \
|
||||
mkdir h3-src && cd h3-src && tar xzf ../h3.tar.gz --strip-components=1 -C . && \
|
||||
mkdir build && cd build && \
|
||||
cmake .. -DCMAKE_BUILD_TYPE=Release && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
DESTDIR=/h3 make install && \
|
||||
cmake .. -GNinja -DBUILD_BENCHMARKS=0 -DCMAKE_BUILD_TYPE=Release \
|
||||
-DBUILD_FUZZERS=0 -DBUILD_FILTERS=0 -DBUILD_GENERATORS=0 -DBUILD_TESTING=0 \
|
||||
&& ninja -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
DESTDIR=/h3 ninja install && \
|
||||
cp -R /h3/usr / && \
|
||||
rm -rf build
|
||||
|
||||
@@ -650,14 +657,15 @@ FROM build-deps AS rdkit-pg-build
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install --no-install-recommends -y \
|
||||
RUN apt update && \
|
||||
apt install --no-install-recommends --no-install-suggests -y \
|
||||
libboost-iostreams1.74-dev \
|
||||
libboost-regex1.74-dev \
|
||||
libboost-serialization1.74-dev \
|
||||
libboost-system1.74-dev \
|
||||
libeigen3-dev \
|
||||
libboost-all-dev
|
||||
libboost-all-dev \
|
||||
&& apt clean && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# rdkit Release_2024_09_1 supports v17
|
||||
# last release Release_2024_09_1 - Sep 27, 2024
|
||||
@@ -693,6 +701,8 @@ RUN case "${PG_VERSION}" in \
|
||||
-D RDK_BUILD_MOLINTERCHANGE_SUPPORT=OFF \
|
||||
-D RDK_BUILD_YAEHMOP_SUPPORT=OFF \
|
||||
-D RDK_BUILD_STRUCTCHECKER_SUPPORT=OFF \
|
||||
-D RDK_TEST_MULTITHREADED=OFF \
|
||||
-D RDK_BUILD_CPP_TESTS=OFF \
|
||||
-D RDK_USE_URF=OFF \
|
||||
-D RDK_BUILD_PGSQL=ON \
|
||||
-D RDK_PGSQL_STATIC=ON \
|
||||
@@ -704,9 +714,10 @@ RUN case "${PG_VERSION}" in \
|
||||
-D RDK_INSTALL_COMIC_FONTS=OFF \
|
||||
-D RDK_BUILD_FREETYPE_SUPPORT=OFF \
|
||||
-D CMAKE_BUILD_TYPE=Release \
|
||||
-GNinja \
|
||||
. && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
ninja -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
ninja -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/rdkit.control
|
||||
|
||||
#########################################################################################
|
||||
@@ -849,8 +860,9 @@ FROM build-deps AS rust-extensions-build
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install --no-install-recommends -y curl libclang-dev && \
|
||||
RUN apt update && \
|
||||
apt install --no-install-recommends --no-install-suggests -y curl libclang-dev && \
|
||||
apt clean && rm -rf /var/lib/apt/lists/* && \
|
||||
useradd -ms /bin/bash nonroot -b /home
|
||||
|
||||
ENV HOME=/home/nonroot
|
||||
@@ -885,8 +897,9 @@ FROM build-deps AS rust-extensions-build-pgrx12
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install --no-install-recommends -y curl libclang-dev && \
|
||||
RUN apt update && \
|
||||
apt install --no-install-recommends --no-install-suggests -y curl libclang-dev && \
|
||||
apt clean && rm -rf /var/lib/apt/lists/* && \
|
||||
useradd -ms /bin/bash nonroot -b /home
|
||||
|
||||
ENV HOME=/home/nonroot
|
||||
@@ -914,18 +927,22 @@ FROM rust-extensions-build-pgrx12 AS pg-onnx-build
|
||||
|
||||
# cmake 3.26 or higher is required, so installing it using pip (bullseye-backports has cmake 3.25).
|
||||
# Install it using virtual environment, because Python 3.11 (the default version on Debian 12 (Bookworm)) complains otherwise
|
||||
RUN apt-get update && apt-get install -y python3 python3-pip python3-venv && \
|
||||
RUN apt update && apt install --no-install-recommends --no-install-suggests -y \
|
||||
python3 python3-pip python3-venv && \
|
||||
apt clean && rm -rf /var/lib/apt/lists/* && \
|
||||
python3 -m venv venv && \
|
||||
. venv/bin/activate && \
|
||||
python3 -m pip install cmake==3.30.5 && \
|
||||
wget https://github.com/microsoft/onnxruntime/archive/refs/tags/v1.18.1.tar.gz -O onnxruntime.tar.gz && \
|
||||
mkdir onnxruntime-src && cd onnxruntime-src && tar xzf ../onnxruntime.tar.gz --strip-components=1 -C . && \
|
||||
./build.sh --config Release --parallel --skip_submodule_sync --skip_tests --allow_running_as_root
|
||||
./build.sh --config Release --parallel --cmake_generator Ninja \
|
||||
--skip_submodule_sync --skip_tests --allow_running_as_root
|
||||
|
||||
|
||||
FROM pg-onnx-build AS pgrag-pg-build
|
||||
|
||||
RUN apt-get install -y protobuf-compiler && \
|
||||
RUN apt update && apt install --no-install-recommends --no-install-suggests -y protobuf-compiler \
|
||||
&& apt clean && rm -rf /var/lib/apt/lists/* && \
|
||||
wget https://github.com/neondatabase-labs/pgrag/archive/refs/tags/v0.0.0.tar.gz -O pgrag.tar.gz && \
|
||||
echo "2cbe394c1e74fc8bcad9b52d5fbbfb783aef834ca3ce44626cfd770573700bb4 pgrag.tar.gz" | sha256sum --check && \
|
||||
mkdir pgrag-src && cd pgrag-src && tar xzf ../pgrag.tar.gz --strip-components=1 -C . && \
|
||||
@@ -1168,6 +1185,25 @@ RUN case "${PG_VERSION}" in \
|
||||
make BUILD_TYPE=release -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_mooncake.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg_repack"
|
||||
# compile pg_repack extension
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM build-deps AS pg-repack-build
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
ENV PATH="/usr/local/pgsql/bin/:$PATH"
|
||||
|
||||
RUN wget https://github.com/reorg/pg_repack/archive/refs/tags/ver_1.5.2.tar.gz -O pg_repack.tar.gz && \
|
||||
echo '4516cad42251ed3ad53ff619733004db47d5755acac83f75924cd94d1c4fb681 pg_repack.tar.gz' | sha256sum --check && \
|
||||
mkdir pg_repack-src && cd pg_repack-src && tar xzf ../pg_repack.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "neon-pg-ext-build"
|
||||
@@ -1213,6 +1249,7 @@ COPY --from=pg-anon-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-ivm-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-partman-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-repack-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY pgxn/ pgxn/
|
||||
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
@@ -1279,8 +1316,8 @@ COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/fast_
|
||||
|
||||
FROM debian:$DEBIAN_FLAVOR AS pgbouncer
|
||||
RUN set -e \
|
||||
&& apt-get update \
|
||||
&& apt-get install --no-install-recommends -y \
|
||||
&& apt update \
|
||||
&& apt install --no-install-suggests --no-install-recommends -y \
|
||||
build-essential \
|
||||
git \
|
||||
ca-certificates \
|
||||
@@ -1288,7 +1325,8 @@ RUN set -e \
|
||||
automake \
|
||||
libevent-dev \
|
||||
libtool \
|
||||
pkg-config
|
||||
pkg-config \
|
||||
&& apt clean && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Use `dist_man_MANS=` to skip manpage generation (which requires python3/pandoc)
|
||||
ENV PGBOUNCER_TAG=pgbouncer_1_22_1
|
||||
@@ -1519,7 +1557,7 @@ RUN apt update && \
|
||||
procps \
|
||||
ca-certificates \
|
||||
$VERSION_INSTALLS && \
|
||||
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
|
||||
apt clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
|
||||
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
|
||||
|
||||
# s5cmd 2.2.2 from https://github.com/peak/s5cmd/releases/tag/v2.2.2
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
metrics: [
|
||||
import 'sql_exporter/checkpoints_req.libsonnet',
|
||||
import 'sql_exporter/checkpoints_timed.libsonnet',
|
||||
import 'sql_exporter/compute_backpressure_throttling_seconds.libsonnet',
|
||||
import 'sql_exporter/compute_backpressure_throttling_seconds_total.libsonnet',
|
||||
import 'sql_exporter/compute_current_lsn.libsonnet',
|
||||
import 'sql_exporter/compute_logical_snapshot_files.libsonnet',
|
||||
import 'sql_exporter/compute_logical_snapshots_bytes.libsonnet',
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
{
|
||||
metric_name: 'compute_backpressure_throttling_seconds',
|
||||
type: 'gauge',
|
||||
metric_name: 'compute_backpressure_throttling_seconds_total',
|
||||
type: 'counter',
|
||||
help: 'Time compute has spent throttled',
|
||||
key_labels: null,
|
||||
values: [
|
||||
'throttled',
|
||||
],
|
||||
query: importstr 'sql_exporter/compute_backpressure_throttling_seconds.sql',
|
||||
query: importstr 'sql_exporter/compute_backpressure_throttling_seconds_total.sql',
|
||||
}
|
||||
@@ -132,11 +132,6 @@
|
||||
"name": "cron.database",
|
||||
"value": "postgres",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "session_preload_libraries",
|
||||
"value": "anon",
|
||||
"vartype": "string"
|
||||
}
|
||||
]
|
||||
},
|
||||
|
||||
@@ -35,11 +35,11 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
|
||||
echo "clean up containers if exists"
|
||||
cleanup
|
||||
PG_TEST_VERSION=$((pg_version < 16 ? 16 : pg_version))
|
||||
# The support of pg_anon not yet added to PG17, so we have to remove the corresponding option
|
||||
if [ $pg_version -eq 17 ]; then
|
||||
# The support of pg_anon not yet added to PG17, so we have to add the corresponding option for other PG versions
|
||||
if [ "${pg_version}" -ne 17 ]; then
|
||||
SPEC_PATH="compute_wrapper/var/db/postgres/specs"
|
||||
mv $SPEC_PATH/spec.json $SPEC_PATH/spec.bak
|
||||
jq 'del(.cluster.settings[] | select (.name == "session_preload_libraries"))' $SPEC_PATH/spec.bak > $SPEC_PATH/spec.json
|
||||
jq '.cluster.settings += [{"name": "session_preload_libraries","value": "anon","vartype": "string"}]' "${SPEC_PATH}/spec.bak" > "${SPEC_PATH}/spec.json"
|
||||
fi
|
||||
PG_VERSION=$pg_version PG_TEST_VERSION=$PG_TEST_VERSION docker compose --profile test-extensions -f $COMPOSE_FILE up --build -d
|
||||
|
||||
@@ -106,8 +106,8 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
|
||||
fi
|
||||
fi
|
||||
cleanup
|
||||
# The support of pg_anon not yet added to PG17, so we have to remove the corresponding option
|
||||
if [ $pg_version -eq 17 ]; then
|
||||
mv $SPEC_PATH/spec.bak $SPEC_PATH/spec.json
|
||||
# Restore the original spec.json
|
||||
if [ "$pg_version" -ne 17 ]; then
|
||||
mv "$SPEC_PATH/spec.bak" "$SPEC_PATH/spec.json"
|
||||
fi
|
||||
done
|
||||
|
||||
@@ -91,7 +91,7 @@ impl Timing {
|
||||
|
||||
/// Return true if there is a ready event.
|
||||
fn is_event_ready(&self, queue: &mut BinaryHeap<Pending>) -> bool {
|
||||
queue.peek().map_or(false, |x| x.time <= self.now())
|
||||
queue.peek().is_some_and(|x| x.time <= self.now())
|
||||
}
|
||||
|
||||
/// Clear all pending events.
|
||||
|
||||
@@ -9,9 +9,11 @@ regex.workspace = true
|
||||
bytes.workspace = true
|
||||
anyhow.workspace = true
|
||||
crc32c.workspace = true
|
||||
criterion.workspace = true
|
||||
once_cell.workspace = true
|
||||
log.workspace = true
|
||||
memoffset.workspace = true
|
||||
pprof.workspace = true
|
||||
thiserror.workspace = true
|
||||
serde.workspace = true
|
||||
utils.workspace = true
|
||||
@@ -24,3 +26,7 @@ postgres.workspace = true
|
||||
[build-dependencies]
|
||||
anyhow.workspace = true
|
||||
bindgen.workspace = true
|
||||
|
||||
[[bench]]
|
||||
name = "waldecoder"
|
||||
harness = false
|
||||
|
||||
26
libs/postgres_ffi/benches/README.md
Normal file
26
libs/postgres_ffi/benches/README.md
Normal file
@@ -0,0 +1,26 @@
|
||||
## Benchmarks
|
||||
|
||||
To run benchmarks:
|
||||
|
||||
```sh
|
||||
# All benchmarks.
|
||||
cargo bench --package postgres_ffi
|
||||
|
||||
# Specific file.
|
||||
cargo bench --package postgres_ffi --bench waldecoder
|
||||
|
||||
# Specific benchmark.
|
||||
cargo bench --package postgres_ffi --bench waldecoder complete_record/size=1024
|
||||
|
||||
# List available benchmarks.
|
||||
cargo bench --package postgres_ffi --benches -- --list
|
||||
|
||||
# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds.
|
||||
# Output in target/criterion/*/profile/flamegraph.svg.
|
||||
cargo bench --package postgres_ffi --bench waldecoder complete_record/size=1024 -- --profile-time 10
|
||||
```
|
||||
|
||||
Additional charts and statistics are available in `target/criterion/report/index.html`.
|
||||
|
||||
Benchmarks are automatically compared against the previous run. To compare against other runs, see
|
||||
`--baseline` and `--save-baseline`.
|
||||
49
libs/postgres_ffi/benches/waldecoder.rs
Normal file
49
libs/postgres_ffi/benches/waldecoder.rs
Normal file
@@ -0,0 +1,49 @@
|
||||
use std::ffi::CStr;
|
||||
|
||||
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
|
||||
use postgres_ffi::v17::wal_generator::LogicalMessageGenerator;
|
||||
use postgres_ffi::v17::waldecoder_handler::WalStreamDecoderHandler;
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use pprof::criterion::{Output, PProfProfiler};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
const KB: usize = 1024;
|
||||
|
||||
// Register benchmarks with Criterion.
|
||||
criterion_group!(
|
||||
name = benches;
|
||||
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
|
||||
targets = bench_complete_record,
|
||||
);
|
||||
criterion_main!(benches);
|
||||
|
||||
/// Benchmarks WalStreamDecoder::complete_record() for a logical message of varying size.
|
||||
fn bench_complete_record(c: &mut Criterion) {
|
||||
let mut g = c.benchmark_group("complete_record");
|
||||
for size in [64, KB, 8 * KB, 128 * KB] {
|
||||
// Kind of weird to change the group throughput per benchmark, but it's the only way
|
||||
// to vary it per benchmark. It works.
|
||||
g.throughput(criterion::Throughput::Bytes(size as u64));
|
||||
g.bench_function(format!("size={size}"), |b| run_bench(b, size).unwrap());
|
||||
}
|
||||
|
||||
fn run_bench(b: &mut Bencher, size: usize) -> anyhow::Result<()> {
|
||||
const PREFIX: &CStr = c"";
|
||||
let value_size = LogicalMessageGenerator::make_value_size(size, PREFIX);
|
||||
let value = vec![1; value_size];
|
||||
|
||||
let mut decoder = WalStreamDecoder::new(Lsn(0), 170000);
|
||||
let msg = LogicalMessageGenerator::new(PREFIX, &value)
|
||||
.next()
|
||||
.unwrap()
|
||||
.encode(Lsn(0));
|
||||
assert_eq!(msg.len(), size);
|
||||
|
||||
b.iter(|| {
|
||||
let msg = msg.clone(); // Bytes::clone() is cheap
|
||||
decoder.complete_record(msg).unwrap();
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -231,6 +231,22 @@ impl LogicalMessageGenerator {
|
||||
};
|
||||
[&header.encode(), prefix, message].concat().into()
|
||||
}
|
||||
|
||||
/// Computes how large a value must be to get a record of the given size. Convenience method to
|
||||
/// construct records of pre-determined size. Panics if the record size is too small.
|
||||
pub fn make_value_size(record_size: usize, prefix: &CStr) -> usize {
|
||||
let xlog_header_size = XLOG_SIZE_OF_XLOG_RECORD;
|
||||
let lm_header_size = size_of::<XlLogicalMessage>();
|
||||
let prefix_size = prefix.to_bytes_with_nul().len();
|
||||
let data_header_size = match record_size - xlog_header_size - 2 {
|
||||
0..=255 => 2,
|
||||
256..=258 => panic!("impossible record_size {record_size}"),
|
||||
259.. => 5,
|
||||
};
|
||||
record_size
|
||||
.checked_sub(xlog_header_size + lm_header_size + prefix_size + data_header_size)
|
||||
.expect("record_size too small")
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for LogicalMessageGenerator {
|
||||
|
||||
@@ -81,7 +81,7 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
|
||||
continue;
|
||||
}
|
||||
let mut f = File::options().write(true).open(file.path()).unwrap();
|
||||
const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE];
|
||||
static ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE];
|
||||
f.write_all(
|
||||
&ZEROS[0..min(
|
||||
WAL_SEGMENT_SIZE,
|
||||
|
||||
@@ -11,7 +11,7 @@ mod private {
|
||||
Query(&'a str),
|
||||
}
|
||||
|
||||
impl<'a> ToStatementType<'a> {
|
||||
impl ToStatementType<'_> {
|
||||
pub async fn into_statement(self, client: &Client) -> Result<Statement, Error> {
|
||||
match self {
|
||||
ToStatementType::Statement(s) => Ok(s.clone()),
|
||||
|
||||
@@ -18,6 +18,7 @@ camino = { workspace = true, features = ["serde1"] }
|
||||
humantime-serde.workspace = true
|
||||
hyper = { workspace = true, features = ["client"] }
|
||||
futures.workspace = true
|
||||
reqwest.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tokio = { workspace = true, features = ["sync", "fs", "io-util"] }
|
||||
|
||||
@@ -8,6 +8,7 @@ use std::io;
|
||||
use std::num::NonZeroU32;
|
||||
use std::pin::Pin;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
|
||||
@@ -15,6 +16,8 @@ use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
|
||||
use azure_core::HttpClient;
|
||||
use azure_core::TransportOptions;
|
||||
use azure_core::{Continuable, RetryOptions};
|
||||
use azure_storage::StorageCredentials;
|
||||
use azure_storage_blobs::blob::CopyStatus;
|
||||
@@ -80,8 +83,13 @@ impl AzureBlobStorage {
|
||||
StorageCredentials::token_credential(token_credential)
|
||||
};
|
||||
|
||||
// we have an outer retry
|
||||
let builder = ClientBuilder::new(account, credentials).retry(RetryOptions::none());
|
||||
let builder = ClientBuilder::new(account, credentials)
|
||||
// we have an outer retry
|
||||
.retry(RetryOptions::none())
|
||||
// Customize transport to configure conneciton pooling
|
||||
.transport(TransportOptions::new(Self::reqwest_client(
|
||||
azure_config.conn_pool_size,
|
||||
)));
|
||||
|
||||
let client = builder.container_client(azure_config.container_name.to_owned());
|
||||
|
||||
@@ -106,6 +114,14 @@ impl AzureBlobStorage {
|
||||
})
|
||||
}
|
||||
|
||||
fn reqwest_client(conn_pool_size: usize) -> Arc<dyn HttpClient> {
|
||||
let client = reqwest::ClientBuilder::new()
|
||||
.pool_max_idle_per_host(conn_pool_size)
|
||||
.build()
|
||||
.expect("failed to build `reqwest` client");
|
||||
Arc::new(client)
|
||||
}
|
||||
|
||||
pub fn relative_path_to_name(&self, path: &RemotePath) -> String {
|
||||
assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
|
||||
let path_string = path.get_path().as_str();
|
||||
@@ -544,9 +560,9 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn delete_objects<'a>(
|
||||
async fn delete_objects(
|
||||
&self,
|
||||
paths: &'a [RemotePath],
|
||||
paths: &[RemotePath],
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let kind = RequestKind::Delete;
|
||||
|
||||
@@ -114,6 +114,16 @@ fn default_max_keys_per_list_response() -> Option<i32> {
|
||||
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
|
||||
}
|
||||
|
||||
fn default_azure_conn_pool_size() -> usize {
|
||||
// Conservative default: no connection pooling. At time of writing this is the Azure
|
||||
// SDK's default as well, due to historic reports of hard-to-reproduce issues
|
||||
// (https://github.com/hyperium/hyper/issues/2312)
|
||||
//
|
||||
// However, using connection pooling is important to avoid exhausting client ports when
|
||||
// doing huge numbers of requests (https://github.com/neondatabase/cloud/issues/20971)
|
||||
0
|
||||
}
|
||||
|
||||
impl Debug for S3Config {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("S3Config")
|
||||
@@ -146,6 +156,8 @@ pub struct AzureConfig {
|
||||
pub concurrency_limit: NonZeroUsize,
|
||||
#[serde(default = "default_max_keys_per_list_response")]
|
||||
pub max_keys_per_list_response: Option<i32>,
|
||||
#[serde(default = "default_azure_conn_pool_size")]
|
||||
pub conn_pool_size: usize,
|
||||
}
|
||||
|
||||
fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize {
|
||||
@@ -302,6 +314,7 @@ timeout = '5s'";
|
||||
container_region = 'westeurope'
|
||||
upload_storage_class = 'INTELLIGENT_TIERING'
|
||||
timeout = '7s'
|
||||
conn_pool_size = 8
|
||||
";
|
||||
|
||||
let config = parse(toml).unwrap();
|
||||
@@ -316,6 +329,7 @@ timeout = '5s'";
|
||||
prefix_in_container: None,
|
||||
concurrency_limit: default_remote_storage_azure_concurrency_limit(),
|
||||
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
|
||||
conn_pool_size: 8,
|
||||
}),
|
||||
timeout: Duration::from_secs(7),
|
||||
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
|
||||
|
||||
@@ -341,9 +341,9 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
|
||||
/// set to `TimeoutOrCancel`. In such situation it is unknown which deletions, if any, went
|
||||
/// through.
|
||||
async fn delete_objects<'a>(
|
||||
async fn delete_objects(
|
||||
&self,
|
||||
paths: &'a [RemotePath],
|
||||
paths: &[RemotePath],
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()>;
|
||||
|
||||
|
||||
@@ -562,9 +562,9 @@ impl RemoteStorage for LocalFs {
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete_objects<'a>(
|
||||
async fn delete_objects(
|
||||
&self,
|
||||
paths: &'a [RemotePath],
|
||||
paths: &[RemotePath],
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
for path in paths {
|
||||
|
||||
@@ -813,9 +813,9 @@ impl RemoteStorage for S3Bucket {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn delete_objects<'a>(
|
||||
async fn delete_objects(
|
||||
&self,
|
||||
paths: &'a [RemotePath],
|
||||
paths: &[RemotePath],
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let kind = RequestKind::Delete;
|
||||
|
||||
@@ -181,9 +181,9 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
self.delete_inner(path, true, cancel).await
|
||||
}
|
||||
|
||||
async fn delete_objects<'a>(
|
||||
async fn delete_objects(
|
||||
&self,
|
||||
paths: &'a [RemotePath],
|
||||
paths: &[RemotePath],
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
|
||||
|
||||
@@ -218,6 +218,7 @@ async fn create_azure_client(
|
||||
prefix_in_container: Some(format!("test_{millis}_{random:08x}/")),
|
||||
concurrency_limit: NonZeroUsize::new(100).unwrap(),
|
||||
max_keys_per_list_response,
|
||||
conn_pool_size: 8,
|
||||
}),
|
||||
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
|
||||
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
|
||||
|
||||
@@ -15,17 +15,20 @@ arc-swap.workspace = true
|
||||
sentry.workspace = true
|
||||
async-compression.workspace = true
|
||||
anyhow.workspace = true
|
||||
backtrace.workspace = true
|
||||
bincode.workspace = true
|
||||
bytes.workspace = true
|
||||
camino.workspace = true
|
||||
chrono.workspace = true
|
||||
diatomic-waker.workspace = true
|
||||
flate2.workspace = true
|
||||
git-version.workspace = true
|
||||
hex = { workspace = true, features = ["serde"] }
|
||||
humantime.workspace = true
|
||||
hyper0 = { workspace = true, features = ["full"] }
|
||||
itertools.workspace = true
|
||||
fail.workspace = true
|
||||
futures = { workspace = true}
|
||||
futures = { workspace = true }
|
||||
jemalloc_pprof.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
nix.workspace = true
|
||||
|
||||
@@ -1,15 +1,22 @@
|
||||
use crate::auth::{AuthError, Claims, SwappableJwtAuth};
|
||||
use crate::http::error::{api_error_handler, route_error_handler, ApiError};
|
||||
use crate::http::request::{get_query_param, parse_query_param};
|
||||
use crate::pprof;
|
||||
use ::pprof::protos::Message as _;
|
||||
use ::pprof::ProfilerGuardBuilder;
|
||||
use anyhow::{anyhow, Context};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use hyper::header::{HeaderName, AUTHORIZATION, CONTENT_DISPOSITION};
|
||||
use hyper::http::HeaderValue;
|
||||
use hyper::Method;
|
||||
use hyper::{header::CONTENT_TYPE, Body, Request, Response};
|
||||
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
|
||||
use once_cell::sync::Lazy;
|
||||
use regex::Regex;
|
||||
use routerify::ext::RequestExt;
|
||||
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
|
||||
@@ -18,11 +25,6 @@ use std::io::Write as _;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use pprof::protos::Message as _;
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"libmetrics_metric_handler_requests_total",
|
||||
@@ -365,7 +367,7 @@ pub async fn profile_cpu_handler(req: Request<Body>) -> Result<Response<Body>, A
|
||||
|
||||
// Take the profile.
|
||||
let report = tokio::task::spawn_blocking(move || {
|
||||
let guard = pprof::ProfilerGuardBuilder::default()
|
||||
let guard = ProfilerGuardBuilder::default()
|
||||
.frequency(frequency_hz)
|
||||
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
|
||||
.build()?;
|
||||
@@ -457,10 +459,34 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
|
||||
}
|
||||
|
||||
Format::Pprof => {
|
||||
let data = tokio::task::spawn_blocking(move || prof_ctl.dump_pprof())
|
||||
.await
|
||||
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
let data = tokio::task::spawn_blocking(move || {
|
||||
let bytes = prof_ctl.dump_pprof()?;
|
||||
// Symbolize the profile.
|
||||
// TODO: consider moving this upstream to jemalloc_pprof and avoiding the
|
||||
// serialization roundtrip.
|
||||
static STRIP_FUNCTIONS: Lazy<Vec<(Regex, bool)>> = Lazy::new(|| {
|
||||
// Functions to strip from profiles. If true, also remove child frames.
|
||||
vec![
|
||||
(Regex::new("^__rust").unwrap(), false),
|
||||
(Regex::new("^_start$").unwrap(), false),
|
||||
(Regex::new("^irallocx_prof").unwrap(), true),
|
||||
(Regex::new("^prof_alloc_prep").unwrap(), true),
|
||||
(Regex::new("^std::rt::lang_start").unwrap(), false),
|
||||
(Regex::new("^std::sys::backtrace::__rust").unwrap(), false),
|
||||
]
|
||||
});
|
||||
let profile = pprof::decode(&bytes)?;
|
||||
let profile = pprof::symbolize(profile)?;
|
||||
let profile = pprof::strip_locations(
|
||||
profile,
|
||||
&["libc", "libgcc", "pthread", "vdso"],
|
||||
&STRIP_FUNCTIONS,
|
||||
);
|
||||
pprof::encode(&profile)
|
||||
})
|
||||
.await
|
||||
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, "application/octet-stream")
|
||||
|
||||
@@ -96,6 +96,8 @@ pub mod circuit_breaker;
|
||||
|
||||
pub mod try_rcu;
|
||||
|
||||
pub mod pprof;
|
||||
|
||||
// Re-export used in macro. Avoids adding git-version as dep in target crates.
|
||||
#[doc(hidden)]
|
||||
pub use git_version;
|
||||
|
||||
190
libs/utils/src/pprof.rs
Normal file
190
libs/utils/src/pprof.rs
Normal file
@@ -0,0 +1,190 @@
|
||||
use flate2::write::{GzDecoder, GzEncoder};
|
||||
use flate2::Compression;
|
||||
use itertools::Itertools as _;
|
||||
use once_cell::sync::Lazy;
|
||||
use pprof::protos::{Function, Line, Message as _, Profile};
|
||||
use regex::Regex;
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::ffi::c_void;
|
||||
use std::io::Write as _;
|
||||
|
||||
/// Decodes a gzip-compressed Protobuf-encoded pprof profile.
|
||||
pub fn decode(bytes: &[u8]) -> anyhow::Result<Profile> {
|
||||
let mut gz = GzDecoder::new(Vec::new());
|
||||
gz.write_all(bytes)?;
|
||||
Ok(Profile::parse_from_bytes(&gz.finish()?)?)
|
||||
}
|
||||
|
||||
/// Encodes a pprof profile as gzip-compressed Protobuf.
|
||||
pub fn encode(profile: &Profile) -> anyhow::Result<Vec<u8>> {
|
||||
let mut gz = GzEncoder::new(Vec::new(), Compression::default());
|
||||
profile.write_to_writer(&mut gz)?;
|
||||
Ok(gz.finish()?)
|
||||
}
|
||||
|
||||
/// Symbolizes a pprof profile using the current binary.
|
||||
pub fn symbolize(mut profile: Profile) -> anyhow::Result<Profile> {
|
||||
if !profile.function.is_empty() {
|
||||
return Ok(profile); // already symbolized
|
||||
}
|
||||
|
||||
// Collect function names.
|
||||
let mut functions: HashMap<String, Function> = HashMap::new();
|
||||
let mut strings: HashMap<String, i64> = profile
|
||||
.string_table
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, s)| (s, i as i64))
|
||||
.collect();
|
||||
|
||||
// Helper to look up or register a string.
|
||||
let mut string_id = |s: &str| -> i64 {
|
||||
// Don't use .entry() to avoid unnecessary allocations.
|
||||
if let Some(id) = strings.get(s) {
|
||||
return *id;
|
||||
}
|
||||
let id = strings.len() as i64;
|
||||
strings.insert(s.to_string(), id);
|
||||
id
|
||||
};
|
||||
|
||||
for loc in &mut profile.location {
|
||||
if !loc.line.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Resolve the line and function for each location.
|
||||
backtrace::resolve(loc.address as *mut c_void, |symbol| {
|
||||
let Some(symname) = symbol.name() else {
|
||||
return;
|
||||
};
|
||||
let mut name = symname.to_string();
|
||||
|
||||
// Strip the Rust monomorphization suffix from the symbol name.
|
||||
static SUFFIX_REGEX: Lazy<Regex> =
|
||||
Lazy::new(|| Regex::new("::h[0-9a-f]{16}$").expect("invalid regex"));
|
||||
if let Some(m) = SUFFIX_REGEX.find(&name) {
|
||||
name.truncate(m.start());
|
||||
}
|
||||
|
||||
let function_id = match functions.get(&name) {
|
||||
Some(function) => function.id,
|
||||
None => {
|
||||
let id = functions.len() as u64 + 1;
|
||||
let system_name = String::from_utf8_lossy(symname.as_bytes());
|
||||
let filename = symbol
|
||||
.filename()
|
||||
.map(|path| path.to_string_lossy())
|
||||
.unwrap_or(Cow::Borrowed(""));
|
||||
let function = Function {
|
||||
id,
|
||||
name: string_id(&name),
|
||||
system_name: string_id(&system_name),
|
||||
filename: string_id(&filename),
|
||||
..Default::default()
|
||||
};
|
||||
functions.insert(name, function);
|
||||
id
|
||||
}
|
||||
};
|
||||
loc.line.push(Line {
|
||||
function_id,
|
||||
line: symbol.lineno().unwrap_or(0) as i64,
|
||||
..Default::default()
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Store the resolved functions, and mark the mapping as resolved.
|
||||
profile.function = functions.into_values().sorted_by_key(|f| f.id).collect();
|
||||
profile.string_table = strings
|
||||
.into_iter()
|
||||
.sorted_by_key(|(_, i)| *i)
|
||||
.map(|(s, _)| s)
|
||||
.collect();
|
||||
|
||||
for mapping in &mut profile.mapping {
|
||||
mapping.has_functions = true;
|
||||
mapping.has_filenames = true;
|
||||
}
|
||||
|
||||
Ok(profile)
|
||||
}
|
||||
|
||||
/// Strips locations (stack frames) matching the given mappings (substring) or function names
|
||||
/// (regex). The function bool specifies whether child frames should be stripped as well.
|
||||
///
|
||||
/// The string definitions are left behind in the profile for simplicity, to avoid rewriting all
|
||||
/// string references.
|
||||
pub fn strip_locations(
|
||||
mut profile: Profile,
|
||||
mappings: &[&str],
|
||||
functions: &[(Regex, bool)],
|
||||
) -> Profile {
|
||||
// Strip mappings.
|
||||
let mut strip_mappings: HashSet<u64> = HashSet::new();
|
||||
|
||||
profile.mapping.retain(|mapping| {
|
||||
let Some(name) = profile.string_table.get(mapping.filename as usize) else {
|
||||
return true;
|
||||
};
|
||||
if mappings.iter().any(|substr| name.contains(substr)) {
|
||||
strip_mappings.insert(mapping.id);
|
||||
return false;
|
||||
}
|
||||
true
|
||||
});
|
||||
|
||||
// Strip functions.
|
||||
let mut strip_functions: HashMap<u64, bool> = HashMap::new();
|
||||
|
||||
profile.function.retain(|function| {
|
||||
let Some(name) = profile.string_table.get(function.name as usize) else {
|
||||
return true;
|
||||
};
|
||||
for (regex, strip_children) in functions {
|
||||
if regex.is_match(name) {
|
||||
strip_functions.insert(function.id, *strip_children);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
});
|
||||
|
||||
// Strip locations. The bool specifies whether child frames should be stripped too.
|
||||
let mut strip_locations: HashMap<u64, bool> = HashMap::new();
|
||||
|
||||
profile.location.retain(|location| {
|
||||
for line in &location.line {
|
||||
if let Some(strip_children) = strip_functions.get(&line.function_id) {
|
||||
strip_locations.insert(location.id, *strip_children);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if strip_mappings.contains(&location.mapping_id) {
|
||||
strip_locations.insert(location.id, false);
|
||||
return false;
|
||||
}
|
||||
true
|
||||
});
|
||||
|
||||
// Strip sample locations.
|
||||
for sample in &mut profile.sample {
|
||||
// First, find the uppermost function with child removal and truncate the stack.
|
||||
if let Some(truncate) = sample
|
||||
.location_id
|
||||
.iter()
|
||||
.rposition(|id| strip_locations.get(id) == Some(&true))
|
||||
{
|
||||
sample.location_id.drain(..=truncate);
|
||||
}
|
||||
// Next, strip any individual frames without child removal.
|
||||
sample
|
||||
.location_id
|
||||
.retain(|id| !strip_locations.contains_key(id));
|
||||
}
|
||||
|
||||
profile
|
||||
}
|
||||
@@ -272,7 +272,7 @@ struct CompactionJob<E: CompactionJobExecutor> {
|
||||
completed: bool,
|
||||
}
|
||||
|
||||
impl<'a, E> LevelCompactionState<'a, E>
|
||||
impl<E> LevelCompactionState<'_, E>
|
||||
where
|
||||
E: CompactionJobExecutor,
|
||||
{
|
||||
|
||||
@@ -224,9 +224,8 @@ impl<L> Level<L> {
|
||||
}
|
||||
|
||||
// recalculate depth if this was the last event at this point
|
||||
let more_events_at_this_key = events_iter
|
||||
.peek()
|
||||
.map_or(false, |next_e| next_e.key == e.key);
|
||||
let more_events_at_this_key =
|
||||
events_iter.peek().is_some_and(|next_e| next_e.key == e.key);
|
||||
if !more_events_at_this_key {
|
||||
let mut active_depth = 0;
|
||||
for (_end_lsn, is_image, _idx) in active_set.iter().rev() {
|
||||
|
||||
@@ -148,7 +148,7 @@ pub trait CompactionDeltaLayer<E: CompactionJobExecutor + ?Sized>: CompactionLay
|
||||
Self: 'a;
|
||||
|
||||
/// Return all keys in this delta layer.
|
||||
fn load_keys<'a>(
|
||||
fn load_keys(
|
||||
&self,
|
||||
ctx: &E::RequestContext,
|
||||
) -> impl Future<Output = anyhow::Result<Vec<Self::DeltaEntry<'_>>>> + Send;
|
||||
|
||||
@@ -143,7 +143,7 @@ impl interface::CompactionLayer<Key> for Arc<MockDeltaLayer> {
|
||||
impl interface::CompactionDeltaLayer<MockTimeline> for Arc<MockDeltaLayer> {
|
||||
type DeltaEntry<'a> = MockRecord;
|
||||
|
||||
async fn load_keys<'a>(&self, _ctx: &MockRequestContext) -> anyhow::Result<Vec<MockRecord>> {
|
||||
async fn load_keys(&self, _ctx: &MockRequestContext) -> anyhow::Result<Vec<MockRecord>> {
|
||||
Ok(self.records.clone())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -248,7 +248,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, W> Basebackup<'a, W>
|
||||
impl<W> Basebackup<'_, W>
|
||||
where
|
||||
W: AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
|
||||
@@ -1242,7 +1242,7 @@ pub struct DatadirModification<'a> {
|
||||
pending_metadata_bytes: usize,
|
||||
}
|
||||
|
||||
impl<'a> DatadirModification<'a> {
|
||||
impl DatadirModification<'_> {
|
||||
// When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
|
||||
// contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
|
||||
// additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
|
||||
@@ -1263,7 +1263,7 @@ impl<'a> DatadirModification<'a> {
|
||||
pub(crate) fn has_dirty_data(&self) -> bool {
|
||||
self.pending_data_batch
|
||||
.as_ref()
|
||||
.map_or(false, |b| b.has_data())
|
||||
.is_some_and(|b| b.has_data())
|
||||
}
|
||||
|
||||
/// Set the current lsn
|
||||
@@ -2230,7 +2230,7 @@ impl<'a> DatadirModification<'a> {
|
||||
assert!(!self
|
||||
.pending_data_batch
|
||||
.as_ref()
|
||||
.map_or(false, |b| b.updates_key(&key)));
|
||||
.is_some_and(|b| b.updates_key(&key)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2299,7 +2299,7 @@ pub enum Version<'a> {
|
||||
Modified(&'a DatadirModification<'a>),
|
||||
}
|
||||
|
||||
impl<'a> Version<'a> {
|
||||
impl Version<'_> {
|
||||
async fn get(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
|
||||
@@ -35,7 +35,7 @@ pub struct CompressionInfo {
|
||||
pub compressed_size: Option<usize>,
|
||||
}
|
||||
|
||||
impl<'a> BlockCursor<'a> {
|
||||
impl BlockCursor<'_> {
|
||||
/// Read a blob into a new buffer.
|
||||
pub async fn read_blob(
|
||||
&self,
|
||||
|
||||
@@ -89,7 +89,7 @@ pub(crate) enum BlockReaderRef<'a> {
|
||||
VirtualFile(&'a VirtualFile),
|
||||
}
|
||||
|
||||
impl<'a> BlockReaderRef<'a> {
|
||||
impl BlockReaderRef<'_> {
|
||||
#[inline(always)]
|
||||
async fn read_blk(
|
||||
&self,
|
||||
|
||||
@@ -532,7 +532,7 @@ pub struct DiskBtreeIterator<'a> {
|
||||
>,
|
||||
}
|
||||
|
||||
impl<'a> DiskBtreeIterator<'a> {
|
||||
impl DiskBtreeIterator<'_> {
|
||||
pub async fn next(&mut self) -> Option<std::result::Result<(Vec<u8>, u64), DiskBtreeError>> {
|
||||
self.stream.next().await
|
||||
}
|
||||
|
||||
@@ -174,11 +174,11 @@ impl EphemeralFile {
|
||||
}
|
||||
|
||||
impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
|
||||
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>(
|
||||
&'b self,
|
||||
async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
|
||||
&self,
|
||||
start: u64,
|
||||
dst: tokio_epoll_uring::Slice<B>,
|
||||
ctx: &'a RequestContext,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
|
||||
let submitted_offset = self.buffered_writer.bytes_submitted();
|
||||
|
||||
|
||||
@@ -392,8 +392,8 @@ impl LayerMap {
|
||||
image_layer: Option<Arc<PersistentLayerDesc>>,
|
||||
end_lsn: Lsn,
|
||||
) -> Option<SearchResult> {
|
||||
assert!(delta_layer.as_ref().map_or(true, |l| l.is_delta()));
|
||||
assert!(image_layer.as_ref().map_or(true, |l| !l.is_delta()));
|
||||
assert!(delta_layer.as_ref().is_none_or(|l| l.is_delta()));
|
||||
assert!(image_layer.as_ref().is_none_or(|l| !l.is_delta()));
|
||||
|
||||
match (delta_layer, image_layer) {
|
||||
(None, None) => None,
|
||||
|
||||
@@ -749,7 +749,7 @@ impl RemoteTimelineClient {
|
||||
// ahead of what's _actually_ on the remote during index upload.
|
||||
upload_queue.dirty.metadata = metadata.clone();
|
||||
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
self.schedule_index_upload(upload_queue);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -770,7 +770,7 @@ impl RemoteTimelineClient {
|
||||
|
||||
upload_queue.dirty.metadata.apply(update);
|
||||
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
self.schedule_index_upload(upload_queue);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -809,7 +809,7 @@ impl RemoteTimelineClient {
|
||||
if let Some(archived_at_set) = need_upload_scheduled {
|
||||
let intended_archived_at = archived_at_set.then(|| Utc::now().naive_utc());
|
||||
upload_queue.dirty.archived_at = intended_archived_at;
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
self.schedule_index_upload(upload_queue);
|
||||
}
|
||||
|
||||
let need_wait = need_change(&upload_queue.clean.0.archived_at, state).is_some();
|
||||
@@ -824,7 +824,7 @@ impl RemoteTimelineClient {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
upload_queue.dirty.import_pgdata = state;
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
self.schedule_index_upload(upload_queue);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -843,17 +843,14 @@ impl RemoteTimelineClient {
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
self.schedule_index_upload(upload_queue);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Launch an index-file upload operation in the background (internal function)
|
||||
fn schedule_index_upload(
|
||||
self: &Arc<Self>,
|
||||
upload_queue: &mut UploadQueueInitialized,
|
||||
) -> Result<(), NotInitialized> {
|
||||
fn schedule_index_upload(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
|
||||
let disk_consistent_lsn = upload_queue.dirty.metadata.disk_consistent_lsn();
|
||||
// fix up the duplicated field
|
||||
upload_queue.dirty.disk_consistent_lsn = disk_consistent_lsn;
|
||||
@@ -880,7 +877,6 @@ impl RemoteTimelineClient {
|
||||
|
||||
// Launch the task immediately, if possible
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reparent this timeline to a new parent.
|
||||
@@ -909,7 +905,7 @@ impl RemoteTimelineClient {
|
||||
upload_queue.dirty.metadata.reparent(new_parent);
|
||||
upload_queue.dirty.lineage.record_previous_ancestor(&prev);
|
||||
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
self.schedule_index_upload(upload_queue);
|
||||
|
||||
Some(self.schedule_barrier0(upload_queue))
|
||||
}
|
||||
@@ -948,7 +944,7 @@ impl RemoteTimelineClient {
|
||||
assert!(prev.is_none(), "copied layer existed already {layer}");
|
||||
}
|
||||
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
self.schedule_index_upload(upload_queue);
|
||||
|
||||
Some(self.schedule_barrier0(upload_queue))
|
||||
}
|
||||
@@ -1004,7 +1000,7 @@ impl RemoteTimelineClient {
|
||||
upload_queue.dirty.gc_blocking = current
|
||||
.map(|x| x.with_reason(reason))
|
||||
.or_else(|| Some(index::GcBlocking::started_now_for(reason)));
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
self.schedule_index_upload(upload_queue);
|
||||
Some(self.schedule_barrier0(upload_queue))
|
||||
}
|
||||
}
|
||||
@@ -1057,8 +1053,7 @@ impl RemoteTimelineClient {
|
||||
upload_queue.dirty.gc_blocking =
|
||||
current.as_ref().and_then(|x| x.without_reason(reason));
|
||||
assert!(wanted(upload_queue.dirty.gc_blocking.as_ref()));
|
||||
// FIXME: bogus ?
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
self.schedule_index_upload(upload_queue);
|
||||
Some(self.schedule_barrier0(upload_queue))
|
||||
}
|
||||
}
|
||||
@@ -1125,8 +1120,8 @@ impl RemoteTimelineClient {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
let with_metadata = self
|
||||
.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned())?;
|
||||
let with_metadata =
|
||||
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned());
|
||||
|
||||
self.schedule_deletion_of_unlinked0(upload_queue, with_metadata);
|
||||
|
||||
@@ -1153,7 +1148,7 @@ impl RemoteTimelineClient {
|
||||
|
||||
let names = gc_layers.iter().map(|x| x.layer_desc().layer_name());
|
||||
|
||||
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names)?;
|
||||
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
|
||||
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
|
||||
@@ -1166,7 +1161,7 @@ impl RemoteTimelineClient {
|
||||
self: &Arc<Self>,
|
||||
upload_queue: &mut UploadQueueInitialized,
|
||||
names: I,
|
||||
) -> Result<Vec<(LayerName, LayerFileMetadata)>, NotInitialized>
|
||||
) -> Vec<(LayerName, LayerFileMetadata)>
|
||||
where
|
||||
I: IntoIterator<Item = LayerName>,
|
||||
{
|
||||
@@ -1208,10 +1203,10 @@ impl RemoteTimelineClient {
|
||||
// index_part update, because that needs to be uploaded before we can actually delete the
|
||||
// files.
|
||||
if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
self.schedule_index_upload(upload_queue);
|
||||
}
|
||||
|
||||
Ok(with_metadata)
|
||||
with_metadata
|
||||
}
|
||||
|
||||
/// Schedules deletion for layer files which have previously been unlinked from the
|
||||
@@ -1302,7 +1297,7 @@ impl RemoteTimelineClient {
|
||||
|
||||
let names = compacted_from.iter().map(|x| x.layer_desc().layer_name());
|
||||
|
||||
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names)?;
|
||||
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -145,8 +145,8 @@ pub async fn download_layer_file<'a>(
|
||||
///
|
||||
/// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
|
||||
/// The unlinking has _not_ been made durable.
|
||||
async fn download_object<'a>(
|
||||
storage: &'a GenericRemoteStorage,
|
||||
async fn download_object(
|
||||
storage: &GenericRemoteStorage,
|
||||
src_path: &RemotePath,
|
||||
dst_path: &Utf8PathBuf,
|
||||
#[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate,
|
||||
|
||||
@@ -25,8 +25,8 @@ use utils::id::{TenantId, TimelineId};
|
||||
use tracing::info;
|
||||
|
||||
/// Serializes and uploads the given index part data to the remote storage.
|
||||
pub(crate) async fn upload_index_part<'a>(
|
||||
storage: &'a GenericRemoteStorage,
|
||||
pub(crate) async fn upload_index_part(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
generation: Generation,
|
||||
|
||||
@@ -345,10 +345,7 @@ impl LayerFringe {
|
||||
}
|
||||
|
||||
pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
|
||||
let read_desc = match self.planned_visits_by_lsn.pop() {
|
||||
Some(desc) => desc,
|
||||
None => return None,
|
||||
};
|
||||
let read_desc = self.planned_visits_by_lsn.pop()?;
|
||||
|
||||
let removed = self.visit_reads.remove_entry(&read_desc.layer_to_visit_id);
|
||||
|
||||
|
||||
@@ -1486,7 +1486,7 @@ pub struct ValueRef<'a> {
|
||||
layer: &'a DeltaLayerInner,
|
||||
}
|
||||
|
||||
impl<'a> ValueRef<'a> {
|
||||
impl ValueRef<'_> {
|
||||
/// Loads the value from disk
|
||||
pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
|
||||
let buf = self.load_raw(ctx).await?;
|
||||
@@ -1543,7 +1543,7 @@ pub struct DeltaLayerIterator<'a> {
|
||||
is_end: bool,
|
||||
}
|
||||
|
||||
impl<'a> DeltaLayerIterator<'a> {
|
||||
impl DeltaLayerIterator<'_> {
|
||||
pub(crate) fn layer_dbg_info(&self) -> String {
|
||||
self.delta_layer.layer_dbg_info()
|
||||
}
|
||||
|
||||
@@ -1052,7 +1052,7 @@ pub struct ImageLayerIterator<'a> {
|
||||
is_end: bool,
|
||||
}
|
||||
|
||||
impl<'a> ImageLayerIterator<'a> {
|
||||
impl ImageLayerIterator<'_> {
|
||||
pub(crate) fn layer_dbg_info(&self) -> String {
|
||||
self.image_layer.layer_dbg_info()
|
||||
}
|
||||
|
||||
@@ -25,11 +25,11 @@ pub trait File: Send {
|
||||
/// [`std::io::ErrorKind::UnexpectedEof`] error if the file is shorter than `start+dst.len()`.
|
||||
///
|
||||
/// No guarantees are made about the remaining bytes in `dst` in case of a short read.
|
||||
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>(
|
||||
&'b self,
|
||||
async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
|
||||
&self,
|
||||
start: u64,
|
||||
dst: Slice<B>,
|
||||
ctx: &'a RequestContext,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(Slice<B>, usize)>;
|
||||
}
|
||||
|
||||
@@ -479,11 +479,11 @@ mod tests {
|
||||
}
|
||||
|
||||
impl File for InMemoryFile {
|
||||
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>(
|
||||
&'b self,
|
||||
async fn read_exact_at_eof_ok<B: IoBufMut + Send>(
|
||||
&self,
|
||||
start: u64,
|
||||
mut dst: Slice<B>,
|
||||
_ctx: &'a RequestContext,
|
||||
_ctx: &RequestContext,
|
||||
) -> std::io::Result<(Slice<B>, usize)> {
|
||||
let dst_slice: &mut [u8] = dst.as_mut_rust_slice_full_zeroed();
|
||||
let nread = {
|
||||
@@ -609,12 +609,12 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'x> File for RecorderFile<'x> {
|
||||
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>(
|
||||
&'b self,
|
||||
impl File for RecorderFile<'_> {
|
||||
async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
|
||||
&self,
|
||||
start: u64,
|
||||
dst: Slice<B>,
|
||||
ctx: &'a RequestContext,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(Slice<B>, usize)> {
|
||||
let (dst, nread) = self.file.read_exact_at_eof_ok(start, dst, ctx).await?;
|
||||
self.recorded.borrow_mut().push(RecordedRead {
|
||||
@@ -740,11 +740,11 @@ mod tests {
|
||||
}
|
||||
|
||||
impl File for MockFile {
|
||||
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>(
|
||||
&'b self,
|
||||
async fn read_exact_at_eof_ok<B: IoBufMut + Send>(
|
||||
&self,
|
||||
start: u64,
|
||||
mut dst: Slice<B>,
|
||||
_ctx: &'a RequestContext,
|
||||
_ctx: &RequestContext,
|
||||
) -> std::io::Result<(Slice<B>, usize)> {
|
||||
let ExpectedRead {
|
||||
expect_pos,
|
||||
|
||||
@@ -5842,7 +5842,7 @@ enum OpenLayerAction {
|
||||
None,
|
||||
}
|
||||
|
||||
impl<'a> TimelineWriter<'a> {
|
||||
impl TimelineWriter<'_> {
|
||||
async fn handle_open_layer_action(
|
||||
&mut self,
|
||||
at: Lsn,
|
||||
|
||||
@@ -1110,7 +1110,7 @@ impl Timeline {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
|
||||
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
|
||||
let same_key = prev_key == Some(key);
|
||||
// We need to check key boundaries once we reach next key or end of layer with the same key
|
||||
if !same_key || lsn == dup_end_lsn {
|
||||
let mut next_key_size = 0u64;
|
||||
@@ -2904,7 +2904,7 @@ impl CompactionLayer<Key> for ResidentDeltaLayer {
|
||||
impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
|
||||
type DeltaEntry<'a> = DeltaEntry<'a>;
|
||||
|
||||
async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
|
||||
async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
|
||||
self.0.get_as_delta(ctx).await?.index_entries(ctx).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -187,7 +187,6 @@ async fn authenticate(
|
||||
NodeInfo {
|
||||
config,
|
||||
aux: db_info.aux,
|
||||
allow_self_signed_compute: false, // caller may override
|
||||
},
|
||||
db_info.allowed_ips,
|
||||
))
|
||||
|
||||
@@ -776,6 +776,7 @@ impl From<&jose_jwk::Key> for KeyType {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use std::future::IntoFuture;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
@@ -37,7 +37,6 @@ impl LocalBackend {
|
||||
branch_id: BranchIdTag::get_interner().get_or_intern("local"),
|
||||
cold_start_info: ColdStartInfo::WarmCached,
|
||||
},
|
||||
allow_self_signed_compute: false,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -463,6 +463,8 @@ impl ComputeConnectBackend for Backend<'_, ComputeCredentials> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#![allow(clippy::unimplemented, clippy::unwrap_used)]
|
||||
|
||||
use std::net::IpAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -250,6 +250,7 @@ fn project_name_valid(name: &str) -> bool {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use serde_json::json;
|
||||
use ComputeUserInfoParseError::*;
|
||||
|
||||
@@ -229,7 +229,7 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
|
||||
let (raw, read_buf) = stream.into_inner();
|
||||
// TODO: Normally, client doesn't send any data before
|
||||
// server says TLS handshake is ok and read_buf is empy.
|
||||
// server says TLS handshake is ok and read_buf is empty.
|
||||
// However, you could imagine pipelining of postgres
|
||||
// SSLRequest + TLS ClientHello in one hunk similar to
|
||||
// pipelining in our node js driver. We should probably
|
||||
|
||||
@@ -105,6 +105,9 @@ struct ProxyCliArgs {
|
||||
/// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
|
||||
#[clap(short = 'c', long, alias = "ssl-cert")]
|
||||
tls_cert: Option<String>,
|
||||
/// Allow writing TLS session keys to the given file pointed to by the environment variable `SSLKEYLOGFILE`.
|
||||
#[clap(long, alias = "allow-ssl-keylogfile")]
|
||||
allow_tls_keylogfile: bool,
|
||||
/// path to directory with TLS certificates for client postgres connections
|
||||
#[clap(long)]
|
||||
certs_dir: Option<String>,
|
||||
@@ -555,6 +558,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
key_path,
|
||||
cert_path,
|
||||
args.certs_dir.as_ref(),
|
||||
args.allow_tls_keylogfile,
|
||||
)?),
|
||||
(None, None) => None,
|
||||
_ => bail!("either both or neither tls-key and tls-cert must be specified"),
|
||||
|
||||
4
proxy/src/cache/endpoints.rs
vendored
4
proxy/src/cache/endpoints.rs
vendored
@@ -12,6 +12,7 @@ use tracing::info;
|
||||
|
||||
use crate::config::EndpointCacheConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::ext::LockExt;
|
||||
use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
|
||||
use crate::metrics::{Metrics, RedisErrors, RedisEventsCount};
|
||||
use crate::rate_limiter::GlobalRateLimiter;
|
||||
@@ -96,7 +97,7 @@ impl EndpointsCache {
|
||||
|
||||
// If the limiter allows, we can pretend like it's valid
|
||||
// (incase it is, due to redis channel lag).
|
||||
if self.limiter.lock().unwrap().check() {
|
||||
if self.limiter.lock_propagate_poison().check() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -258,6 +259,7 @@ impl EndpointsCache {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
|
||||
1
proxy/src/cache/project_info.rs
vendored
1
proxy/src/cache/project_info.rs
vendored
@@ -365,6 +365,7 @@ impl Cache for ProjectInfoCacheImpl {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::scram::ServerSecret;
|
||||
|
||||
@@ -3,8 +3,10 @@ use std::sync::Arc;
|
||||
|
||||
use dashmap::DashMap;
|
||||
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
|
||||
use postgres_client::{CancelToken, NoTls};
|
||||
use once_cell::sync::OnceCell;
|
||||
use postgres_client::{tls::MakeTlsConnect, CancelToken};
|
||||
use pq_proto::CancelKeyData;
|
||||
use rustls::crypto::ring;
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::Mutex;
|
||||
@@ -13,12 +15,16 @@ use uuid::Uuid;
|
||||
|
||||
use crate::auth::{check_peer_addr_is_in_list, IpPattern};
|
||||
use crate::error::ReportableError;
|
||||
use crate::ext::LockExt;
|
||||
use crate::metrics::{CancellationRequest, CancellationSource, Metrics};
|
||||
use crate::rate_limiter::LeakyBucketRateLimiter;
|
||||
use crate::redis::cancellation_publisher::{
|
||||
CancellationPublisher, CancellationPublisherMut, RedisPublisherClient,
|
||||
};
|
||||
|
||||
use crate::compute::{load_certs, AcceptEverythingVerifier};
|
||||
use crate::postgres_rustls::MakeRustlsConnect;
|
||||
|
||||
pub type CancelMap = Arc<DashMap<CancelKeyData, Option<CancelClosure>>>;
|
||||
pub type CancellationHandlerMain = CancellationHandler<Option<Arc<Mutex<RedisPublisherClient>>>>;
|
||||
pub(crate) type CancellationHandlerMainInternal = Option<Arc<Mutex<RedisPublisherClient>>>;
|
||||
@@ -114,7 +120,7 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
IpAddr::V4(ip) => IpNet::V4(Ipv4Net::new_assert(ip, 24).trunc()), // use defaut mask here
|
||||
IpAddr::V6(ip) => IpNet::V6(Ipv6Net::new_assert(ip, 64).trunc()),
|
||||
};
|
||||
if !self.limiter.lock().unwrap().check(subnet_key, 1) {
|
||||
if !self.limiter.lock_propagate_poison().check(subnet_key, 1) {
|
||||
// log only the subnet part of the IP address to know which subnet is rate limited
|
||||
tracing::warn!("Rate limit exceeded. Skipping cancellation message, {subnet_key}");
|
||||
Metrics::get()
|
||||
@@ -173,7 +179,10 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
source: self.from,
|
||||
kind: crate::metrics::CancellationOutcome::Found,
|
||||
});
|
||||
info!("cancelling query per user's request using key {key}");
|
||||
info!(
|
||||
"cancelling query per user's request using key {key}, hostname {}, address: {}",
|
||||
cancel_closure.hostname, cancel_closure.socket_addr
|
||||
);
|
||||
cancel_closure.try_cancel_query().await
|
||||
}
|
||||
|
||||
@@ -220,6 +229,8 @@ impl<P: CancellationPublisherMut> CancellationHandler<Option<Arc<Mutex<P>>>> {
|
||||
}
|
||||
}
|
||||
|
||||
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
|
||||
|
||||
/// This should've been a [`std::future::Future`], but
|
||||
/// it's impossible to name a type of an unboxed future
|
||||
/// (we'd need something like `#![feature(type_alias_impl_trait)]`).
|
||||
@@ -228,6 +239,8 @@ pub struct CancelClosure {
|
||||
socket_addr: SocketAddr,
|
||||
cancel_token: CancelToken,
|
||||
ip_allowlist: Vec<IpPattern>,
|
||||
hostname: String, // for pg_sni router
|
||||
allow_self_signed_compute: bool,
|
||||
}
|
||||
|
||||
impl CancelClosure {
|
||||
@@ -235,17 +248,60 @@ impl CancelClosure {
|
||||
socket_addr: SocketAddr,
|
||||
cancel_token: CancelToken,
|
||||
ip_allowlist: Vec<IpPattern>,
|
||||
hostname: String,
|
||||
allow_self_signed_compute: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
socket_addr,
|
||||
cancel_token,
|
||||
ip_allowlist,
|
||||
hostname,
|
||||
allow_self_signed_compute,
|
||||
}
|
||||
}
|
||||
/// Cancels the query running on user's compute node.
|
||||
pub(crate) async fn try_cancel_query(self) -> Result<(), CancelError> {
|
||||
let socket = TcpStream::connect(self.socket_addr).await?;
|
||||
self.cancel_token.cancel_query_raw(socket, NoTls).await?;
|
||||
|
||||
let client_config = if self.allow_self_signed_compute {
|
||||
// Allow all certificates for creating the connection. Used only for tests
|
||||
let verifier = Arc::new(AcceptEverythingVerifier);
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
.with_safe_default_protocol_versions()
|
||||
.expect("ring should support the default protocol versions")
|
||||
.dangerous()
|
||||
.with_custom_certificate_verifier(verifier)
|
||||
} else {
|
||||
let root_store = TLS_ROOTS
|
||||
.get_or_try_init(load_certs)
|
||||
.map_err(|_e| {
|
||||
CancelError::IO(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
"TLS root store initialization failed".to_string(),
|
||||
))
|
||||
})?
|
||||
.clone();
|
||||
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
|
||||
.with_safe_default_protocol_versions()
|
||||
.expect("ring should support the default protocol versions")
|
||||
.with_root_certificates(root_store)
|
||||
};
|
||||
|
||||
let client_config = client_config.with_no_client_auth();
|
||||
|
||||
let mut mk_tls = crate::postgres_rustls::MakeRustlsConnect::new(client_config);
|
||||
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
|
||||
&mut mk_tls,
|
||||
&self.hostname,
|
||||
)
|
||||
.map_err(|e| {
|
||||
CancelError::IO(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
e.to_string(),
|
||||
))
|
||||
})?;
|
||||
|
||||
self.cancel_token.cancel_query_raw(socket, tls).await?;
|
||||
debug!("query was cancelled");
|
||||
Ok(())
|
||||
}
|
||||
@@ -283,6 +339,7 @@ impl<P> Drop for Session<P> {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
|
||||
@@ -319,6 +319,8 @@ impl ConnCfg {
|
||||
secret_key,
|
||||
},
|
||||
vec![],
|
||||
host.to_string(),
|
||||
allow_self_signed_compute,
|
||||
);
|
||||
|
||||
let connection = PostgresConnection {
|
||||
@@ -350,7 +352,7 @@ fn filtered_options(options: &str) -> Option<String> {
|
||||
Some(options)
|
||||
}
|
||||
|
||||
fn load_certs() -> Result<Arc<rustls::RootCertStore>, Vec<rustls_native_certs::Error>> {
|
||||
pub(crate) fn load_certs() -> Result<Arc<rustls::RootCertStore>, Vec<rustls_native_certs::Error>> {
|
||||
let der_certs = rustls_native_certs::load_native_certs();
|
||||
|
||||
if !der_certs.errors.is_empty() {
|
||||
@@ -364,7 +366,7 @@ fn load_certs() -> Result<Arc<rustls::RootCertStore>, Vec<rustls_native_certs::E
|
||||
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AcceptEverythingVerifier;
|
||||
pub(crate) struct AcceptEverythingVerifier;
|
||||
impl ServerCertVerifier for AcceptEverythingVerifier {
|
||||
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
|
||||
use rustls::SignatureScheme;
|
||||
|
||||
@@ -95,6 +95,7 @@ pub fn configure_tls(
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
certs_dir: Option<&String>,
|
||||
allow_tls_keylogfile: bool,
|
||||
) -> anyhow::Result<TlsConfig> {
|
||||
let mut cert_resolver = CertResolver::new();
|
||||
|
||||
@@ -135,6 +136,11 @@ pub fn configure_tls(
|
||||
|
||||
config.alpn_protocols = vec![PG_ALPN_PROTOCOL.to_vec()];
|
||||
|
||||
if allow_tls_keylogfile {
|
||||
// KeyLogFile will check for the SSLKEYLOGFILE environment variable.
|
||||
config.key_log = Arc::new(rustls::KeyLogFile::new());
|
||||
}
|
||||
|
||||
Ok(TlsConfig {
|
||||
config: Arc::new(config),
|
||||
common_names,
|
||||
@@ -221,15 +227,10 @@ impl CertResolver {
|
||||
) -> anyhow::Result<()> {
|
||||
let priv_key = {
|
||||
let key_bytes = std::fs::read(key_path)
|
||||
.context(format!("Failed to read TLS keys at '{key_path}'"))?;
|
||||
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..]).collect_vec();
|
||||
|
||||
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
|
||||
PrivateKeyDer::Pkcs8(
|
||||
keys.pop()
|
||||
.unwrap()
|
||||
.context(format!("Failed to parse TLS keys at '{key_path}'"))?,
|
||||
)
|
||||
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
|
||||
rustls_pemfile::private_key(&mut &key_bytes[..])
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
};
|
||||
|
||||
let cert_chain_bytes = std::fs::read(cert_path)
|
||||
|
||||
@@ -213,9 +213,9 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
params_compat: true,
|
||||
params: ¶ms,
|
||||
locks: &config.connect_compute_locks,
|
||||
allow_self_signed_compute: config.allow_self_signed_compute,
|
||||
},
|
||||
&user_info,
|
||||
config.allow_self_signed_compute,
|
||||
config.wake_compute_retry_config,
|
||||
config.connect_to_compute_retry_config,
|
||||
)
|
||||
|
||||
@@ -23,6 +23,7 @@ use utils::backoff;
|
||||
use super::{RequestContextInner, LOG_CHAN};
|
||||
use crate::config::remote_storage_from_toml;
|
||||
use crate::context::LOG_CHAN_DISCONNECT;
|
||||
use crate::ext::TaskExt;
|
||||
|
||||
#[derive(clap::Args, Clone, Debug)]
|
||||
pub struct ParquetUploadArgs {
|
||||
@@ -171,7 +172,9 @@ pub async fn worker(
|
||||
};
|
||||
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
LOG_CHAN.set(tx.downgrade()).unwrap();
|
||||
LOG_CHAN
|
||||
.set(tx.downgrade())
|
||||
.expect("only one worker should set the channel");
|
||||
|
||||
// setup row stream that will close on cancellation
|
||||
let cancellation_token2 = cancellation_token.clone();
|
||||
@@ -207,7 +210,9 @@ pub async fn worker(
|
||||
config.parquet_upload_disconnect_events_remote_storage
|
||||
{
|
||||
let (tx_disconnect, mut rx_disconnect) = mpsc::unbounded_channel();
|
||||
LOG_CHAN_DISCONNECT.set(tx_disconnect.downgrade()).unwrap();
|
||||
LOG_CHAN_DISCONNECT
|
||||
.set(tx_disconnect.downgrade())
|
||||
.expect("only one worker should set the channel");
|
||||
|
||||
// setup row stream that will close on cancellation
|
||||
tokio::spawn(async move {
|
||||
@@ -326,7 +331,7 @@ where
|
||||
Ok::<_, parquet::errors::ParquetError>((rows, w, rg_meta))
|
||||
})
|
||||
.await
|
||||
.unwrap()?;
|
||||
.propagate_task_panic()?;
|
||||
|
||||
rows.clear();
|
||||
Ok((rows, w, rg_meta))
|
||||
@@ -352,7 +357,7 @@ async fn upload_parquet(
|
||||
Ok((buffer, metadata))
|
||||
})
|
||||
.await
|
||||
.unwrap()?;
|
||||
.propagate_task_panic()?;
|
||||
|
||||
let data = buffer.split().freeze();
|
||||
|
||||
@@ -409,6 +414,7 @@ async fn upload_parquet(
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use std::net::Ipv4Addr;
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
@@ -250,7 +250,6 @@ impl NeonControlPlaneClient {
|
||||
let node = NodeInfo {
|
||||
config,
|
||||
aux: body.aux,
|
||||
allow_self_signed_compute: false,
|
||||
};
|
||||
|
||||
Ok(node)
|
||||
|
||||
@@ -102,7 +102,9 @@ impl MockControlPlane {
|
||||
Some(s) => {
|
||||
info!("got allowed_ips: {s}");
|
||||
s.split(',')
|
||||
.map(|s| IpPattern::from_str(s).unwrap())
|
||||
.map(|s| {
|
||||
IpPattern::from_str(s).expect("mocked ip pattern should be correct")
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
None => vec![],
|
||||
@@ -174,7 +176,6 @@ impl MockControlPlane {
|
||||
branch_id: (&BranchId::from("branch")).into(),
|
||||
cold_start_info: crate::control_plane::messages::ColdStartInfo::Warm,
|
||||
},
|
||||
allow_self_signed_compute: false,
|
||||
};
|
||||
|
||||
Ok(node)
|
||||
|
||||
@@ -67,28 +67,21 @@ pub(crate) struct NodeInfo {
|
||||
|
||||
/// Labels for proxy's metrics.
|
||||
pub(crate) aux: MetricsAuxInfo,
|
||||
|
||||
/// Whether we should accept self-signed certificates (for testing)
|
||||
pub(crate) allow_self_signed_compute: bool,
|
||||
}
|
||||
|
||||
impl NodeInfo {
|
||||
pub(crate) async fn connect(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
allow_self_signed_compute: bool,
|
||||
timeout: Duration,
|
||||
) -> Result<compute::PostgresConnection, compute::ConnectionError> {
|
||||
self.config
|
||||
.connect(
|
||||
ctx,
|
||||
self.allow_self_signed_compute,
|
||||
self.aux.clone(),
|
||||
timeout,
|
||||
)
|
||||
.connect(ctx, allow_self_signed_compute, self.aux.clone(), timeout)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) fn reuse_settings(&mut self, other: Self) {
|
||||
self.allow_self_signed_compute = other.allow_self_signed_compute;
|
||||
self.config.reuse_password(other.config);
|
||||
}
|
||||
|
||||
|
||||
41
proxy/src/ext.rs
Normal file
41
proxy/src/ext.rs
Normal file
@@ -0,0 +1,41 @@
|
||||
use std::panic::resume_unwind;
|
||||
use std::sync::{Mutex, MutexGuard};
|
||||
|
||||
use tokio::task::JoinError;
|
||||
|
||||
pub(crate) trait LockExt<T> {
|
||||
fn lock_propagate_poison(&self) -> MutexGuard<'_, T>;
|
||||
}
|
||||
|
||||
impl<T> LockExt<T> for Mutex<T> {
|
||||
/// Lock the mutex and panic if the mutex was poisoned.
|
||||
#[track_caller]
|
||||
fn lock_propagate_poison(&self) -> MutexGuard<'_, T> {
|
||||
match self.lock() {
|
||||
Ok(guard) => guard,
|
||||
// poison occurs when another thread panicked while holding the lock guard.
|
||||
// since panicking is often unrecoverable, propagating the poison panic is reasonable.
|
||||
Err(poison) => panic!("{poison}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait TaskExt<T> {
|
||||
fn propagate_task_panic(self) -> T;
|
||||
}
|
||||
|
||||
impl<T> TaskExt<T> for Result<T, JoinError> {
|
||||
/// Unwrap the result and panic if the inner task panicked.
|
||||
/// Also panics if the task was cancelled
|
||||
#[track_caller]
|
||||
fn propagate_task_panic(self) -> T {
|
||||
match self {
|
||||
Ok(t) => t,
|
||||
// Using resume_unwind prevents the panic hook being called twice.
|
||||
// Since we use this for structured concurrency, there is only
|
||||
// 1 logical panic, so this is more correct.
|
||||
Err(e) if e.is_panic() => resume_unwind(e.into_panic()),
|
||||
Err(e) => panic!("unexpected task error: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -14,6 +14,7 @@ use utils::http::error::ApiError;
|
||||
use utils::http::json::json_response;
|
||||
use utils::http::{RouterBuilder, RouterService};
|
||||
|
||||
use crate::ext::{LockExt, TaskExt};
|
||||
use crate::jemalloc;
|
||||
|
||||
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
@@ -76,7 +77,7 @@ async fn prometheus_metrics_handler(
|
||||
let body = tokio::task::spawn_blocking(move || {
|
||||
let _span = span.entered();
|
||||
|
||||
let mut state = state.lock().unwrap();
|
||||
let mut state = state.lock_propagate_poison();
|
||||
let PrometheusHandler { encoder, metrics } = &mut *state;
|
||||
|
||||
metrics
|
||||
@@ -94,13 +95,13 @@ async fn prometheus_metrics_handler(
|
||||
body
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
.propagate_task_panic();
|
||||
|
||||
let response = Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, "text/plain; version=0.0.4")
|
||||
.body(Body::from(body))
|
||||
.unwrap();
|
||||
.expect("response headers should be valid");
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
@@ -83,7 +83,7 @@ impl<Id: InternId> StringInterner<Id> {
|
||||
pub(crate) fn new() -> Self {
|
||||
StringInterner {
|
||||
inner: ThreadedRodeo::with_capacity_memory_limits_and_hasher(
|
||||
Capacity::new(2500, NonZeroUsize::new(1 << 16).unwrap()),
|
||||
Capacity::new(2500, NonZeroUsize::new(1 << 16).expect("value is nonzero")),
|
||||
// unbounded
|
||||
MemoryLimits::for_memory_usage(usize::MAX),
|
||||
BuildHasherDefault::<FxHasher>::default(),
|
||||
@@ -207,6 +207,7 @@ impl From<ProjectId> for ProjectIdInt {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use std::sync::OnceLock;
|
||||
|
||||
|
||||
@@ -22,8 +22,8 @@
|
||||
clippy::string_add,
|
||||
clippy::string_to_string,
|
||||
clippy::todo,
|
||||
// TODO: consider clippy::unimplemented
|
||||
// TODO: consider clippy::unwrap_used
|
||||
clippy::unimplemented,
|
||||
clippy::unwrap_used,
|
||||
)]
|
||||
// List of permanently allowed lints.
|
||||
#![allow(
|
||||
@@ -82,6 +82,7 @@ pub mod console_redirect_proxy;
|
||||
pub mod context;
|
||||
pub mod control_plane;
|
||||
pub mod error;
|
||||
mod ext;
|
||||
pub mod http;
|
||||
pub mod intern;
|
||||
pub mod jemalloc;
|
||||
|
||||
@@ -18,8 +18,16 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
|
||||
let env_filter = EnvFilter::builder()
|
||||
.with_default_directive(LevelFilter::INFO.into())
|
||||
.from_env_lossy()
|
||||
.add_directive("aws_config=info".parse().unwrap())
|
||||
.add_directive("azure_core::policies::transport=off".parse().unwrap());
|
||||
.add_directive(
|
||||
"aws_config=info"
|
||||
.parse()
|
||||
.expect("this should be a valid filter directive"),
|
||||
)
|
||||
.add_directive(
|
||||
"azure_core::policies::transport=off"
|
||||
.parse()
|
||||
.expect("this should be a valid filter directive"),
|
||||
);
|
||||
|
||||
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||
.with_ansi(false)
|
||||
|
||||
@@ -8,14 +8,6 @@ pub(crate) fn split_cstr(bytes: &[u8]) -> Option<(&CStr, &[u8])> {
|
||||
Some((cstr, other))
|
||||
}
|
||||
|
||||
/// See <https://doc.rust-lang.org/std/primitive.slice.html#method.split_array_ref>.
|
||||
pub(crate) fn split_at_const<const N: usize>(bytes: &[u8]) -> Option<(&[u8; N], &[u8])> {
|
||||
(bytes.len() >= N).then(|| {
|
||||
let (head, tail) = bytes.split_at(N);
|
||||
(head.try_into().unwrap(), tail)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -33,11 +25,4 @@ mod tests {
|
||||
assert_eq!(cstr.to_bytes(), b"foo");
|
||||
assert_eq!(rest, b"bar");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_split_at_const() {
|
||||
assert!(split_at_const::<0>(b"").is_some());
|
||||
assert!(split_at_const::<1>(b"").is_none());
|
||||
assert!(matches!(split_at_const::<1>(b"ok"), Some((b"o", b"k"))));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -396,6 +396,7 @@ impl NetworkEndianIpv6 {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
|
||||
@@ -73,6 +73,9 @@ pub(crate) struct TcpMechanism<'a> {
|
||||
|
||||
/// connect_to_compute concurrency lock
|
||||
pub(crate) locks: &'static ApiLocks<Host>,
|
||||
|
||||
/// Whether we should accept self-signed certificates (for testing)
|
||||
pub(crate) allow_self_signed_compute: bool,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -90,7 +93,11 @@ impl ConnectMechanism for TcpMechanism<'_> {
|
||||
) -> Result<PostgresConnection, Self::Error> {
|
||||
let host = node_info.config.get_host();
|
||||
let permit = self.locks.get_permit(&host).await?;
|
||||
permit.release_result(node_info.connect(ctx, timeout).await)
|
||||
permit.release_result(
|
||||
node_info
|
||||
.connect(ctx, self.allow_self_signed_compute, timeout)
|
||||
.await,
|
||||
)
|
||||
}
|
||||
|
||||
fn update_connect_config(&self, config: &mut compute::ConnCfg) {
|
||||
@@ -104,7 +111,6 @@ pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBac
|
||||
ctx: &RequestContext,
|
||||
mechanism: &M,
|
||||
user_info: &B,
|
||||
allow_self_signed_compute: bool,
|
||||
wake_compute_retry_config: RetryConfig,
|
||||
connect_to_compute_retry_config: RetryConfig,
|
||||
) -> Result<M::Connection, M::Error>
|
||||
@@ -117,7 +123,6 @@ where
|
||||
wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
|
||||
|
||||
node_info.set_keys(user_info.get_keys());
|
||||
node_info.allow_self_signed_compute = allow_self_signed_compute;
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
|
||||
// try once
|
||||
|
||||
@@ -257,6 +257,7 @@ impl CopyBuffer {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
|
||||
@@ -191,13 +191,6 @@ impl ClientMode {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn allow_self_signed_compute(&self, config: &ProxyConfig) -> bool {
|
||||
match self {
|
||||
ClientMode::Tcp => config.allow_self_signed_compute,
|
||||
ClientMode::Websockets { .. } => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn hostname<'a, S>(&'a self, s: &'a Stream<S>) -> Option<&'a str> {
|
||||
match self {
|
||||
ClientMode::Tcp => s.sni_hostname(),
|
||||
@@ -355,9 +348,10 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
params_compat,
|
||||
params: ¶ms,
|
||||
locks: &config.connect_compute_locks,
|
||||
// only used for console redirect testing.
|
||||
allow_self_signed_compute: false,
|
||||
},
|
||||
&user_info,
|
||||
mode.allow_self_signed_compute(config),
|
||||
config.wake_compute_retry_config,
|
||||
config.connect_to_compute_retry_config,
|
||||
)
|
||||
@@ -494,7 +488,7 @@ impl NeonOptions {
|
||||
|
||||
pub(crate) fn neon_option(bytes: &str) -> Option<(&str, &str)> {
|
||||
static RE: OnceCell<Regex> = OnceCell::new();
|
||||
let re = RE.get_or_init(|| Regex::new(r"^neon_(\w+):(.+)").unwrap());
|
||||
let re = RE.get_or_init(|| Regex::new(r"^neon_(\w+):(.+)").expect("regex should be correct"));
|
||||
|
||||
let cap = re.captures(bytes)?;
|
||||
let (_, [k, v]) = cap.extract();
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
//! A group of high-level tests for connection establishing logic and auth.
|
||||
#![allow(clippy::unimplemented, clippy::unwrap_used)]
|
||||
|
||||
mod mitm;
|
||||
|
||||
@@ -553,7 +554,6 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn
|
||||
branch_id: (&BranchId::from("branch")).into(),
|
||||
cold_start_info: crate::control_plane::messages::ColdStartInfo::Warm,
|
||||
},
|
||||
allow_self_signed_compute: false,
|
||||
};
|
||||
let (_, node2) = cache.insert_unit("key".into(), Ok(node.clone()));
|
||||
node2.map(|()| node)
|
||||
@@ -588,7 +588,7 @@ async fn connect_to_compute_success() {
|
||||
max_retries: 5,
|
||||
backoff_factor: 2.0,
|
||||
};
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, false, config, config)
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
@@ -606,7 +606,7 @@ async fn connect_to_compute_retry() {
|
||||
max_retries: 5,
|
||||
backoff_factor: 2.0,
|
||||
};
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, false, config, config)
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
@@ -625,7 +625,7 @@ async fn connect_to_compute_non_retry_1() {
|
||||
max_retries: 5,
|
||||
backoff_factor: 2.0,
|
||||
};
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, false, config, config)
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
|
||||
.await
|
||||
.unwrap_err();
|
||||
mechanism.verify();
|
||||
@@ -644,7 +644,7 @@ async fn connect_to_compute_non_retry_2() {
|
||||
max_retries: 5,
|
||||
backoff_factor: 2.0,
|
||||
};
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, false, config, config)
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
@@ -674,7 +674,6 @@ async fn connect_to_compute_non_retry_3() {
|
||||
&ctx,
|
||||
&mechanism,
|
||||
&user_info,
|
||||
false,
|
||||
wake_compute_retry_config,
|
||||
connect_to_compute_retry_config,
|
||||
)
|
||||
@@ -696,7 +695,7 @@ async fn wake_retry() {
|
||||
max_retries: 5,
|
||||
backoff_factor: 2.0,
|
||||
};
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, false, config, config)
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
@@ -715,7 +714,7 @@ async fn wake_non_retry() {
|
||||
max_retries: 5,
|
||||
backoff_factor: 2.0,
|
||||
};
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, false, config, config)
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
|
||||
.await
|
||||
.unwrap_err();
|
||||
mechanism.verify();
|
||||
|
||||
@@ -83,7 +83,7 @@ impl From<LeakyBucketConfig> for utils::leaky_bucket::LeakyBucketConfig {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::float_cmp)]
|
||||
#[allow(clippy::float_cmp, clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
|
||||
@@ -63,6 +63,7 @@ impl LimitAlgorithm for Aimd {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ use rand::{Rng, SeedableRng};
|
||||
use tokio::time::{Duration, Instant};
|
||||
use tracing::info;
|
||||
|
||||
use crate::ext::LockExt;
|
||||
use crate::intern::EndpointIdInt;
|
||||
|
||||
pub struct GlobalRateLimiter {
|
||||
@@ -246,12 +247,13 @@ impl<K: Hash + Eq, R: Rng, S: BuildHasher + Clone> BucketRateLimiter<K, R, S> {
|
||||
let n = self.map.shards().len();
|
||||
// this lock is ok as the periodic cycle of do_gc makes this very unlikely to collide
|
||||
// (impossible, infact, unless we have 2048 threads)
|
||||
let shard = self.rand.lock().unwrap().gen_range(0..n);
|
||||
let shard = self.rand.lock_propagate_poison().gen_range(0..n);
|
||||
self.map.shards()[shard].write().clear();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use std::hash::BuildHasherDefault;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -69,7 +69,11 @@ impl ConnectionWithCredentialsProvider {
|
||||
|
||||
pub fn new_with_static_credentials<T: IntoConnectionInfo>(params: T) -> Self {
|
||||
Self {
|
||||
credentials: Credentials::Static(params.into_connection_info().unwrap()),
|
||||
credentials: Credentials::Static(
|
||||
params
|
||||
.into_connection_info()
|
||||
.expect("static configured redis credentials should be a valid format"),
|
||||
),
|
||||
con: None,
|
||||
refresh_token_task: None,
|
||||
mutex: tokio::sync::Mutex::new(()),
|
||||
|
||||
@@ -6,6 +6,7 @@ use pq_proto::CancelKeyData;
|
||||
use redis::aio::PubSub;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
|
||||
@@ -13,7 +14,6 @@ use crate::cache::project_info::ProjectInfoCache;
|
||||
use crate::cancellation::{CancelMap, CancellationHandler};
|
||||
use crate::intern::{ProjectIdInt, RoleNameInt};
|
||||
use crate::metrics::{Metrics, RedisErrors, RedisEventsCount};
|
||||
use tracing::Instrument;
|
||||
|
||||
const CPLANE_CHANNEL_NAME: &str = "neondb-proxy-ws-updates";
|
||||
pub(crate) const PROXY_CHANNEL_NAME: &str = "neondb-proxy-to-proxy-updates";
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
use pq_proto::{BeAuthenticationSaslMessage, BeMessage};
|
||||
|
||||
use crate::parse::{split_at_const, split_cstr};
|
||||
use crate::parse::split_cstr;
|
||||
|
||||
/// SASL-specific payload of [`PasswordMessage`](pq_proto::FeMessage::PasswordMessage).
|
||||
#[derive(Debug)]
|
||||
@@ -19,7 +19,7 @@ impl<'a> FirstMessage<'a> {
|
||||
let (method_cstr, tail) = split_cstr(bytes)?;
|
||||
let method = method_cstr.to_str().ok()?;
|
||||
|
||||
let (len_bytes, bytes) = split_at_const(tail)?;
|
||||
let (len_bytes, bytes) = tail.split_first_chunk()?;
|
||||
let len = u32::from_be_bytes(*len_bytes) as usize;
|
||||
if len != bytes.len() {
|
||||
return None;
|
||||
@@ -51,6 +51,7 @@ impl<'a> ServerMessage<&'a str> {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
|
||||
@@ -185,6 +185,7 @@ impl fmt::Debug for OwnedServerFirstMessage {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
|
||||
@@ -57,6 +57,7 @@ fn sha256<'a>(parts: impl IntoIterator<Item = &'a [u8]>) -> [u8; 32] {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use super::threadpool::ThreadPool;
|
||||
use super::{Exchange, ServerSecret};
|
||||
|
||||
@@ -72,6 +72,7 @@ impl ServerSecret {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
|
||||
@@ -33,14 +33,11 @@ thread_local! {
|
||||
}
|
||||
|
||||
impl ThreadPool {
|
||||
pub fn new(n_workers: u8) -> Arc<Self> {
|
||||
pub fn new(mut n_workers: u8) -> Arc<Self> {
|
||||
// rayon would be nice here, but yielding in rayon does not work well afaict.
|
||||
|
||||
if n_workers == 0 {
|
||||
return Arc::new(Self {
|
||||
runtime: None,
|
||||
metrics: Arc::new(ThreadPoolMetrics::new(n_workers as usize)),
|
||||
});
|
||||
n_workers = 1;
|
||||
}
|
||||
|
||||
Arc::new_cyclic(|pool| {
|
||||
@@ -66,7 +63,7 @@ impl ThreadPool {
|
||||
});
|
||||
})
|
||||
.build()
|
||||
.unwrap();
|
||||
.expect("password threadpool runtime should be configured correctly");
|
||||
|
||||
Self {
|
||||
runtime: Some(runtime),
|
||||
@@ -79,7 +76,7 @@ impl ThreadPool {
|
||||
JobHandle(
|
||||
self.runtime
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.expect("runtime is always set")
|
||||
.spawn(JobSpec { pbkdf2, endpoint }),
|
||||
)
|
||||
}
|
||||
@@ -87,7 +84,10 @@ impl ThreadPool {
|
||||
|
||||
impl Drop for ThreadPool {
|
||||
fn drop(&mut self) {
|
||||
self.runtime.take().unwrap().shutdown_background();
|
||||
self.runtime
|
||||
.take()
|
||||
.expect("runtime is always set")
|
||||
.shutdown_background();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -195,7 +195,6 @@ impl PoolingBackend {
|
||||
locks: &self.config.connect_compute_locks,
|
||||
},
|
||||
&backend,
|
||||
false, // do not allow self signed compute for http flow
|
||||
self.config.wake_compute_retry_config,
|
||||
self.config.connect_to_compute_retry_config,
|
||||
)
|
||||
@@ -237,7 +236,6 @@ impl PoolingBackend {
|
||||
locks: &self.config.connect_compute_locks,
|
||||
},
|
||||
&backend,
|
||||
false, // do not allow self signed compute for http flow
|
||||
self.config.wake_compute_retry_config,
|
||||
self.config.connect_to_compute_retry_config,
|
||||
)
|
||||
@@ -270,7 +268,11 @@ impl PoolingBackend {
|
||||
|
||||
if !self.local_pool.initialized(&conn_info) {
|
||||
// only install and grant usage one at a time.
|
||||
let _permit = local_backend.initialize.acquire().await.unwrap();
|
||||
let _permit = local_backend
|
||||
.initialize
|
||||
.acquire()
|
||||
.await
|
||||
.expect("semaphore should never be closed");
|
||||
|
||||
// check again for race
|
||||
if !self.local_pool.initialized(&conn_info) {
|
||||
|
||||
@@ -186,8 +186,8 @@ impl ClientDataRemote {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use std::mem;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
||||
use super::*;
|
||||
@@ -269,39 +269,33 @@ mod tests {
|
||||
assert_eq!(0, pool.get_global_connections_count());
|
||||
}
|
||||
{
|
||||
let mut client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone());
|
||||
client.do_drop().unwrap()();
|
||||
mem::forget(client); // drop the client
|
||||
let client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone());
|
||||
drop(client);
|
||||
assert_eq!(1, pool.get_global_connections_count());
|
||||
}
|
||||
{
|
||||
let mut closed_client = Client::new(
|
||||
let closed_client = Client::new(
|
||||
create_inner_with(MockClient::new(true)),
|
||||
conn_info.clone(),
|
||||
ep_pool.clone(),
|
||||
);
|
||||
closed_client.do_drop().unwrap()();
|
||||
mem::forget(closed_client); // drop the client
|
||||
// The closed client shouldn't be added to the pool.
|
||||
drop(closed_client);
|
||||
assert_eq!(1, pool.get_global_connections_count());
|
||||
}
|
||||
let is_closed: Arc<AtomicBool> = Arc::new(false.into());
|
||||
{
|
||||
let mut client = Client::new(
|
||||
let client = Client::new(
|
||||
create_inner_with(MockClient(is_closed.clone())),
|
||||
conn_info.clone(),
|
||||
ep_pool.clone(),
|
||||
);
|
||||
client.do_drop().unwrap()();
|
||||
mem::forget(client); // drop the client
|
||||
|
||||
drop(client);
|
||||
// The client should be added to the pool.
|
||||
assert_eq!(2, pool.get_global_connections_count());
|
||||
}
|
||||
{
|
||||
let mut client = Client::new(create_inner(), conn_info, ep_pool);
|
||||
client.do_drop().unwrap()();
|
||||
mem::forget(client); // drop the client
|
||||
let client = Client::new(create_inner(), conn_info, ep_pool);
|
||||
drop(client);
|
||||
|
||||
// The client shouldn't be added to the pool. Because the ep-pool is full.
|
||||
assert_eq!(2, pool.get_global_connections_count());
|
||||
@@ -319,15 +313,13 @@ mod tests {
|
||||
&pool.get_or_create_endpoint_pool(&conn_info.endpoint_cache_key().unwrap()),
|
||||
);
|
||||
{
|
||||
let mut client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone());
|
||||
client.do_drop().unwrap()();
|
||||
mem::forget(client); // drop the client
|
||||
let client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone());
|
||||
drop(client);
|
||||
assert_eq!(3, pool.get_global_connections_count());
|
||||
}
|
||||
{
|
||||
let mut client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone());
|
||||
client.do_drop().unwrap()();
|
||||
mem::forget(client); // drop the client
|
||||
let client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone());
|
||||
drop(client);
|
||||
|
||||
// The client shouldn't be added to the pool. Because the global pool is full.
|
||||
assert_eq!(3, pool.get_global_connections_count());
|
||||
|
||||
@@ -187,19 +187,22 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
|
||||
|
||||
pub(crate) fn put(pool: &RwLock<Self>, conn_info: &ConnInfo, client: ClientInnerCommon<C>) {
|
||||
let conn_id = client.get_conn_id();
|
||||
let pool_name = pool.read().get_name().to_string();
|
||||
let (max_conn, conn_count, pool_name) = {
|
||||
let pool = pool.read();
|
||||
(
|
||||
pool.global_pool_size_max_conns,
|
||||
pool.global_connections_count
|
||||
.load(atomic::Ordering::Relaxed),
|
||||
pool.get_name().to_string(),
|
||||
)
|
||||
};
|
||||
|
||||
if client.inner.is_closed() {
|
||||
info!(%conn_id, "{}: throwing away connection '{conn_info}' because connection is closed", pool_name);
|
||||
return;
|
||||
}
|
||||
|
||||
let global_max_conn = pool.read().global_pool_size_max_conns;
|
||||
if pool
|
||||
.read()
|
||||
.global_connections_count
|
||||
.load(atomic::Ordering::Relaxed)
|
||||
>= global_max_conn
|
||||
{
|
||||
if conn_count >= max_conn {
|
||||
info!(%conn_id, "{}: throwing away connection '{conn_info}' because pool is full", pool_name);
|
||||
return;
|
||||
}
|
||||
@@ -633,35 +636,29 @@ impl<C: ClientInnerExt> Client<C> {
|
||||
}
|
||||
|
||||
pub(crate) fn metrics(&self) -> Arc<MetricCounter> {
|
||||
let aux = &self.inner.as_ref().unwrap().aux;
|
||||
let aux = &self
|
||||
.inner
|
||||
.as_ref()
|
||||
.expect("client inner should not be removed")
|
||||
.aux;
|
||||
USAGE_METRICS.register(Ids {
|
||||
endpoint_id: aux.endpoint_id,
|
||||
branch_id: aux.branch_id,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn do_drop(&mut self) -> Option<impl FnOnce() + use<C>> {
|
||||
impl<C: ClientInnerExt> Drop for Client<C> {
|
||||
fn drop(&mut self) {
|
||||
let conn_info = self.conn_info.clone();
|
||||
let client = self
|
||||
.inner
|
||||
.take()
|
||||
.expect("client inner should not be removed");
|
||||
if let Some(conn_pool) = std::mem::take(&mut self.pool).upgrade() {
|
||||
let current_span = self.span.clone();
|
||||
let _current_span = self.span.enter();
|
||||
// return connection to the pool
|
||||
return Some(move || {
|
||||
let _span = current_span.enter();
|
||||
EndpointConnPool::put(&conn_pool, &conn_info, client);
|
||||
});
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> Drop for Client<C> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(drop) = self.do_drop() {
|
||||
tokio::task::spawn_blocking(drop);
|
||||
EndpointConnPool::put(&conn_pool, &conn_info, client);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,11 +81,14 @@ impl HttpErrorBody {
|
||||
.header(http::header::CONTENT_TYPE, "application/json")
|
||||
// we do not have nested maps with non string keys so serialization shouldn't fail
|
||||
.body(
|
||||
Full::new(Bytes::from(serde_json::to_string(self).unwrap()))
|
||||
.map_err(|x| match x {})
|
||||
.boxed(),
|
||||
Full::new(Bytes::from(
|
||||
serde_json::to_string(self)
|
||||
.expect("serialising HttpErrorBody should never fail"),
|
||||
))
|
||||
.map_err(|x| match x {})
|
||||
.boxed(),
|
||||
)
|
||||
.unwrap()
|
||||
.expect("content-type header should be valid")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -204,7 +204,10 @@ fn pg_array_parse_inner(
|
||||
|
||||
if c == '\\' {
|
||||
escaped = true;
|
||||
(i, c) = pg_array_chr.next().unwrap();
|
||||
let Some(x) = pg_array_chr.next() else {
|
||||
return Err(JsonConversionError::UnbalancedArray);
|
||||
};
|
||||
(i, c) = x;
|
||||
}
|
||||
|
||||
match c {
|
||||
@@ -253,6 +256,7 @@ fn pg_array_parse_inner(
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use serde_json::json;
|
||||
|
||||
|
||||
@@ -179,7 +179,6 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
|
||||
info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
|
||||
});
|
||||
let pool = Arc::downgrade(&global_pool);
|
||||
let pool_clone = pool.clone();
|
||||
|
||||
let db_user = conn_info.db_and_user();
|
||||
let idle = global_pool.get_idle_timeout();
|
||||
@@ -273,11 +272,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
|
||||
}),
|
||||
};
|
||||
|
||||
Client::new(
|
||||
inner,
|
||||
conn_info,
|
||||
Arc::downgrade(&pool_clone.upgrade().unwrap().global_pool),
|
||||
)
|
||||
Client::new(inner, conn_info, Arc::downgrade(&global_pool.global_pool))
|
||||
}
|
||||
|
||||
impl ClientInnerCommon<postgres_client::Client> {
|
||||
@@ -321,7 +316,8 @@ fn resign_jwt(sk: &SigningKey, payload: &[u8], jti: u64) -> Result<String, HttpC
|
||||
let mut buffer = itoa::Buffer::new();
|
||||
|
||||
// encode the jti integer to a json rawvalue
|
||||
let jti = serde_json::from_str::<&RawValue>(buffer.format(jti)).unwrap();
|
||||
let jti = serde_json::from_str::<&RawValue>(buffer.format(jti))
|
||||
.expect("itoa formatted integer should be guaranteed valid json");
|
||||
|
||||
// update the jti in-place
|
||||
let payload =
|
||||
@@ -368,6 +364,7 @@ fn sign_jwt(sk: &SigningKey, payload: &[u8]) -> String {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use p256::ecdsa::SigningKey;
|
||||
use typed_json::json;
|
||||
|
||||
@@ -46,6 +46,7 @@ use utils::http::error::ApiError;
|
||||
use crate::cancellation::CancellationHandlerMain;
|
||||
use crate::config::{ProxyConfig, ProxyProtocolV2};
|
||||
use crate::context::RequestContext;
|
||||
use crate::ext::TaskExt;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::protocol2::{read_proxy_protocol, ChainRW, ConnectHeader, ConnectionInfo};
|
||||
use crate::proxy::run_until_cancelled;
|
||||
@@ -84,7 +85,7 @@ pub async fn task_main(
|
||||
cancellation_token.cancelled().await;
|
||||
tokio::task::spawn_blocking(move || conn_pool.shutdown())
|
||||
.await
|
||||
.unwrap();
|
||||
.propagate_task_panic();
|
||||
}
|
||||
});
|
||||
|
||||
@@ -104,7 +105,7 @@ pub async fn task_main(
|
||||
cancellation_token.cancelled().await;
|
||||
tokio::task::spawn_blocking(move || http_conn_pool.shutdown())
|
||||
.await
|
||||
.unwrap();
|
||||
.propagate_task_panic();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -1110,6 +1110,7 @@ impl Discard<'_> {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
|
||||
@@ -178,6 +178,7 @@ pub(crate) async fn serve_websocket(
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use std::pin::pin;
|
||||
|
||||
|
||||
@@ -50,6 +50,7 @@ impl std::fmt::Display for ApiUrl {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
|
||||
@@ -407,6 +407,7 @@ async fn upload_backup_events(
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use std::fs;
|
||||
use std::io::BufReader;
|
||||
|
||||
@@ -55,6 +55,7 @@ postgres_ffi.workspace = true
|
||||
pq_proto.workspace = true
|
||||
remote_storage.workspace = true
|
||||
safekeeper_api.workspace = true
|
||||
safekeeper_client.workspace = true
|
||||
sha2.workspace = true
|
||||
sd-notify.workspace = true
|
||||
storage_broker.workspace = true
|
||||
|
||||
13
safekeeper/client/Cargo.toml
Normal file
13
safekeeper/client/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "safekeeper_client"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
safekeeper_api.workspace = true
|
||||
thiserror.workspace = true
|
||||
reqwest = { workspace = true, features = [ "stream" ] }
|
||||
serde.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
1
safekeeper/client/src/lib.rs
Normal file
1
safekeeper/client/src/lib.rs
Normal file
@@ -0,0 +1 @@
|
||||
pub mod mgmt_api;
|
||||
@@ -2,10 +2,6 @@
|
||||
//!
|
||||
//! Partially copied from pageserver client; some parts might be better to be
|
||||
//! united.
|
||||
//!
|
||||
//! It would be also good to move it out to separate crate, but this needs
|
||||
//! duplication of internal-but-reported structs like WalSenderState, ServerInfo
|
||||
//! etc.
|
||||
|
||||
use reqwest::{IntoUrl, Method, StatusCode};
|
||||
use safekeeper_api::models::TimelineStatus;
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user