mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 06:00:38 +00:00
Compare commits
24 Commits
jcsp/rfc-h
...
skyzh/gc-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8504917b80 | ||
|
|
78b322f616 | ||
|
|
2de3629b88 | ||
|
|
1fe23fe8d2 | ||
|
|
604eb5e8d4 | ||
|
|
d599d2df80 | ||
|
|
8263107f6c | ||
|
|
d94fc75cfc | ||
|
|
9cdc8c0e6c | ||
|
|
2d45522fa6 | ||
|
|
94e6897ead | ||
|
|
332aae1484 | ||
|
|
8c12ccf729 | ||
|
|
abae7637d6 | ||
|
|
38a883118a | ||
|
|
40aa4d7151 | ||
|
|
8e51bfc597 | ||
|
|
906d7468cc | ||
|
|
438f7bb726 | ||
|
|
f62ddb11ed | ||
|
|
7b7e4a9fd3 | ||
|
|
4bbdb758ec | ||
|
|
20af9cef17 | ||
|
|
a2902e774a |
2
.github/workflows/benchmarking.yml
vendored
2
.github/workflows/benchmarking.yml
vendored
@@ -141,6 +141,8 @@ jobs:
|
||||
--ignore test_runner/performance/test_physical_replication.py
|
||||
--ignore test_runner/performance/test_perf_ingest_using_pgcopydb.py
|
||||
--ignore test_runner/performance/test_cumulative_statistics_persistence.py
|
||||
--ignore test_runner/performance/test_perf_many_relations.py
|
||||
--ignore test_runner/performance/test_perf_oltp_large_tenant.py
|
||||
env:
|
||||
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
|
||||
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -1061,7 +1061,7 @@ jobs:
|
||||
exit 1
|
||||
|
||||
deploy:
|
||||
needs: [ check-permissions, push-neon-image-prod, push-compute-image-prod, meta, build-and-test-locally, trigger-custom-extensions-build-and-wait ]
|
||||
needs: [ check-permissions, push-neon-image-dev, push-compute-image-dev, push-neon-image-prod, push-compute-image-prod, meta, build-and-test-locally, trigger-custom-extensions-build-and-wait ]
|
||||
# `!failure() && !cancelled()` is required because the workflow depends on the job that can be skipped: `push-neon-image-prod` and `push-compute-image-prod`
|
||||
if: ${{ contains(fromJSON('["push-main", "storage-release", "proxy-release", "compute-release"]'), needs.meta.outputs.run-kind) && !failure() && !cancelled() }}
|
||||
permissions:
|
||||
|
||||
4
.github/workflows/periodic_pagebench.yml
vendored
4
.github/workflows/periodic_pagebench.yml
vendored
@@ -78,8 +78,10 @@ jobs:
|
||||
run: |
|
||||
if [ -z "$INPUT_COMMIT_HASH" ]; then
|
||||
echo "COMMIT_HASH=$(curl -s https://api.github.com/repos/neondatabase/neon/commits/main | jq -r '.sha')" >> $GITHUB_ENV
|
||||
echo "COMMIT_HASH_TYPE=latest" >> $GITHUB_ENV
|
||||
else
|
||||
echo "COMMIT_HASH=$INPUT_COMMIT_HASH" >> $GITHUB_ENV
|
||||
echo "COMMIT_HASH_TYPE=manual" >> $GITHUB_ENV
|
||||
fi
|
||||
|
||||
- name: Start Bench with run_id
|
||||
@@ -89,7 +91,7 @@ jobs:
|
||||
-H 'accept: application/json' \
|
||||
-H 'Content-Type: application/json' \
|
||||
-H "Authorization: Bearer $API_KEY" \
|
||||
-d "{\"neonRepoCommitHash\": \"${COMMIT_HASH}\"}"
|
||||
-d "{\"neonRepoCommitHash\": \"${COMMIT_HASH}\", \"neonRepoCommitHashType\": \"${COMMIT_HASH_TYPE}\"}"
|
||||
|
||||
- name: Poll Test Status
|
||||
id: poll_step
|
||||
|
||||
94
Cargo.lock
generated
94
Cargo.lock
generated
@@ -783,6 +783,28 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "axum-extra"
|
||||
version = "0.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "460fc6f625a1f7705c6cf62d0d070794e94668988b1c38111baeec177c715f7b"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"axum-core",
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"headers",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
"mime",
|
||||
"pin-project-lite",
|
||||
"serde",
|
||||
"tower 0.5.2",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "azure_core"
|
||||
version = "0.21.0"
|
||||
@@ -925,9 +947,9 @@ checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.21.1"
|
||||
version = "0.21.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3f1e31e207a6b8fb791a38ea3105e6cb541f55e4d029902d3039a4ad07cc4105"
|
||||
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
@@ -1305,6 +1327,7 @@ dependencies = [
|
||||
"aws-sdk-s3",
|
||||
"aws-smithy-types",
|
||||
"axum",
|
||||
"axum-extra",
|
||||
"base64 0.13.1",
|
||||
"bytes",
|
||||
"camino",
|
||||
@@ -1316,6 +1339,7 @@ dependencies = [
|
||||
"flate2",
|
||||
"futures",
|
||||
"http 1.1.0",
|
||||
"jsonwebtoken",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
"notify",
|
||||
@@ -2297,7 +2321,7 @@ name = "framed-websockets"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/framed-websockets#34eff3d6f8cfccbc5f35e4f65314ff7328621127"
|
||||
dependencies = [
|
||||
"base64 0.21.1",
|
||||
"base64 0.21.7",
|
||||
"bytemuck",
|
||||
"bytes",
|
||||
"futures-core",
|
||||
@@ -2653,7 +2677,7 @@ version = "7.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
|
||||
dependencies = [
|
||||
"base64 0.21.1",
|
||||
"base64 0.21.7",
|
||||
"byteorder",
|
||||
"crossbeam-channel",
|
||||
"flate2",
|
||||
@@ -2661,6 +2685,30 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "headers"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9"
|
||||
dependencies = [
|
||||
"base64 0.21.7",
|
||||
"bytes",
|
||||
"headers-core",
|
||||
"http 1.1.0",
|
||||
"httpdate",
|
||||
"mime",
|
||||
"sha1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "headers-core"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4"
|
||||
dependencies = [
|
||||
"http 1.1.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heck"
|
||||
version = "0.5.0"
|
||||
@@ -2798,12 +2846,9 @@ name = "http-utils"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"backtrace",
|
||||
"bytes",
|
||||
"fail",
|
||||
"flate2",
|
||||
"hyper 0.14.30",
|
||||
"inferno 0.12.0",
|
||||
"itertools 0.10.5",
|
||||
"jemalloc_pprof",
|
||||
"metrics",
|
||||
@@ -3302,9 +3347,9 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
|
||||
|
||||
[[package]]
|
||||
name = "jemalloc_pprof"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1a883828bd6a4b957cd9f618886ff19e5f3ebd34e06ba0e855849e049fef32fb"
|
||||
checksum = "5622af6d21ff86ed7797ef98e11b8f302da25ec69a7db9f6cde8e2e1c8df9992"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"libc",
|
||||
@@ -3388,7 +3433,7 @@ version = "9.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5c7ea04a7c5c055c175f189b6dc6ba036fd62306b58c66c9f6389036c503a3f4"
|
||||
dependencies = [
|
||||
"base64 0.21.1",
|
||||
"base64 0.21.7",
|
||||
"js-sys",
|
||||
"pem",
|
||||
"ring",
|
||||
@@ -3503,9 +3548,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "mappings"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ce9229c438fbf1c333926e2053c4c091feabbd40a1b590ec62710fea2384af9e"
|
||||
checksum = "e434981a332777c2b3062652d16a55f8e74fa78e6b1882633f0d77399c84fc2a"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"libc",
|
||||
@@ -4340,9 +4385,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "papaya"
|
||||
version = "0.1.8"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dc7c76487f7eaa00a0fc1d7f88dc6b295aec478d11b0fc79f857b62c2874124c"
|
||||
checksum = "aab21828b6b5952fdadd6c377728ffae53ec3a21b2febc47319ab65741f7e2fd"
|
||||
dependencies = [
|
||||
"equivalent",
|
||||
"seize",
|
||||
@@ -4470,7 +4515,7 @@ version = "3.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310"
|
||||
dependencies = [
|
||||
"base64 0.21.1",
|
||||
"base64 0.21.7",
|
||||
"serde",
|
||||
]
|
||||
|
||||
@@ -4794,12 +4839,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "pprof_util"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "65c568b3f8c1c37886ae07459b1946249e725c315306b03be5632f84c239f781"
|
||||
checksum = "9fa015c78eed2130951e22c58d2095849391e73817ab2e74f71b0b9f63dd8416"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"backtrace",
|
||||
"flate2",
|
||||
"inferno 0.12.0",
|
||||
"num",
|
||||
"paste",
|
||||
"prost",
|
||||
@@ -5815,7 +5862,7 @@ version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
|
||||
dependencies = [
|
||||
"base64 0.21.1",
|
||||
"base64 0.21.7",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5824,7 +5871,7 @@ version = "2.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f48172685e6ff52a556baa527774f61fcaa884f59daf3375c62a3f1cd2549dab"
|
||||
dependencies = [
|
||||
"base64 0.21.1",
|
||||
"base64 0.21.7",
|
||||
"rustls-pki-types",
|
||||
]
|
||||
|
||||
@@ -6063,9 +6110,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "seize"
|
||||
version = "0.4.9"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d84b0c858bdd30cb56f5597f8b3bf702ec23829e652cc636a1e5a7b9de46ae93"
|
||||
checksum = "e4b8d813387d566f627f3ea1b914c068aac94c40ae27ec43f5f33bde65abefe7"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"windows-sys 0.52.0",
|
||||
@@ -7358,10 +7405,12 @@ version = "0.6.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"bitflags 2.8.0",
|
||||
"bytes",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"mime",
|
||||
"pin-project-lite",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
@@ -7715,7 +7764,6 @@ dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
"async-compression",
|
||||
"backtrace",
|
||||
"bincode",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
@@ -8269,7 +8317,7 @@ dependencies = [
|
||||
"ahash",
|
||||
"anyhow",
|
||||
"base64 0.13.1",
|
||||
"base64 0.21.1",
|
||||
"base64 0.21.7",
|
||||
"base64ct",
|
||||
"bytes",
|
||||
"camino",
|
||||
|
||||
@@ -53,7 +53,6 @@ anyhow = { version = "1.0", features = ["backtrace"] }
|
||||
arc-swap = "1.6"
|
||||
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
|
||||
atomic-take = "1.1.0"
|
||||
backtrace = "0.3.74"
|
||||
flate2 = "1.0.26"
|
||||
assert-json-diff = "2"
|
||||
async-stream = "0.3"
|
||||
@@ -68,6 +67,7 @@ aws-credential-types = "1.2.0"
|
||||
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
|
||||
aws-types = "1.3"
|
||||
axum = { version = "0.8.1", features = ["ws"] }
|
||||
axum-extra = { version = "0.10.0", features = ["typed-header"] }
|
||||
base64 = "0.13.0"
|
||||
bincode = "1.3"
|
||||
bindgen = "0.71"
|
||||
@@ -114,11 +114,10 @@ hyper-util = "0.1"
|
||||
tokio-tungstenite = "0.21.0"
|
||||
indexmap = "2"
|
||||
indoc = "2"
|
||||
inferno = "0.12.0"
|
||||
ipnet = "2.10.0"
|
||||
itertools = "0.10"
|
||||
itoa = "1.0.11"
|
||||
jemalloc_pprof = "0.6"
|
||||
jemalloc_pprof = { version = "0.7", features = ["symbolize", "flamegraph"] }
|
||||
jsonwebtoken = "9"
|
||||
lasso = "0.7"
|
||||
libc = "0.2"
|
||||
@@ -193,7 +192,7 @@ toml = "0.8"
|
||||
toml_edit = "0.22"
|
||||
tonic = {version = "0.12.3", default-features = false, features = ["channel", "tls", "tls-roots"]}
|
||||
tower = { version = "0.5.2", default-features = false }
|
||||
tower-http = { version = "0.6.2", features = ["request-id", "trace"] }
|
||||
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
|
||||
|
||||
# This revision uses opentelemetry 0.27. There's no tag for it.
|
||||
tower-otel = { git = "https://github.com/mattiapenati/tower-otel", rev = "56a7321053bcb72443888257b622ba0d43a11fcd" }
|
||||
|
||||
@@ -1484,7 +1484,7 @@ WORKDIR /ext-src
|
||||
COPY compute/patches/pg_duckdb_v031.patch .
|
||||
COPY compute/patches/duckdb_v120.patch .
|
||||
# pg_duckdb build requires source dir to be a git repo to get submodules
|
||||
# allow neon_superuser to execute some functions that in pg_duckdb are available to superuser only:
|
||||
# allow neon_superuser to execute some functions that in pg_duckdb are available to superuser only:
|
||||
# - extension management function duckdb.install_extension()
|
||||
# - access to duckdb.extensions table and its sequence
|
||||
RUN git clone --depth 1 --branch v0.3.1 https://github.com/duckdb/pg_duckdb.git pg_duckdb-src && \
|
||||
@@ -1499,8 +1499,8 @@ ARG PG_VERSION
|
||||
COPY --from=pg_duckdb-src /ext-src/ /ext-src/
|
||||
WORKDIR /ext-src/pg_duckdb-src
|
||||
RUN make install -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_duckdb.control
|
||||
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_duckdb.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg_repack"
|
||||
@@ -1758,15 +1758,15 @@ ARG TARGETARCH
|
||||
# test_runner/regress/test_compute_metrics.py
|
||||
# See comment on the top of the file regading `echo`, `-e` and `\n`
|
||||
RUN if [ "$TARGETARCH" = "amd64" ]; then\
|
||||
postgres_exporter_sha256='027e75dda7af621237ff8f5ac66b78a40b0093595f06768612b92b1374bd3105';\
|
||||
postgres_exporter_sha256='59aa4a7bb0f7d361f5e05732f5ed8c03cc08f78449cef5856eadec33a627694b';\
|
||||
pgbouncer_exporter_sha256='c9f7cf8dcff44f0472057e9bf52613d93f3ffbc381ad7547a959daa63c5e84ac';\
|
||||
sql_exporter_sha256='38e439732bbf6e28ca4a94d7bc3686d3fa1abdb0050773d5617a9efdb9e64d08';\
|
||||
else\
|
||||
postgres_exporter_sha256='131a376d25778ff9701a4c81f703f179e0b58db5c2c496e66fa43f8179484786';\
|
||||
postgres_exporter_sha256='d1dedea97f56c6d965837bfd1fbb3e35a3b4a4556f8cccee8bd513d8ee086124';\
|
||||
pgbouncer_exporter_sha256='217c4afd7e6492ae904055bc14fe603552cf9bac458c063407e991d68c519da3';\
|
||||
sql_exporter_sha256='11918b00be6e2c3a67564adfdb2414fdcbb15a5db76ea17d1d1a944237a893c6';\
|
||||
fi\
|
||||
&& curl -sL https://github.com/prometheus-community/postgres_exporter/releases/download/v0.16.0/postgres_exporter-0.16.0.linux-${TARGETARCH}.tar.gz\
|
||||
&& curl -sL https://github.com/prometheus-community/postgres_exporter/releases/download/v0.17.1/postgres_exporter-0.17.1.linux-${TARGETARCH}.tar.gz\
|
||||
| tar xzf - --strip-components=1 -C.\
|
||||
&& curl -sL https://github.com/prometheus-community/pgbouncer_exporter/releases/download/v0.10.2/pgbouncer_exporter-0.10.2.linux-${TARGETARCH}.tar.gz\
|
||||
| tar xzf - --strip-components=1 -C.\
|
||||
@@ -1933,6 +1933,7 @@ RUN apt update && \
|
||||
locales \
|
||||
procps \
|
||||
ca-certificates \
|
||||
rsyslog \
|
||||
$VERSION_INSTALLS && \
|
||||
apt clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
|
||||
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
|
||||
@@ -1978,6 +1979,15 @@ COPY --from=sql_exporter_preprocessor --chmod=0644 /home/nonroot/compute/etc/neo
|
||||
# Make the libraries we built available
|
||||
RUN echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig
|
||||
|
||||
# rsyslog config permissions
|
||||
RUN chown postgres:postgres /etc/rsyslog.conf && \
|
||||
touch /etc/compute_rsyslog.conf && \
|
||||
chown -R postgres:postgres /etc/compute_rsyslog.conf && \
|
||||
# directory for rsyslogd pid file
|
||||
mkdir /var/run/rsyslogd && \
|
||||
chown -R postgres:postgres /var/run/rsyslogd
|
||||
|
||||
|
||||
ENV LANG=en_US.utf8
|
||||
USER postgres
|
||||
ENTRYPOINT ["/usr/local/bin/compute_ctl"]
|
||||
|
||||
@@ -29,6 +29,7 @@
|
||||
import 'sql_exporter/lfc_approximate_working_set_size.libsonnet',
|
||||
import 'sql_exporter/lfc_approximate_working_set_size_windows.libsonnet',
|
||||
import 'sql_exporter/lfc_cache_size_limit.libsonnet',
|
||||
import 'sql_exporter/lfc_chunk_size.libsonnet',
|
||||
import 'sql_exporter/lfc_hits.libsonnet',
|
||||
import 'sql_exporter/lfc_misses.libsonnet',
|
||||
import 'sql_exporter/lfc_used.libsonnet',
|
||||
|
||||
10
compute/etc/sql_exporter/lfc_chunk_size.libsonnet
Normal file
10
compute/etc/sql_exporter/lfc_chunk_size.libsonnet
Normal file
@@ -0,0 +1,10 @@
|
||||
{
|
||||
metric_name: 'lfc_chunk_size',
|
||||
type: 'gauge',
|
||||
help: 'LFC chunk size, measured in 8KiB pages',
|
||||
key_labels: null,
|
||||
values: [
|
||||
'lfc_chunk_size_pages',
|
||||
],
|
||||
query: importstr 'sql_exporter/lfc_chunk_size.sql',
|
||||
}
|
||||
1
compute/etc/sql_exporter/lfc_chunk_size.sql
Normal file
1
compute/etc/sql_exporter/lfc_chunk_size.sql
Normal file
@@ -0,0 +1 @@
|
||||
SELECT lfc_value AS lfc_chunk_size_pages FROM neon.neon_lfc_stats WHERE lfc_key = 'file_cache_chunk_size_pages';
|
||||
@@ -54,7 +54,7 @@ files:
|
||||
# regardless of hostname (ALL)
|
||||
#
|
||||
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
|
||||
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff
|
||||
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd
|
||||
- filename: cgconfig.conf
|
||||
content: |
|
||||
# Configuration for cgroups in VM compute nodes
|
||||
|
||||
@@ -54,7 +54,7 @@ files:
|
||||
# regardless of hostname (ALL)
|
||||
#
|
||||
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
|
||||
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff
|
||||
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd
|
||||
- filename: cgconfig.conf
|
||||
content: |
|
||||
# Configuration for cgroups in VM compute nodes
|
||||
|
||||
@@ -17,6 +17,7 @@ aws-sdk-kms.workspace = true
|
||||
aws-smithy-types.workspace = true
|
||||
anyhow.workspace = true
|
||||
axum = { workspace = true, features = [] }
|
||||
axum-extra.workspace = true
|
||||
camino.workspace = true
|
||||
chrono.workspace = true
|
||||
cfg-if.workspace = true
|
||||
@@ -25,6 +26,7 @@ fail.workspace = true
|
||||
flate2.workspace = true
|
||||
futures.workspace = true
|
||||
http.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
metrics.workspace = true
|
||||
nix.workspace = true
|
||||
notify.workspace = true
|
||||
|
||||
@@ -179,6 +179,7 @@ fn main() -> Result<()> {
|
||||
live_config_allowed: cli_spec.live_config_allowed,
|
||||
},
|
||||
cli_spec.spec,
|
||||
cli_spec.compute_ctl_config,
|
||||
)?;
|
||||
|
||||
let exit_code = compute_node.run()?;
|
||||
|
||||
@@ -11,8 +11,10 @@ use std::{env, fs};
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use compute_api::privilege::Privilege;
|
||||
use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent};
|
||||
use compute_api::responses::{ComputeCtlConfig, ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::{
|
||||
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use futures::future::join_all;
|
||||
use futures::stream::FuturesUnordered;
|
||||
@@ -35,6 +37,7 @@ use crate::logger::startup_context_from_env;
|
||||
use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
|
||||
use crate::monitor::launch_monitor;
|
||||
use crate::pg_helpers::*;
|
||||
use crate::rsyslog::configure_and_start_rsyslog;
|
||||
use crate::spec::*;
|
||||
use crate::swap::resize_swap;
|
||||
use crate::sync_sk::{check_if_synced, ping_safekeeper};
|
||||
@@ -132,6 +135,8 @@ pub struct ComputeState {
|
||||
/// passed by the control plane with a /configure HTTP request.
|
||||
pub pspec: Option<ParsedSpec>,
|
||||
|
||||
pub compute_ctl_config: ComputeCtlConfig,
|
||||
|
||||
/// If the spec is passed by a /configure request, 'startup_span' is the
|
||||
/// /configure request's tracing span. The main thread enters it when it
|
||||
/// processes the compute startup, so that the compute startup is considered
|
||||
@@ -155,6 +160,7 @@ impl ComputeState {
|
||||
last_active: None,
|
||||
error: None,
|
||||
pspec: None,
|
||||
compute_ctl_config: ComputeCtlConfig::default(),
|
||||
startup_span: None,
|
||||
metrics: ComputeMetrics::default(),
|
||||
}
|
||||
@@ -365,7 +371,11 @@ pub(crate) fn construct_superuser_query(spec: &ComputeSpec) -> String {
|
||||
}
|
||||
|
||||
impl ComputeNode {
|
||||
pub fn new(params: ComputeNodeParams, cli_spec: Option<ComputeSpec>) -> Result<Self> {
|
||||
pub fn new(
|
||||
params: ComputeNodeParams,
|
||||
cli_spec: Option<ComputeSpec>,
|
||||
compute_ctl_config: ComputeCtlConfig,
|
||||
) -> Result<Self> {
|
||||
let connstr = params.connstr.as_str();
|
||||
let conn_conf = postgres::config::Config::from_str(connstr)
|
||||
.context("cannot build postgres config from connstr")?;
|
||||
@@ -377,6 +387,7 @@ impl ComputeNode {
|
||||
let pspec = ParsedSpec::try_from(cli_spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
new_state.pspec = Some(pspec);
|
||||
}
|
||||
new_state.compute_ctl_config = compute_ctl_config;
|
||||
|
||||
Ok(ComputeNode {
|
||||
params,
|
||||
@@ -405,11 +416,19 @@ impl ComputeNode {
|
||||
|
||||
// Launch the external HTTP server first, so that we can serve control plane
|
||||
// requests while configuration is still in progress.
|
||||
crate::http::server::Server::External(this.params.external_http_port).launch(&this);
|
||||
crate::http::server::Server::External {
|
||||
port: this.params.external_http_port,
|
||||
jwks: this.state.lock().unwrap().compute_ctl_config.jwks.clone(),
|
||||
compute_id: this.params.compute_id.clone(),
|
||||
}
|
||||
.launch(&this);
|
||||
|
||||
// The internal HTTP server could be launched later, but there isn't much
|
||||
// sense in waiting.
|
||||
crate::http::server::Server::Internal(this.params.internal_http_port).launch(&this);
|
||||
crate::http::server::Server::Internal {
|
||||
port: this.params.internal_http_port,
|
||||
}
|
||||
.launch(&this);
|
||||
|
||||
// If we got a spec from the CLI already, use that. Otherwise wait for the
|
||||
// control plane to pass it to us with a /configure HTTP request
|
||||
@@ -468,6 +487,8 @@ impl ComputeNode {
|
||||
// Kills the actual task running the monitor
|
||||
handle.abort();
|
||||
}
|
||||
} else {
|
||||
_ = vm_monitor; // appease unused lint on macOS
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -658,6 +679,23 @@ impl ComputeNode {
|
||||
});
|
||||
}
|
||||
|
||||
// Configure and start rsyslog if necessary
|
||||
if let ComputeAudit::Hipaa = pspec.spec.audit_log_level {
|
||||
let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT").unwrap_or("".to_string());
|
||||
if remote_endpoint.is_empty() {
|
||||
anyhow::bail!("AUDIT_LOGGING_ENDPOINT is empty");
|
||||
}
|
||||
|
||||
let log_directory_path = Path::new(&self.params.pgdata).join("log");
|
||||
// TODO: make this more robust
|
||||
// now rsyslog starts once and there is no monitoring or restart if it fails
|
||||
configure_and_start_rsyslog(
|
||||
log_directory_path.to_str().unwrap(),
|
||||
"hipaa",
|
||||
&remote_endpoint,
|
||||
)?;
|
||||
}
|
||||
|
||||
// Launch remaining service threads
|
||||
let _monitor_handle = launch_monitor(self);
|
||||
let _configurator_handle = launch_configurator(self);
|
||||
@@ -791,6 +829,7 @@ impl ComputeNode {
|
||||
};
|
||||
StartVmMonitorResult { token, vm_monitor }
|
||||
} else {
|
||||
_ = disable_lfc_resizing; // appease unused lint on macOS
|
||||
StartVmMonitorResult { }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use anyhow::Result;
|
||||
use std::fmt::Write as FmtWrite;
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::io;
|
||||
@@ -5,10 +6,11 @@ use std::io::Write;
|
||||
use std::io::prelude::*;
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::Result;
|
||||
use compute_api::spec::{ComputeMode, ComputeSpec, GenericOption};
|
||||
use compute_api::spec::{ComputeAudit, ComputeMode, ComputeSpec, GenericOption};
|
||||
|
||||
use crate::pg_helpers::{GenericOptionExt, PgOptionsSerialize, escape_conf_value};
|
||||
use crate::pg_helpers::{
|
||||
GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize, escape_conf_value,
|
||||
};
|
||||
|
||||
/// Check that `line` is inside a text file and put it there if it is not.
|
||||
/// Create file if it doesn't exist.
|
||||
@@ -138,6 +140,54 @@ pub fn write_postgres_conf(
|
||||
writeln!(file, "# Managed by compute_ctl: end")?;
|
||||
}
|
||||
|
||||
// If audit logging is enabled, configure pgaudit.
|
||||
//
|
||||
// Note, that this is called after the settings from spec are written.
|
||||
// This way we always override the settings from the spec
|
||||
// and don't allow the user or the control plane admin to change them.
|
||||
if let ComputeAudit::Hipaa = spec.audit_log_level {
|
||||
writeln!(file, "# Managed by compute_ctl audit settings: begin")?;
|
||||
// This log level is very verbose
|
||||
// but this is necessary for HIPAA compliance.
|
||||
writeln!(file, "pgaudit.log='all'")?;
|
||||
writeln!(file, "pgaudit.log_parameter=on")?;
|
||||
// Disable logging of catalog queries
|
||||
// The catalog doesn't contain sensitive data, so we don't need to audit it.
|
||||
writeln!(file, "pgaudit.log_catalog=off")?;
|
||||
// Set log rotation to 5 minutes
|
||||
// TODO: tune this after performance testing
|
||||
writeln!(file, "pgaudit.log_rotation_age=5")?;
|
||||
|
||||
// Add audit shared_preload_libraries, if they are not present.
|
||||
//
|
||||
// The caller who sets the flag is responsible for ensuring that the necessary
|
||||
// shared_preload_libraries are present in the compute image,
|
||||
// otherwise the compute start will fail.
|
||||
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
|
||||
let mut extra_shared_preload_libraries = String::new();
|
||||
if !libs.contains("pgaudit") {
|
||||
extra_shared_preload_libraries.push_str(",pgaudit");
|
||||
}
|
||||
if !libs.contains("pgauditlogtofile") {
|
||||
extra_shared_preload_libraries.push_str(",pgauditlogtofile");
|
||||
}
|
||||
writeln!(
|
||||
file,
|
||||
"shared_preload_libraries='{}{}'",
|
||||
libs, extra_shared_preload_libraries
|
||||
)?;
|
||||
} else {
|
||||
// Typically, this should be unreacheable,
|
||||
// because we always set at least some shared_preload_libraries in the spec
|
||||
// but let's handle it explicitly anyway.
|
||||
writeln!(
|
||||
file,
|
||||
"shared_preload_libraries='neon,pgaudit,pgauditlogtofile'"
|
||||
)?;
|
||||
}
|
||||
writeln!(file, "# Managed by compute_ctl audit settings: end")?;
|
||||
}
|
||||
|
||||
writeln!(file, "neon.extension_server_port={}", extension_server_port)?;
|
||||
|
||||
if spec.drop_subscriptions_before_start {
|
||||
|
||||
@@ -0,0 +1,10 @@
|
||||
# Load imfile module to read log files
|
||||
module(load="imfile")
|
||||
|
||||
# Input configuration for log files in the specified directory
|
||||
# Replace {log_directory} with the directory containing the log files
|
||||
input(type="imfile" File="{log_directory}/*.log" Tag="{tag}" Severity="info" Facility="local0")
|
||||
global(workDirectory="/var/log")
|
||||
|
||||
# Forward logs to remote syslog server
|
||||
*.* @@{remote_endpoint}
|
||||
@@ -253,27 +253,31 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
|
||||
}
|
||||
}
|
||||
|
||||
// Do request to extension storage proxy, i.e.
|
||||
// Do request to extension storage proxy, e.g.,
|
||||
// curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
|
||||
// using HHTP GET
|
||||
// and return the response body as bytes
|
||||
//
|
||||
// using HTTP GET and return the response body as bytes.
|
||||
async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> {
|
||||
let uri = format!("{}/{}", ext_remote_storage, ext_path);
|
||||
let filename = Path::new(ext_path)
|
||||
.file_name()
|
||||
.unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
|
||||
.to_str()
|
||||
.unwrap_or("unknown")
|
||||
.to_string();
|
||||
|
||||
info!("Download extension {} from uri {}", ext_path, uri);
|
||||
info!("Downloading extension file '{}' from uri {}", filename, uri);
|
||||
|
||||
match do_extension_server_request(&uri).await {
|
||||
Ok(resp) => {
|
||||
info!("Successfully downloaded remote extension data {}", ext_path);
|
||||
REMOTE_EXT_REQUESTS_TOTAL
|
||||
.with_label_values(&[&StatusCode::OK.to_string()])
|
||||
.with_label_values(&[&StatusCode::OK.to_string(), &filename])
|
||||
.inc();
|
||||
Ok(resp)
|
||||
}
|
||||
Err((msg, status)) => {
|
||||
REMOTE_EXT_REQUESTS_TOTAL
|
||||
.with_label_values(&[&status])
|
||||
.with_label_values(&[&status, &filename])
|
||||
.inc();
|
||||
bail!(msg);
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
pub(crate) mod json;
|
||||
pub(crate) mod path;
|
||||
pub(crate) mod query;
|
||||
pub(crate) mod request_id;
|
||||
|
||||
pub(crate) use json::Json;
|
||||
pub(crate) use path::Path;
|
||||
pub(crate) use query::Query;
|
||||
pub(crate) use request_id::RequestId;
|
||||
|
||||
86
compute_tools/src/http/extract/request_id.rs
Normal file
86
compute_tools/src/http/extract/request_id.rs
Normal file
@@ -0,0 +1,86 @@
|
||||
use std::{
|
||||
fmt::Display,
|
||||
ops::{Deref, DerefMut},
|
||||
};
|
||||
|
||||
use axum::{extract::FromRequestParts, response::IntoResponse};
|
||||
use http::{StatusCode, request::Parts};
|
||||
|
||||
use crate::http::{JsonResponse, headers::X_REQUEST_ID};
|
||||
|
||||
/// Extract the request ID from the `X-Request-Id` header.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(crate) struct RequestId(pub String);
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Rejection used for [`RequestId`].
|
||||
///
|
||||
/// Contains one variant for each way the [`RequestId`] extractor can
|
||||
/// fail.
|
||||
pub(crate) enum RequestIdRejection {
|
||||
/// The request is missing the header.
|
||||
MissingRequestId,
|
||||
|
||||
/// The value of the header is invalid UTF-8.
|
||||
InvalidUtf8,
|
||||
}
|
||||
|
||||
impl RequestIdRejection {
|
||||
pub fn status(&self) -> StatusCode {
|
||||
match self {
|
||||
RequestIdRejection::MissingRequestId => StatusCode::INTERNAL_SERVER_ERROR,
|
||||
RequestIdRejection::InvalidUtf8 => StatusCode::BAD_REQUEST,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn message(&self) -> String {
|
||||
match self {
|
||||
RequestIdRejection::MissingRequestId => "request ID is missing",
|
||||
RequestIdRejection::InvalidUtf8 => "request ID is invalid UTF-8",
|
||||
}
|
||||
.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoResponse for RequestIdRejection {
|
||||
fn into_response(self) -> axum::response::Response {
|
||||
JsonResponse::error(self.status(), self.message())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> FromRequestParts<S> for RequestId
|
||||
where
|
||||
S: Send + Sync,
|
||||
{
|
||||
type Rejection = RequestIdRejection;
|
||||
|
||||
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
|
||||
match parts.headers.get(X_REQUEST_ID) {
|
||||
Some(value) => match value.to_str() {
|
||||
Ok(request_id) => Ok(Self(request_id.to_string())),
|
||||
Err(_) => Err(RequestIdRejection::InvalidUtf8),
|
||||
},
|
||||
None => Err(RequestIdRejection::MissingRequestId),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for RequestId {
|
||||
type Target = String;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for RequestId {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for RequestId {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(&self.0)
|
||||
}
|
||||
}
|
||||
2
compute_tools/src/http/headers.rs
Normal file
2
compute_tools/src/http/headers.rs
Normal file
@@ -0,0 +1,2 @@
|
||||
/// Constant for `X-Request-Id` header.
|
||||
pub const X_REQUEST_ID: &str = "x-request-id";
|
||||
145
compute_tools/src/http/middleware/authorize.rs
Normal file
145
compute_tools/src/http/middleware/authorize.rs
Normal file
@@ -0,0 +1,145 @@
|
||||
use std::{collections::HashSet, net::SocketAddr};
|
||||
|
||||
use anyhow::{Result, anyhow};
|
||||
use axum::{RequestExt, body::Body, extract::ConnectInfo};
|
||||
use axum_extra::{
|
||||
TypedHeader,
|
||||
headers::{Authorization, authorization::Bearer},
|
||||
};
|
||||
use futures::future::BoxFuture;
|
||||
use http::{Request, Response, StatusCode};
|
||||
use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet};
|
||||
use serde::Deserialize;
|
||||
use tower_http::auth::AsyncAuthorizeRequest;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::http::{JsonResponse, extract::RequestId};
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub(in crate::http) struct Claims {
|
||||
compute_id: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(in crate::http) struct Authorize {
|
||||
compute_id: String,
|
||||
jwks: JwkSet,
|
||||
validation: Validation,
|
||||
}
|
||||
|
||||
impl Authorize {
|
||||
pub fn new(compute_id: String, jwks: JwkSet) -> Self {
|
||||
let mut validation = Validation::new(Algorithm::EdDSA);
|
||||
// Nothing is currently required
|
||||
validation.required_spec_claims = HashSet::new();
|
||||
validation.validate_exp = true;
|
||||
// Unused by the control plane
|
||||
validation.validate_aud = false;
|
||||
// Unused by the control plane
|
||||
validation.validate_nbf = false;
|
||||
|
||||
Self {
|
||||
compute_id,
|
||||
jwks,
|
||||
validation,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncAuthorizeRequest<Body> for Authorize {
|
||||
type RequestBody = Body;
|
||||
type ResponseBody = Body;
|
||||
type Future = BoxFuture<'static, Result<Request<Body>, Response<Self::ResponseBody>>>;
|
||||
|
||||
fn authorize(&mut self, mut request: Request<Body>) -> Self::Future {
|
||||
let compute_id = self.compute_id.clone();
|
||||
let jwks = self.jwks.clone();
|
||||
let validation = self.validation.clone();
|
||||
|
||||
Box::pin(async move {
|
||||
let request_id = request.extract_parts::<RequestId>().await.unwrap();
|
||||
|
||||
// TODO: Remove this check after a successful rollout
|
||||
if jwks.keys.is_empty() {
|
||||
warn!(%request_id, "Authorization has not been configured");
|
||||
|
||||
return Ok(request);
|
||||
}
|
||||
|
||||
let connect_info = request
|
||||
.extract_parts::<ConnectInfo<SocketAddr>>()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// In the event the request is coming from the loopback interface,
|
||||
// allow all requests
|
||||
if connect_info.ip().is_loopback() {
|
||||
warn!(%request_id, "Bypassed authorization because request is coming from the loopback interface");
|
||||
|
||||
return Ok(request);
|
||||
}
|
||||
|
||||
let TypedHeader(Authorization(bearer)) = request
|
||||
.extract_parts::<TypedHeader<Authorization<Bearer>>>()
|
||||
.await
|
||||
.map_err(|_| {
|
||||
JsonResponse::error(StatusCode::BAD_REQUEST, "invalid authorization token")
|
||||
})?;
|
||||
|
||||
let data = match Self::verify(&jwks, bearer.token(), &validation) {
|
||||
Ok(claims) => claims,
|
||||
Err(e) => return Err(JsonResponse::error(StatusCode::UNAUTHORIZED, e)),
|
||||
};
|
||||
|
||||
if data.claims.compute_id != compute_id {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"invalid claims in authorization token",
|
||||
));
|
||||
}
|
||||
|
||||
// Make claims available to any subsequent middleware or request
|
||||
// handlers
|
||||
request.extensions_mut().insert(data.claims);
|
||||
|
||||
Ok(request)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Authorize {
|
||||
/// Verify the token using the JSON Web Key set and return the token data.
|
||||
fn verify(jwks: &JwkSet, token: &str, validation: &Validation) -> Result<TokenData<Claims>> {
|
||||
debug_assert!(!jwks.keys.is_empty());
|
||||
|
||||
for jwk in jwks.keys.iter() {
|
||||
let decoding_key = match DecodingKey::from_jwk(jwk) {
|
||||
Ok(key) => key,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to construct decoding key from {}: {}",
|
||||
jwk.common.key_id.as_ref().unwrap(),
|
||||
e
|
||||
);
|
||||
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match jsonwebtoken::decode::<Claims>(token, &decoding_key, validation) {
|
||||
Ok(data) => return Ok(data),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to decode authorization token using {}: {}",
|
||||
jwk.common.key_id.as_ref().unwrap(),
|
||||
e
|
||||
);
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(anyhow!("Failed to verify authorization token"))
|
||||
}
|
||||
}
|
||||
1
compute_tools/src/http/middleware/mod.rs
Normal file
1
compute_tools/src/http/middleware/mod.rs
Normal file
@@ -0,0 +1 @@
|
||||
pub(in crate::http) mod authorize;
|
||||
@@ -7,6 +7,8 @@ use serde::Serialize;
|
||||
use tracing::error;
|
||||
|
||||
mod extract;
|
||||
mod headers;
|
||||
mod middleware;
|
||||
mod routes;
|
||||
pub mod server;
|
||||
|
||||
|
||||
@@ -10,48 +10,58 @@ use axum::middleware::{self, Next};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::routing::{get, post};
|
||||
use http::StatusCode;
|
||||
use jsonwebtoken::jwk::JwkSet;
|
||||
use tokio::net::TcpListener;
|
||||
use tower::ServiceBuilder;
|
||||
use tower_http::request_id::PropagateRequestIdLayer;
|
||||
use tower_http::trace::TraceLayer;
|
||||
use tracing::{Span, debug, error, info};
|
||||
use tower_http::{
|
||||
auth::AsyncRequireAuthorizationLayer, request_id::PropagateRequestIdLayer, trace::TraceLayer,
|
||||
};
|
||||
use tracing::{Span, error, info};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::routes::{
|
||||
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
|
||||
grants, insights, metrics, metrics_json, status, terminate,
|
||||
use super::{
|
||||
headers::X_REQUEST_ID,
|
||||
middleware::authorize::Authorize,
|
||||
routes::{
|
||||
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
|
||||
grants, insights, metrics, metrics_json, status, terminate,
|
||||
},
|
||||
};
|
||||
use crate::compute::ComputeNode;
|
||||
|
||||
const X_REQUEST_ID: &str = "x-request-id";
|
||||
|
||||
/// `compute_ctl` has two servers: internal and external. The internal server
|
||||
/// binds to the loopback interface and handles communication from clients on
|
||||
/// the compute. The external server is what receives communication from the
|
||||
/// control plane, the metrics scraper, etc. We make the distinction because
|
||||
/// certain routes in `compute_ctl` only need to be exposed to local processes
|
||||
/// like Postgres via the neon extension and local_proxy.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum Server {
|
||||
Internal(u16),
|
||||
External(u16),
|
||||
Internal {
|
||||
port: u16,
|
||||
},
|
||||
External {
|
||||
port: u16,
|
||||
jwks: JwkSet,
|
||||
compute_id: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl Display for Server {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Server::Internal(_) => f.write_str("internal"),
|
||||
Server::External(_) => f.write_str("external"),
|
||||
Server::Internal { .. } => f.write_str("internal"),
|
||||
Server::External { .. } => f.write_str("external"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Server> for Router<Arc<ComputeNode>> {
|
||||
fn from(server: Server) -> Self {
|
||||
impl From<&Server> for Router<Arc<ComputeNode>> {
|
||||
fn from(server: &Server) -> Self {
|
||||
let mut router = Router::<Arc<ComputeNode>>::new();
|
||||
|
||||
router = match server {
|
||||
Server::Internal(_) => {
|
||||
Server::Internal { .. } => {
|
||||
router = router
|
||||
.route(
|
||||
"/extension_server/{*filename}",
|
||||
@@ -69,59 +79,71 @@ impl From<Server> for Router<Arc<ComputeNode>> {
|
||||
|
||||
router
|
||||
}
|
||||
Server::External(_) => router
|
||||
.route("/check_writability", post(check_writability::is_writable))
|
||||
.route("/configure", post(configure::configure))
|
||||
.route("/database_schema", get(database_schema::get_schema_dump))
|
||||
.route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects))
|
||||
.route("/insights", get(insights::get_insights))
|
||||
.route("/metrics", get(metrics::get_metrics))
|
||||
.route("/metrics.json", get(metrics_json::get_metrics))
|
||||
.route("/status", get(status::get_status))
|
||||
.route("/terminate", post(terminate::terminate)),
|
||||
Server::External {
|
||||
jwks, compute_id, ..
|
||||
} => {
|
||||
let unauthenticated_router =
|
||||
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
|
||||
|
||||
let authenticated_router = Router::<Arc<ComputeNode>>::new()
|
||||
.route("/check_writability", post(check_writability::is_writable))
|
||||
.route("/configure", post(configure::configure))
|
||||
.route("/database_schema", get(database_schema::get_schema_dump))
|
||||
.route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects))
|
||||
.route("/insights", get(insights::get_insights))
|
||||
.route("/metrics.json", get(metrics_json::get_metrics))
|
||||
.route("/status", get(status::get_status))
|
||||
.route("/terminate", post(terminate::terminate))
|
||||
.layer(AsyncRequireAuthorizationLayer::new(Authorize::new(
|
||||
compute_id.clone(),
|
||||
jwks.clone(),
|
||||
)));
|
||||
|
||||
router
|
||||
.merge(unauthenticated_router)
|
||||
.merge(authenticated_router)
|
||||
}
|
||||
};
|
||||
|
||||
router.fallback(Server::handle_404).method_not_allowed_fallback(Server::handle_405).layer(
|
||||
ServiceBuilder::new()
|
||||
// Add this middleware since we assume the request ID exists
|
||||
.layer(middleware::from_fn(maybe_add_request_id_header))
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.on_request(|request: &http::Request<_>, _span: &Span| {
|
||||
let request_id = request
|
||||
.headers()
|
||||
.get(X_REQUEST_ID)
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap();
|
||||
|
||||
match request.uri().path() {
|
||||
"/metrics" => {
|
||||
debug!(%request_id, "{} {}", request.method(), request.uri())
|
||||
}
|
||||
_ => info!(%request_id, "{} {}", request.method(), request.uri()),
|
||||
};
|
||||
})
|
||||
.on_response(
|
||||
|response: &http::Response<_>, latency: Duration, _span: &Span| {
|
||||
let request_id = response
|
||||
router
|
||||
.fallback(Server::handle_404)
|
||||
.method_not_allowed_fallback(Server::handle_405)
|
||||
.layer(
|
||||
ServiceBuilder::new()
|
||||
.layer(tower_otel::trace::HttpLayer::server(tracing::Level::INFO))
|
||||
// Add this middleware since we assume the request ID exists
|
||||
.layer(middleware::from_fn(maybe_add_request_id_header))
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.on_request(|request: &http::Request<_>, _span: &Span| {
|
||||
let request_id = request
|
||||
.headers()
|
||||
.get(X_REQUEST_ID)
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap();
|
||||
|
||||
info!(
|
||||
%request_id,
|
||||
code = response.status().as_u16(),
|
||||
latency = latency.as_millis()
|
||||
)
|
||||
},
|
||||
),
|
||||
)
|
||||
.layer(PropagateRequestIdLayer::x_request_id()),
|
||||
)
|
||||
.layer(tower_otel::trace::HttpLayer::server(tracing::Level::INFO))
|
||||
info!(%request_id, "{} {}", request.method(), request.uri());
|
||||
})
|
||||
.on_response(
|
||||
|response: &http::Response<_>, latency: Duration, _span: &Span| {
|
||||
let request_id = response
|
||||
.headers()
|
||||
.get(X_REQUEST_ID)
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap();
|
||||
|
||||
info!(
|
||||
%request_id,
|
||||
code = response.status().as_u16(),
|
||||
latency = latency.as_millis()
|
||||
);
|
||||
},
|
||||
),
|
||||
)
|
||||
.layer(PropagateRequestIdLayer::x_request_id()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,15 +167,15 @@ impl Server {
|
||||
match self {
|
||||
// TODO: Change this to Ipv6Addr::LOCALHOST when the GitHub runners
|
||||
// allow binding to localhost
|
||||
Server::Internal(_) => IpAddr::from(Ipv6Addr::UNSPECIFIED),
|
||||
Server::External(_) => IpAddr::from(Ipv6Addr::UNSPECIFIED),
|
||||
Server::Internal { .. } => IpAddr::from(Ipv6Addr::UNSPECIFIED),
|
||||
Server::External { .. } => IpAddr::from(Ipv6Addr::UNSPECIFIED),
|
||||
}
|
||||
}
|
||||
|
||||
fn port(self) -> u16 {
|
||||
fn port(&self) -> u16 {
|
||||
match self {
|
||||
Server::Internal(port) => port,
|
||||
Server::External(port) => port,
|
||||
Server::Internal { port, .. } => *port,
|
||||
Server::External { port, .. } => *port,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -180,7 +202,9 @@ impl Server {
|
||||
);
|
||||
}
|
||||
|
||||
let router = Router::from(self).with_state(compute);
|
||||
let router = Router::from(&self)
|
||||
.with_state(compute)
|
||||
.into_make_service_with_connect_info::<SocketAddr>();
|
||||
|
||||
if let Err(e) = axum::serve(listener, router).await {
|
||||
error!("compute_ctl {} HTTP server error: {}", self, e);
|
||||
|
||||
@@ -21,6 +21,7 @@ mod migration;
|
||||
pub mod monitor;
|
||||
pub mod params;
|
||||
pub mod pg_helpers;
|
||||
pub mod rsyslog;
|
||||
pub mod spec;
|
||||
mod spec_apply;
|
||||
pub mod swap;
|
||||
|
||||
@@ -54,9 +54,7 @@ pub(crate) static REMOTE_EXT_REQUESTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(||
|
||||
register_int_counter_vec!(
|
||||
"compute_ctl_remote_ext_requests_total",
|
||||
"Total number of requests made by compute_ctl to download extensions from S3 proxy by status",
|
||||
// Do not use any labels like extension name yet.
|
||||
// We can add them later if needed.
|
||||
&["http_status"]
|
||||
&["http_status", "filename"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
80
compute_tools/src/rsyslog.rs
Normal file
80
compute_tools/src/rsyslog.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
use std::process::Command;
|
||||
use std::{fs::OpenOptions, io::Write};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use tracing::info;
|
||||
|
||||
fn get_rsyslog_pid() -> Option<String> {
|
||||
let output = Command::new("pgrep")
|
||||
.arg("rsyslogd")
|
||||
.output()
|
||||
.expect("Failed to execute pgrep");
|
||||
|
||||
if !output.stdout.is_empty() {
|
||||
let pid = std::str::from_utf8(&output.stdout)
|
||||
.expect("Invalid UTF-8 in process output")
|
||||
.trim()
|
||||
.to_string();
|
||||
Some(pid)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
// Start rsyslogd with the specified configuration file
|
||||
// If it is already running, do nothing.
|
||||
fn start_rsyslog(rsyslog_conf_path: &str) -> Result<()> {
|
||||
let pid = get_rsyslog_pid();
|
||||
if let Some(pid) = pid {
|
||||
info!("rsyslogd is already running with pid: {}", pid);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let _ = Command::new("/usr/sbin/rsyslogd")
|
||||
.arg("-f")
|
||||
.arg(rsyslog_conf_path)
|
||||
.arg("-i")
|
||||
.arg("/var/run/rsyslogd/rsyslogd.pid")
|
||||
.output()
|
||||
.context("Failed to start rsyslogd")?;
|
||||
|
||||
// Check that rsyslogd is running
|
||||
if let Some(pid) = get_rsyslog_pid() {
|
||||
info!("rsyslogd started successfully with pid: {}", pid);
|
||||
} else {
|
||||
return Err(anyhow::anyhow!("Failed to start rsyslogd"));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn configure_and_start_rsyslog(
|
||||
log_directory: &str,
|
||||
tag: &str,
|
||||
remote_endpoint: &str,
|
||||
) -> Result<()> {
|
||||
let config_content: String = format!(
|
||||
include_str!("config_template/compute_rsyslog_template.conf"),
|
||||
log_directory = log_directory,
|
||||
tag = tag,
|
||||
remote_endpoint = remote_endpoint
|
||||
);
|
||||
|
||||
info!("rsyslog config_content: {}", config_content);
|
||||
|
||||
let rsyslog_conf_path = "/etc/compute_rsyslog.conf";
|
||||
let mut file = OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.truncate(true)
|
||||
.open(rsyslog_conf_path)?;
|
||||
|
||||
file.write_all(config_content.as_bytes())?;
|
||||
|
||||
info!("rsyslog configuration added successfully. Starting rsyslogd");
|
||||
|
||||
// start the service, using the configuration
|
||||
start_rsyslog(rsyslog_conf_path)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -6,7 +6,7 @@ use std::sync::Arc;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use compute_api::responses::ComputeStatus;
|
||||
use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role};
|
||||
use compute_api::spec::{ComputeAudit, ComputeFeature, ComputeSpec, Database, PgIdent, Role};
|
||||
use futures::future::join_all;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio_postgres::Client;
|
||||
@@ -19,10 +19,10 @@ use crate::pg_helpers::{
|
||||
get_existing_roles_async,
|
||||
};
|
||||
use crate::spec_apply::ApplySpecPhase::{
|
||||
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSchemaNeon,
|
||||
CreateSuperUser, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
|
||||
HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
|
||||
RunInEachDatabase,
|
||||
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreatePgauditExtension,
|
||||
CreatePgauditlogtofileExtension, CreateSchemaNeon, CreateSuperUser, DisablePostgresDBPgAudit,
|
||||
DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions, HandleNeonExtension,
|
||||
HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase,
|
||||
};
|
||||
use crate::spec_apply::PerDatabasePhase::{
|
||||
ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
|
||||
@@ -277,6 +277,19 @@ impl ComputeNode {
|
||||
phases.push(FinalizeDropLogicalSubscriptions);
|
||||
}
|
||||
|
||||
// Keep DisablePostgresDBPgAudit phase at the end,
|
||||
// so that all config operations are audit logged.
|
||||
match spec.audit_log_level
|
||||
{
|
||||
ComputeAudit::Hipaa => {
|
||||
phases.push(CreatePgauditExtension);
|
||||
phases.push(CreatePgauditlogtofileExtension);
|
||||
phases.push(DisablePostgresDBPgAudit);
|
||||
}
|
||||
ComputeAudit::Log => { /* not implemented yet */ }
|
||||
ComputeAudit::Disabled => {}
|
||||
}
|
||||
|
||||
for phase in phases {
|
||||
debug!("Applying phase {:?}", &phase);
|
||||
apply_operations(
|
||||
@@ -463,6 +476,9 @@ pub enum ApplySpecPhase {
|
||||
CreateAndAlterDatabases,
|
||||
CreateSchemaNeon,
|
||||
RunInEachDatabase { db: DB, subphase: PerDatabasePhase },
|
||||
CreatePgauditExtension,
|
||||
CreatePgauditlogtofileExtension,
|
||||
DisablePostgresDBPgAudit,
|
||||
HandleOtherExtensions,
|
||||
HandleNeonExtension,
|
||||
CreateAvailabilityCheck,
|
||||
@@ -1098,6 +1114,25 @@ async fn get_operations<'a>(
|
||||
}
|
||||
Ok(Box::new(empty()))
|
||||
}
|
||||
ApplySpecPhase::CreatePgauditExtension => Ok(Box::new(once(Operation {
|
||||
query: String::from("CREATE EXTENSION IF NOT EXISTS pgaudit"),
|
||||
comment: Some(String::from("create pgaudit extensions")),
|
||||
}))),
|
||||
ApplySpecPhase::CreatePgauditlogtofileExtension => Ok(Box::new(once(Operation {
|
||||
query: String::from("CREATE EXTENSION IF NOT EXISTS pgauditlogtofile"),
|
||||
comment: Some(String::from("create pgauditlogtofile extensions")),
|
||||
}))),
|
||||
// Disable pgaudit logging for postgres database.
|
||||
// Postgres is neon system database used by monitors
|
||||
// and compute_ctl tuning functions and thus generates a lot of noise.
|
||||
// We do not consider data stored in this database as sensitive.
|
||||
ApplySpecPhase::DisablePostgresDBPgAudit => {
|
||||
let query = "ALTER DATABASE postgres SET pgaudit.log to 'none'";
|
||||
Ok(Box::new(once(Operation {
|
||||
query: query.to_string(),
|
||||
comment: Some(query.to_string()),
|
||||
})))
|
||||
}
|
||||
ApplySpecPhase::HandleNeonExtension => {
|
||||
let operations = vec![
|
||||
Operation {
|
||||
|
||||
@@ -48,7 +48,8 @@ use anyhow::{Context, Result, anyhow, bail};
|
||||
use compute_api::requests::ConfigurationRequest;
|
||||
use compute_api::responses::{ComputeCtlConfig, ComputeStatus, ComputeStatusResponse};
|
||||
use compute_api::spec::{
|
||||
Cluster, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent, RemoteExtSpec, Role,
|
||||
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
|
||||
RemoteExtSpec, Role,
|
||||
};
|
||||
use nix::sys::signal::{Signal, kill};
|
||||
use pageserver_api::shard::ShardStripeSize;
|
||||
@@ -668,6 +669,7 @@ impl Endpoint {
|
||||
local_proxy_config: None,
|
||||
reconfigure_concurrency: self.reconfigure_concurrency,
|
||||
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
|
||||
audit_log_level: ComputeAudit::Disabled,
|
||||
};
|
||||
|
||||
// this strange code is needed to support respec() in tests
|
||||
|
||||
@@ -165,8 +165,6 @@ pub struct NeonStorageControllerConf {
|
||||
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub long_reconcile_threshold: Option<Duration>,
|
||||
|
||||
pub load_safekeepers: bool,
|
||||
}
|
||||
|
||||
impl NeonStorageControllerConf {
|
||||
@@ -190,7 +188,6 @@ impl Default for NeonStorageControllerConf {
|
||||
max_secondary_lag_bytes: None,
|
||||
heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL,
|
||||
long_reconcile_threshold: None,
|
||||
load_safekeepers: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -537,10 +537,6 @@ impl StorageController {
|
||||
args.push("--start-as-candidate".to_string());
|
||||
}
|
||||
|
||||
if self.config.load_safekeepers {
|
||||
args.push("--load-safekeepers".to_string());
|
||||
}
|
||||
|
||||
if let Some(private_key) = &self.private_key {
|
||||
let claims = Claims::new(None, Scope::PageServerApi);
|
||||
let jwt_token =
|
||||
|
||||
201
docs/rfcs/041-rel-sparse-keyspace.md
Normal file
201
docs/rfcs/041-rel-sparse-keyspace.md
Normal file
@@ -0,0 +1,201 @@
|
||||
# Sparse Keyspace for Relation Directories
|
||||
|
||||
## Summary
|
||||
|
||||
This is an RFC describing a new storage strategy for storing relation directories.
|
||||
|
||||
## Motivation
|
||||
|
||||
Postgres maintains a directory structure for databases and relations. In Neon, we store these information
|
||||
by serializing the directory data in a single key (see `pgdatadir_mapping.rs`).
|
||||
|
||||
```rust
|
||||
// DbDir:
|
||||
// 00 00000000 00000000 00000000 00 00000000
|
||||
|
||||
// RelDir:
|
||||
// 00 SPCNODE DBNODE 00000000 00 00000001 (Postgres never uses relfilenode 0)
|
||||
```
|
||||
|
||||
We have a dedicated structure on the ingestion path to serialize the relation directory into this single key.
|
||||
|
||||
```rust
|
||||
#[derive(Debug, Serialize, Deserialize, Default)]
|
||||
pub(crate) struct RelDirectory {
|
||||
// Set of relations that exist. (relfilenode, forknum)
|
||||
//
|
||||
// TODO: Store it as a btree or radix tree or something else that spans multiple
|
||||
// key-value pairs, if you have a lot of relations
|
||||
pub(crate) rels: HashSet<(Oid, u8)>,
|
||||
}
|
||||
```
|
||||
|
||||
The current codebase has the following three access patterns for the relation directory.
|
||||
|
||||
1. Check if a relation exists.
|
||||
2. List all relations.
|
||||
3. Create/drop a relation.
|
||||
|
||||
For (1), we currently have to get the reldir key, deserialize it, and check whether the relation exists in the
|
||||
hash set. For (2), we get the reldir key and the hash set. For (3), we need first to get
|
||||
and deserialize the key, add the new relation record to the hash set, and then serialize it and write it back.
|
||||
|
||||
If we have 100k relations in a database, we would have a 100k-large hash set. Then, every
|
||||
relation created and dropped would have deserialized and serialized this 100k-large hash set. This makes the
|
||||
relation create/drop process to be quadratic. When we check if a relation exists in the ingestion path,
|
||||
we would have to deserialize this super big 100k-large key before checking if a single relation exists.
|
||||
|
||||
In this RFC, we will propose a new way to store the reldir data in the sparse keyspace and propose how
|
||||
to seamlessly migrate users to use the new keyspace.
|
||||
|
||||
The PoC patch is implemented in [PR10316](https://github.com/neondatabase/neon/pull/10316).
|
||||
|
||||
## Key Mapping
|
||||
|
||||
We will use the recently introduced sparse keyspace to store actual data. Sparse keyspace was proposed in
|
||||
[038-aux-file-v2.md](038-aux-file-v2.md). The original reldir has one single value of `HashSet<(Oid, u8)>`
|
||||
for each of the databases (identified as `spcnode, dbnode`). We encode the `Oid` (`relnode, forknum`),
|
||||
into the key.
|
||||
|
||||
```plain
|
||||
(REL_DIR_KEY_PREFIX, spcnode, dbnode, relnode, forknum, 1) -> deleted
|
||||
(REL_DIR_KEY_PREFIX, spcnode, dbnode, relnode, forknum, 1) -> exists
|
||||
```
|
||||
|
||||
Assume all reldir data are stored in this new keyspace; the 3 reldir operations we mentioned before can be
|
||||
implemented as follows.
|
||||
|
||||
1. Check if a relation exists: check if the key maps to "exists".
|
||||
2. List all relations: scan the sprase keyspace over the `rel_dir_key_prefix`. Extract relnode and forknum from the key.
|
||||
3. Create/drop a relation: write "exists" or "deleted" to the corresponding key of the relation. The delete tombstone will
|
||||
be removed during image layer generation upon compaction.
|
||||
|
||||
Note that "exists" and "deleted" will be encoded as a single byte as two variants of an enum.
|
||||
The mapping is implemented as `rel_tag_sparse_key` in the PoC patch.
|
||||
|
||||
## Changes to Sparse Keyspace
|
||||
|
||||
Previously, we only used sparse keyspaces for the aux files, which did not carry over when branching. The reldir
|
||||
information needs to be preserved from the parent branch to the child branch. Therefore, the read path needs
|
||||
to be updated accordingly to accommodate such "inherited sparse keys". This is done in
|
||||
[PR#10313](https://github.com/neondatabase/neon/pull/10313).
|
||||
|
||||
## Coexistence of the Old and New Keyspaces
|
||||
|
||||
Migrating to the new keyspace will be done gradually: when we flip a config item to enable the new reldir keyspace, the
|
||||
ingestion path will start to write to the new keyspace and the old reldir data will be kept in the old one. The read
|
||||
path needs to combine the data from both keyspaces.
|
||||
|
||||
Theoretically, we could do a rewrite at the startup time that scans all relation directories and copies that data into the
|
||||
new keyspace. However, this could take a long time, especially if we have thousands of tenants doing the migration
|
||||
process simultaneously after the pageserver restarts. Therefore, we propose the coexistence strategy so that the
|
||||
migration can happen seamlessly and imposes no potential downtime for the user.
|
||||
|
||||
With the coexistence assumption, the 3 reldir operations will be implemented as follows:
|
||||
|
||||
1. Check if a relation exists
|
||||
- Check the new keyspace if the key maps to any value. If it maps to "exists" or "deleted", directly
|
||||
return it to the user.
|
||||
- Otherwise, deserialize the old reldir key and get the result.
|
||||
2. List all relations: scan the sparse keyspace over the `rel_dir_key_prefix` and deserialize the old reldir key.
|
||||
Combine them to obtain the final result.
|
||||
3. Create/drop a relation: write "exists" or "deleted" to the corresponding key of the relation into the new keyspace.
|
||||
- We assume no overwrite of relations will happen (i.e., the user won't create a relation at the same Oid). This will be implemented as a runtime check.
|
||||
- For relation creation, we add `sparse_reldir_tableX -> exists` to the keyspace.
|
||||
- For relation drop, we first check if the relation is recorded in the old keyspace. If yes, we deserialize the old reldir key,
|
||||
remove the relation, and then write it back. Otherwise, we put `sparse_reldir_tableX -> deleted` to the keyspace.
|
||||
- The delete tombstone will be removed during image layer generation upon compaction.
|
||||
|
||||
This process ensures that the transition will not introduce any downtime and all new updates are written to the new keyspace. The total
|
||||
amount of data in the storage would be `O(relations_modifications)` and we can guarantee `O(current_relations)` after compaction.
|
||||
There could be some relations that exist in the old reldir key for a long time. Refer to the "Full Migration" section on how to deal
|
||||
with them. Plus, for relation modifications, it will have `O(old_relations)` complexity until we do the full migration, which gives
|
||||
us `O(1)` complexity after fully opt-in the sparse keyspace.
|
||||
|
||||
The process also implies that a relation will only exists either in the old reldir key or in the new sparse keyspace. It is not possible
|
||||
to have a table to be recorded in the old reldir key while later having a delete tombstone for it in the sparse keyspace at any LSN.
|
||||
|
||||
We will introduce a config item and an index_part record to record the current status of the migration process.
|
||||
|
||||
- Config item `enable_reldir_v2`: controls whether the ingestion path writes the reldir info into the new keyspace.
|
||||
- `index_part.json` field `reldir_v2_status`: whether the timeline has written any key into the new reldir keyspace.
|
||||
|
||||
If `enable_reldir_v2` is set to `true` and the timeline ingests the first key into the new reldir keyspace, it will update
|
||||
`index_part.json` to set `reldir_v2_status` to `Status::Migrating`. Even if `enable_reldir_v2` gets flipped back to
|
||||
`false` (i.e., when the pageserver restarts and such config isn't persisted), the read/write path will still
|
||||
read/write to the new keyspace to avoid data inconsistency. This also indicates that the migration is one-way only:
|
||||
once v2 is enabled, the user cannot go back to v1.
|
||||
|
||||
## Next Steps
|
||||
|
||||
### Full Migration
|
||||
|
||||
This won't be implemented in the project's first phase but might be implemented in the future. Having both v1 and
|
||||
v2 existing in the system would force us to keep the code to deserialize the old reldir key forever. To entirely deprecate this
|
||||
code path, we must ensure the timeline has no old reldir data.
|
||||
|
||||
We can trigger a special image layer generation process at the gc-horizon. The generated image layers will cover several keyspaces:
|
||||
the old reldir key in each of the databases, and the new reldir sparse keyspace. It will remove the old reldir key while
|
||||
copying them into the corresponding keys in the sparse keyspace in the resulting image. This special process happens in
|
||||
the background during compaction. For example, assume this special process is triggered at LSN 0/180. The `create_image_layers`
|
||||
process discovers the following keys at this LSN.
|
||||
|
||||
```plain
|
||||
db1/reldir_key -> (table 1, table 2, table 3)
|
||||
...db1 rel keys
|
||||
db2/reldir_key -> (table 4, table 5, table 6)
|
||||
...db2 rel keys
|
||||
sparse_reldir_db2_table7 -> exists
|
||||
sparse_reldir_db1_table8 -> deleted
|
||||
```
|
||||
|
||||
It will generate the following keys:
|
||||
|
||||
```plain
|
||||
db1/reldir_key -> () # we have to keep the key because it is part of `collect_keyspace`.
|
||||
...db1 rel keys
|
||||
db2/reldir_key -> ()
|
||||
...db2 rel keys
|
||||
|
||||
-- start image layer for the sparse keyspace at sparse_reldir_prefix at LSN 0/180
|
||||
sparse_reldir_db1_table1 -> exists
|
||||
sparse_reldir_db1_table2 -> exists
|
||||
sparse_reldir_db1_table3 -> exists
|
||||
sparse_reldir_db2_table4 -> exists
|
||||
sparse_reldir_db2_table5 -> exists
|
||||
sparse_reldir_db2_table6 -> exists
|
||||
sparse_reldir_db2_table7 -> exists
|
||||
-- end image layer for the sparse keyspace at sparse_reldir_prefix+1
|
||||
|
||||
# The `sparse_reldir_db1_table8` key gets dropped as part of the image layer generation code for the sparse keyspace.
|
||||
# Note that the read path will stop reading if a key is not found in the image layer covering the key range so there
|
||||
# are no correctness issue.
|
||||
```
|
||||
|
||||
We must verify that no pending modifications to the old reldir exists in the delta/image layers above the gc-horizon before
|
||||
we start this process (We can do a vectored read to get the full key history of the old reldir key and ensure there are no more images
|
||||
above the gc-horizon). Otherwise, it will violate the property that "a relation will only exists either in the old reldir key or
|
||||
in the new sparse keyspace". After we run this migration process, we can mark `reldir_v2_status` in the `index_part.json` to
|
||||
`Status::Migrated`, and the read path won't need to read from the old reldir anymore. Once the status is set to `Migrated`, we
|
||||
don't need to add the key into `collect_keyspace` and therefore all of them will be removed from all future image layers.
|
||||
|
||||
The migration process can be proactively triggered across all attached/detached tenants to help us fully remove the old reldir code.
|
||||
|
||||
### Consolidate Relation Size Keys
|
||||
|
||||
We have relsize at the end of all relation nodes.
|
||||
|
||||
```plain
|
||||
// RelSize:
|
||||
// 00 SPCNODE DBNODE RELNODE FORK FFFFFFFF
|
||||
```
|
||||
|
||||
This means that computing logical size requires us to do several single-key gets across the keyspace,
|
||||
potentially requiring downloading many layer files. We could consolidate them into a single
|
||||
keyspace, improving logical size calculation performance.
|
||||
|
||||
### Migrate DBDir Keys
|
||||
|
||||
We assume the number of databases created by the users will be small, and therefore, the current way
|
||||
of storing the database directory would be acceptable. In the future, we could also migrate DBDir keys into
|
||||
the sparse keyspace to support large amount of databases.
|
||||
@@ -134,8 +134,10 @@ pub struct CatalogObjects {
|
||||
pub databases: Vec<Database>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub struct ComputeCtlConfig {
|
||||
/// Set of JSON web keys that the compute can use to authenticate
|
||||
/// communication from the control plane.
|
||||
pub jwks: JwkSet,
|
||||
}
|
||||
|
||||
|
||||
@@ -155,6 +155,16 @@ pub struct ComputeSpec {
|
||||
/// over the same replication content from publisher.
|
||||
#[serde(default)] // Default false
|
||||
pub drop_subscriptions_before_start: bool,
|
||||
|
||||
/// Log level for audit logging:
|
||||
///
|
||||
/// Disabled - no audit logging. This is the default.
|
||||
/// log - log masked statements to the postgres log using pgaudit extension
|
||||
/// hipaa - log unmasked statements to the file using pgaudit and pgauditlogtofile extension
|
||||
///
|
||||
/// Extensions should be present in shared_preload_libraries
|
||||
#[serde(default)]
|
||||
pub audit_log_level: ComputeAudit,
|
||||
}
|
||||
|
||||
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
|
||||
@@ -262,6 +272,17 @@ pub enum ComputeMode {
|
||||
Replica,
|
||||
}
|
||||
|
||||
/// Log level for audit logging
|
||||
/// Disabled, log, hipaa
|
||||
/// Default is Disabled
|
||||
#[derive(Clone, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
|
||||
pub enum ComputeAudit {
|
||||
#[default]
|
||||
Disabled,
|
||||
Log,
|
||||
Hipaa,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
|
||||
pub struct Cluster {
|
||||
pub cluster_id: Option<String>,
|
||||
|
||||
@@ -6,11 +6,8 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
backtrace.workspace = true
|
||||
bytes.workspace = true
|
||||
inferno.workspace = true
|
||||
fail.workspace = true
|
||||
flate2.workspace = true
|
||||
hyper0.workspace = true
|
||||
itertools.workspace = true
|
||||
jemalloc_pprof.workspace = true
|
||||
|
||||
@@ -3,8 +3,6 @@ use std::io::Write as _;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use ::pprof::ProfilerGuardBuilder;
|
||||
use ::pprof::protos::Message as _;
|
||||
use anyhow::{Context, anyhow};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use hyper::header::{AUTHORIZATION, CONTENT_DISPOSITION, CONTENT_TYPE, HeaderName};
|
||||
@@ -12,7 +10,8 @@ use hyper::http::HeaderValue;
|
||||
use hyper::{Body, Method, Request, Response};
|
||||
use metrics::{Encoder, IntCounter, TextEncoder, register_int_counter};
|
||||
use once_cell::sync::Lazy;
|
||||
use regex::Regex;
|
||||
use pprof::ProfilerGuardBuilder;
|
||||
use pprof::protos::Message as _;
|
||||
use routerify::ext::RequestExt;
|
||||
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
|
||||
use tokio::sync::{Mutex, Notify, mpsc};
|
||||
@@ -22,7 +21,6 @@ use tracing::{Instrument, debug, info, info_span, warn};
|
||||
use utils::auth::{AuthError, Claims, SwappableJwtAuth};
|
||||
|
||||
use crate::error::{ApiError, api_error_handler, route_error_handler};
|
||||
use crate::pprof;
|
||||
use crate::request::{get_query_param, parse_query_param};
|
||||
|
||||
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
@@ -449,20 +447,6 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
|
||||
Some(format) => return Err(ApiError::BadRequest(anyhow!("invalid format {format}"))),
|
||||
};
|
||||
|
||||
// Functions and mappings to strip when symbolizing pprof profiles. If true,
|
||||
// also remove child frames.
|
||||
static STRIP_FUNCTIONS: Lazy<Vec<(Regex, bool)>> = Lazy::new(|| {
|
||||
vec![
|
||||
(Regex::new("^__rust").unwrap(), false),
|
||||
(Regex::new("^_start$").unwrap(), false),
|
||||
(Regex::new("^irallocx_prof").unwrap(), true),
|
||||
(Regex::new("^prof_alloc_prep").unwrap(), true),
|
||||
(Regex::new("^std::rt::lang_start").unwrap(), false),
|
||||
(Regex::new("^std::sys::backtrace::__rust").unwrap(), false),
|
||||
]
|
||||
});
|
||||
const STRIP_MAPPINGS: &[&str] = &["libc", "libgcc", "pthread", "vdso"];
|
||||
|
||||
// Obtain profiler handle.
|
||||
let mut prof_ctl = jemalloc_pprof::PROF_CTL
|
||||
.as_ref()
|
||||
@@ -495,45 +479,27 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
|
||||
}
|
||||
|
||||
Format::Pprof => {
|
||||
let data = tokio::task::spawn_blocking(move || {
|
||||
let bytes = prof_ctl.dump_pprof()?;
|
||||
// Symbolize the profile.
|
||||
// TODO: consider moving this upstream to jemalloc_pprof and avoiding the
|
||||
// serialization roundtrip.
|
||||
let profile = pprof::decode(&bytes)?;
|
||||
let profile = pprof::symbolize(profile)?;
|
||||
let profile = pprof::strip_locations(profile, STRIP_MAPPINGS, &STRIP_FUNCTIONS);
|
||||
pprof::encode(&profile)
|
||||
})
|
||||
.await
|
||||
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
let data = tokio::task::spawn_blocking(move || prof_ctl.dump_pprof())
|
||||
.await
|
||||
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, "application/octet-stream")
|
||||
.header(CONTENT_DISPOSITION, "attachment; filename=\"heap.pb\"")
|
||||
.header(CONTENT_DISPOSITION, "attachment; filename=\"heap.pb.gz\"")
|
||||
.body(Body::from(data))
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))
|
||||
}
|
||||
|
||||
Format::Svg => {
|
||||
let body = tokio::task::spawn_blocking(move || {
|
||||
let bytes = prof_ctl.dump_pprof()?;
|
||||
let profile = pprof::decode(&bytes)?;
|
||||
let profile = pprof::symbolize(profile)?;
|
||||
let profile = pprof::strip_locations(profile, STRIP_MAPPINGS, &STRIP_FUNCTIONS);
|
||||
let mut opts = inferno::flamegraph::Options::default();
|
||||
opts.title = "Heap inuse".to_string();
|
||||
opts.count_name = "bytes".to_string();
|
||||
pprof::flamegraph(profile, &mut opts)
|
||||
})
|
||||
.await
|
||||
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
let svg = tokio::task::spawn_blocking(move || prof_ctl.dump_flamegraph())
|
||||
.await
|
||||
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, "image/svg+xml")
|
||||
.body(Body::from(body))
|
||||
.body(Body::from(svg))
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ pub mod endpoint;
|
||||
pub mod error;
|
||||
pub mod failpoints;
|
||||
pub mod json;
|
||||
pub mod pprof;
|
||||
pub mod request;
|
||||
|
||||
extern crate hyper0 as hyper;
|
||||
|
||||
@@ -1,238 +0,0 @@
|
||||
use std::borrow::Cow;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::ffi::c_void;
|
||||
use std::io::Write as _;
|
||||
|
||||
use anyhow::bail;
|
||||
use flate2::Compression;
|
||||
use flate2::write::{GzDecoder, GzEncoder};
|
||||
use itertools::Itertools as _;
|
||||
use pprof::protos::{Function, Line, Location, Message as _, Profile};
|
||||
use regex::Regex;
|
||||
|
||||
/// Decodes a gzip-compressed Protobuf-encoded pprof profile.
|
||||
pub fn decode(bytes: &[u8]) -> anyhow::Result<Profile> {
|
||||
let mut gz = GzDecoder::new(Vec::new());
|
||||
gz.write_all(bytes)?;
|
||||
Ok(Profile::parse_from_bytes(&gz.finish()?)?)
|
||||
}
|
||||
|
||||
/// Encodes a pprof profile as gzip-compressed Protobuf.
|
||||
pub fn encode(profile: &Profile) -> anyhow::Result<Vec<u8>> {
|
||||
let mut gz = GzEncoder::new(Vec::new(), Compression::default());
|
||||
profile.write_to_writer(&mut gz)?;
|
||||
Ok(gz.finish()?)
|
||||
}
|
||||
|
||||
/// Symbolizes a pprof profile using the current binary.
|
||||
pub fn symbolize(mut profile: Profile) -> anyhow::Result<Profile> {
|
||||
if !profile.function.is_empty() {
|
||||
return Ok(profile); // already symbolized
|
||||
}
|
||||
|
||||
// Collect function names.
|
||||
let mut functions: HashMap<String, Function> = HashMap::new();
|
||||
let mut strings: HashMap<String, i64> = profile
|
||||
.string_table
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, s)| (s, i as i64))
|
||||
.collect();
|
||||
|
||||
// Helper to look up or register a string.
|
||||
let mut string_id = |s: &str| -> i64 {
|
||||
// Don't use .entry() to avoid unnecessary allocations.
|
||||
if let Some(id) = strings.get(s) {
|
||||
return *id;
|
||||
}
|
||||
let id = strings.len() as i64;
|
||||
strings.insert(s.to_string(), id);
|
||||
id
|
||||
};
|
||||
|
||||
for loc in &mut profile.location {
|
||||
if !loc.line.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Resolve the line and function for each location.
|
||||
backtrace::resolve(loc.address as *mut c_void, |symbol| {
|
||||
let Some(symbol_name) = symbol.name() else {
|
||||
return;
|
||||
};
|
||||
|
||||
let function_name = format!("{symbol_name:#}");
|
||||
let functions_len = functions.len();
|
||||
let function_id = functions
|
||||
.entry(function_name)
|
||||
.or_insert_with_key(|function_name| {
|
||||
let function_id = functions_len as u64 + 1;
|
||||
let system_name = String::from_utf8_lossy(symbol_name.as_bytes());
|
||||
let filename = symbol
|
||||
.filename()
|
||||
.map(|path| path.to_string_lossy())
|
||||
.unwrap_or(Cow::Borrowed(""));
|
||||
Function {
|
||||
id: function_id,
|
||||
name: string_id(function_name),
|
||||
system_name: string_id(&system_name),
|
||||
filename: string_id(&filename),
|
||||
..Default::default()
|
||||
}
|
||||
})
|
||||
.id;
|
||||
loc.line.push(Line {
|
||||
function_id,
|
||||
line: symbol.lineno().unwrap_or(0) as i64,
|
||||
..Default::default()
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Store the resolved functions, and mark the mapping as resolved.
|
||||
profile.function = functions.into_values().sorted_by_key(|f| f.id).collect();
|
||||
profile.string_table = strings
|
||||
.into_iter()
|
||||
.sorted_by_key(|(_, i)| *i)
|
||||
.map(|(s, _)| s)
|
||||
.collect();
|
||||
|
||||
for mapping in &mut profile.mapping {
|
||||
mapping.has_functions = true;
|
||||
mapping.has_filenames = true;
|
||||
}
|
||||
|
||||
Ok(profile)
|
||||
}
|
||||
|
||||
/// Strips locations (stack frames) matching the given mappings (substring) or function names
|
||||
/// (regex). The function bool specifies whether child frames should be stripped as well.
|
||||
///
|
||||
/// The string definitions are left behind in the profile for simplicity, to avoid rewriting all
|
||||
/// string references.
|
||||
pub fn strip_locations(
|
||||
mut profile: Profile,
|
||||
mappings: &[&str],
|
||||
functions: &[(Regex, bool)],
|
||||
) -> Profile {
|
||||
// Strip mappings.
|
||||
let mut strip_mappings: HashSet<u64> = HashSet::new();
|
||||
|
||||
profile.mapping.retain(|mapping| {
|
||||
let Some(name) = profile.string_table.get(mapping.filename as usize) else {
|
||||
return true;
|
||||
};
|
||||
if mappings.iter().any(|substr| name.contains(substr)) {
|
||||
strip_mappings.insert(mapping.id);
|
||||
return false;
|
||||
}
|
||||
true
|
||||
});
|
||||
|
||||
// Strip functions.
|
||||
let mut strip_functions: HashMap<u64, bool> = HashMap::new();
|
||||
|
||||
profile.function.retain(|function| {
|
||||
let Some(name) = profile.string_table.get(function.name as usize) else {
|
||||
return true;
|
||||
};
|
||||
for (regex, strip_children) in functions {
|
||||
if regex.is_match(name) {
|
||||
strip_functions.insert(function.id, *strip_children);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
});
|
||||
|
||||
// Strip locations. The bool specifies whether child frames should be stripped too.
|
||||
let mut strip_locations: HashMap<u64, bool> = HashMap::new();
|
||||
|
||||
profile.location.retain(|location| {
|
||||
for line in &location.line {
|
||||
if let Some(strip_children) = strip_functions.get(&line.function_id) {
|
||||
strip_locations.insert(location.id, *strip_children);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if strip_mappings.contains(&location.mapping_id) {
|
||||
strip_locations.insert(location.id, false);
|
||||
return false;
|
||||
}
|
||||
true
|
||||
});
|
||||
|
||||
// Strip sample locations.
|
||||
for sample in &mut profile.sample {
|
||||
// First, find the uppermost function with child removal and truncate the stack.
|
||||
if let Some(truncate) = sample
|
||||
.location_id
|
||||
.iter()
|
||||
.rposition(|id| strip_locations.get(id) == Some(&true))
|
||||
{
|
||||
sample.location_id.drain(..=truncate);
|
||||
}
|
||||
// Next, strip any individual frames without child removal.
|
||||
sample
|
||||
.location_id
|
||||
.retain(|id| !strip_locations.contains_key(id));
|
||||
}
|
||||
|
||||
profile
|
||||
}
|
||||
|
||||
/// Generates an SVG flamegraph from a symbolized pprof profile.
|
||||
pub fn flamegraph(
|
||||
profile: Profile,
|
||||
opts: &mut inferno::flamegraph::Options,
|
||||
) -> anyhow::Result<Vec<u8>> {
|
||||
if profile.mapping.iter().any(|m| !m.has_functions) {
|
||||
bail!("profile not symbolized");
|
||||
}
|
||||
|
||||
// Index locations, functions, and strings.
|
||||
let locations: HashMap<u64, Location> =
|
||||
profile.location.into_iter().map(|l| (l.id, l)).collect();
|
||||
let functions: HashMap<u64, Function> =
|
||||
profile.function.into_iter().map(|f| (f.id, f)).collect();
|
||||
let strings = profile.string_table;
|
||||
|
||||
// Resolve stacks as function names, and sum sample values per stack. Also reverse the stack,
|
||||
// since inferno expects it bottom-up.
|
||||
let mut stacks: HashMap<Vec<&str>, i64> = HashMap::new();
|
||||
for sample in profile.sample {
|
||||
let mut stack = Vec::with_capacity(sample.location_id.len());
|
||||
for location in sample.location_id.into_iter().rev() {
|
||||
let Some(location) = locations.get(&location) else {
|
||||
bail!("missing location {location}");
|
||||
};
|
||||
for line in location.line.iter().rev() {
|
||||
let Some(function) = functions.get(&line.function_id) else {
|
||||
bail!("missing function {}", line.function_id);
|
||||
};
|
||||
let Some(name) = strings.get(function.name as usize) else {
|
||||
bail!("missing string {}", function.name);
|
||||
};
|
||||
stack.push(name.as_str());
|
||||
}
|
||||
}
|
||||
let Some(&value) = sample.value.first() else {
|
||||
bail!("missing value");
|
||||
};
|
||||
*stacks.entry(stack).or_default() += value;
|
||||
}
|
||||
|
||||
// Construct stack lines for inferno.
|
||||
let lines = stacks
|
||||
.into_iter()
|
||||
.map(|(stack, value)| (stack.into_iter().join(";"), value))
|
||||
.map(|(stack, value)| format!("{stack} {value}"))
|
||||
.sorted()
|
||||
.collect_vec();
|
||||
|
||||
// Construct the flamegraph.
|
||||
let mut bytes = Vec::new();
|
||||
let lines = lines.iter().map(|line| line.as_str());
|
||||
inferno::flamegraph::from_lines(opts, lines, &mut bytes)?;
|
||||
Ok(bytes)
|
||||
}
|
||||
@@ -123,6 +123,10 @@ pub struct ConfigToml {
|
||||
pub enable_read_path_debugging: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub validate_wal_contiguity: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub load_previous_heatmap: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub generate_unarchival_heatmap: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -523,6 +527,8 @@ impl Default for ConfigToml {
|
||||
None
|
||||
},
|
||||
validate_wal_contiguity: None,
|
||||
load_previous_heatmap: None,
|
||||
generate_unarchival_heatmap: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,7 +15,6 @@ arc-swap.workspace = true
|
||||
sentry.workspace = true
|
||||
async-compression.workspace = true
|
||||
anyhow.workspace = true
|
||||
backtrace.workspace = true
|
||||
bincode.workspace = true
|
||||
bytes.workspace = true
|
||||
camino.workspace = true
|
||||
|
||||
@@ -3,20 +3,24 @@ use std::env;
|
||||
|
||||
use sentry::ClientInitGuard;
|
||||
pub use sentry::release_name;
|
||||
use tracing::{error, info};
|
||||
|
||||
#[must_use]
|
||||
pub fn init_sentry(
|
||||
release_name: Option<Cow<'static, str>>,
|
||||
extra_options: &[(&str, &str)],
|
||||
) -> Option<ClientInitGuard> {
|
||||
let dsn = env::var("SENTRY_DSN").ok()?;
|
||||
let Ok(dsn) = env::var("SENTRY_DSN") else {
|
||||
info!("not initializing Sentry, no SENTRY_DSN given");
|
||||
return None;
|
||||
};
|
||||
let environment = env::var("SENTRY_ENVIRONMENT").unwrap_or_else(|_| "development".into());
|
||||
|
||||
let guard = sentry::init((
|
||||
dsn,
|
||||
sentry::ClientOptions {
|
||||
release: release_name,
|
||||
environment: Some(environment.into()),
|
||||
release: release_name.clone(),
|
||||
environment: Some(environment.clone().into()),
|
||||
..Default::default()
|
||||
},
|
||||
));
|
||||
@@ -25,5 +29,19 @@ pub fn init_sentry(
|
||||
scope.set_extra(key, value.into());
|
||||
}
|
||||
});
|
||||
|
||||
if let Some(dsn) = guard.dsn() {
|
||||
info!(
|
||||
"initialized Sentry for project {}, environment {}, release {} (using API {})",
|
||||
dsn.project_id(),
|
||||
environment,
|
||||
release_name.unwrap_or(Cow::Borrowed("None")),
|
||||
dsn.envelope_api_url(),
|
||||
);
|
||||
} else {
|
||||
// This should panic during sentry::init(), but we may as well cover it.
|
||||
error!("failed to initialize Sentry, invalid DSN");
|
||||
}
|
||||
|
||||
Some(guard)
|
||||
}
|
||||
|
||||
@@ -194,6 +194,13 @@ pub struct PageServerConf {
|
||||
/// Interpreted protocol feature: if enabled, validate that the logical WAL received from
|
||||
/// safekeepers does not have gaps.
|
||||
pub validate_wal_contiguity: bool,
|
||||
|
||||
/// When set, the previously written to disk heatmap is loaded on tenant attach and used
|
||||
/// to avoid clobbering the heatmap from new, cold, attached locations.
|
||||
pub load_previous_heatmap: bool,
|
||||
|
||||
/// When set, include visible layers in the next uploaded heatmaps of an unarchived timeline.
|
||||
pub generate_unarchival_heatmap: bool,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -358,6 +365,8 @@ impl PageServerConf {
|
||||
get_vectored_concurrent_io,
|
||||
enable_read_path_debugging,
|
||||
validate_wal_contiguity,
|
||||
load_previous_heatmap,
|
||||
generate_unarchival_heatmap,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -447,6 +456,8 @@ impl PageServerConf {
|
||||
no_sync: no_sync.unwrap_or(false),
|
||||
enable_read_path_debugging: enable_read_path_debugging.unwrap_or(false),
|
||||
validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false),
|
||||
load_previous_heatmap: load_previous_heatmap.unwrap_or(false),
|
||||
generate_unarchival_heatmap: generate_unarchival_heatmap.unwrap_or(false),
|
||||
};
|
||||
|
||||
// ------------------------------------------------------------
|
||||
@@ -493,6 +504,8 @@ impl PageServerConf {
|
||||
metric_collection_interval: Duration::from_secs(60),
|
||||
synthetic_size_calculation_interval: Duration::from_secs(60),
|
||||
background_task_maximum_delay: Duration::ZERO,
|
||||
load_previous_heatmap: Some(true),
|
||||
generate_unarchival_heatmap: Some(true),
|
||||
..Default::default()
|
||||
};
|
||||
PageServerConf::parse_and_validate(NodeId(0), config_toml, &repo_dir).unwrap()
|
||||
|
||||
@@ -602,28 +602,36 @@ impl Timeline {
|
||||
let n_blocks = self
|
||||
.get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
|
||||
.await?;
|
||||
let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
|
||||
for blkno in 0..n_blocks {
|
||||
let block = self
|
||||
.get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
|
||||
.await?;
|
||||
segment.extend_from_slice(&block[..BLCKSZ as usize]);
|
||||
}
|
||||
Ok(segment.freeze())
|
||||
}
|
||||
|
||||
/// Look up given SLRU page version.
|
||||
pub(crate) async fn get_slru_page_at_lsn(
|
||||
&self,
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
blknum: BlockNumber,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
assert!(self.tenant_shard_id.is_shard_zero());
|
||||
let key = slru_block_to_key(kind, segno, blknum);
|
||||
self.get(key, lsn, ctx).await
|
||||
let keyspace = KeySpace::single(
|
||||
slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, n_blocks),
|
||||
);
|
||||
|
||||
let batches = keyspace.partition(
|
||||
self.get_shard_identity(),
|
||||
Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
|
||||
);
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
);
|
||||
|
||||
let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
|
||||
for batch in batches.parts {
|
||||
let blocks = self
|
||||
.get_vectored(batch, lsn, io_concurrency.clone(), ctx)
|
||||
.await?;
|
||||
|
||||
for (_key, block) in blocks {
|
||||
let block = block?;
|
||||
segment.extend_from_slice(&block[..BLCKSZ as usize]);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(segment.freeze())
|
||||
}
|
||||
|
||||
/// Get size of an SLRU segment
|
||||
@@ -832,19 +840,41 @@ impl Timeline {
|
||||
let nblocks = self
|
||||
.get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
|
||||
.await?;
|
||||
for blknum in (0..nblocks).rev() {
|
||||
let clog_page = self
|
||||
.get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
|
||||
|
||||
let keyspace = KeySpace::single(
|
||||
slru_block_to_key(SlruKind::Clog, segno, 0)
|
||||
..slru_block_to_key(SlruKind::Clog, segno, nblocks),
|
||||
);
|
||||
|
||||
let batches = keyspace.partition(
|
||||
self.get_shard_identity(),
|
||||
Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
|
||||
);
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
);
|
||||
|
||||
for batch in batches.parts.into_iter().rev() {
|
||||
let blocks = self
|
||||
.get_vectored(batch, probe_lsn, io_concurrency.clone(), ctx)
|
||||
.await?;
|
||||
|
||||
if clog_page.len() == BLCKSZ as usize + 8 {
|
||||
let mut timestamp_bytes = [0u8; 8];
|
||||
timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
|
||||
let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
|
||||
for (_key, clog_page) in blocks.into_iter().rev() {
|
||||
let clog_page = clog_page?;
|
||||
|
||||
match f(timestamp) {
|
||||
ControlFlow::Break(b) => return Ok(b),
|
||||
ControlFlow::Continue(()) => (),
|
||||
if clog_page.len() == BLCKSZ as usize + 8 {
|
||||
let mut timestamp_bytes = [0u8; 8];
|
||||
timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
|
||||
let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
|
||||
|
||||
match f(timestamp) {
|
||||
ControlFlow::Break(b) => return Ok(b),
|
||||
ControlFlow::Continue(()) => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1934,14 +1964,12 @@ impl DatadirModification<'_> {
|
||||
.context("deserialize db")?
|
||||
};
|
||||
|
||||
// Add the new relation to the rel directory entry, and write it back
|
||||
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
|
||||
return Err(RelationError::AlreadyExists);
|
||||
}
|
||||
|
||||
let v2_enabled = self.maybe_enable_rel_size_v2()?;
|
||||
|
||||
if v2_enabled {
|
||||
if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
|
||||
return Err(RelationError::AlreadyExists);
|
||||
}
|
||||
let sparse_rel_dir_key =
|
||||
rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
|
||||
// check if the rel_dir_key exists in v2
|
||||
@@ -1976,6 +2004,10 @@ impl DatadirModification<'_> {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
|
||||
} else {
|
||||
// Add the new relation to the rel directory entry, and write it back
|
||||
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
|
||||
return Err(RelationError::AlreadyExists);
|
||||
}
|
||||
if !dbdir_exists {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
|
||||
@@ -1989,6 +2021,7 @@ impl DatadirModification<'_> {
|
||||
)),
|
||||
);
|
||||
}
|
||||
|
||||
// Put size
|
||||
let size_key = rel_size_to_key(rel);
|
||||
let buf = nblocks.to_le_bytes();
|
||||
@@ -2111,7 +2144,7 @@ impl DatadirModification<'_> {
|
||||
// Remove entry from relation size cache
|
||||
self.tline.remove_cached_rel_size(&rel_tag);
|
||||
|
||||
// Delete size entry, as well as all blocks
|
||||
// Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
|
||||
self.delete(rel_key_range(rel_tag));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1150,7 +1150,7 @@ impl Tenant {
|
||||
// a previous heatmap which contains all visible layers in the layer map.
|
||||
// This previous heatmap will be used whenever a fresh heatmap is generated
|
||||
// for the timeline.
|
||||
if matches!(cause, LoadTimelineCause::Unoffload) {
|
||||
if self.conf.generate_unarchival_heatmap && matches!(cause, LoadTimelineCause::Unoffload) {
|
||||
let mut tline_ending_at = Some((&timeline, timeline.get_last_record_lsn()));
|
||||
while let Some((tline, end_lsn)) = tline_ending_at {
|
||||
let unarchival_heatmap = tline.generate_unarchival_heatmap(end_lsn).await;
|
||||
@@ -1582,6 +1582,10 @@ impl Tenant {
|
||||
}
|
||||
|
||||
async fn read_on_disk_heatmap(&self) -> Option<(HeatMapTenant, std::time::Instant)> {
|
||||
if !self.conf.load_previous_heatmap {
|
||||
return None;
|
||||
}
|
||||
|
||||
let on_disk_heatmap_path = self.conf.tenant_heatmap_path(&self.tenant_shard_id);
|
||||
match tokio::fs::read_to_string(on_disk_heatmap_path).await {
|
||||
Ok(heatmap) => match serde_json::from_str::<HeatMapTenant>(&heatmap) {
|
||||
@@ -2447,6 +2451,7 @@ impl Tenant {
|
||||
create_guard,
|
||||
initdb_lsn,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -2778,6 +2783,7 @@ impl Tenant {
|
||||
timeline_create_guard,
|
||||
initdb_lsn,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -4865,6 +4871,7 @@ impl Tenant {
|
||||
timeline_create_guard,
|
||||
start_lsn + 1,
|
||||
Some(Arc::clone(src_timeline)),
|
||||
Some(src_timeline.get_rel_size_v2_status()),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -5138,6 +5145,7 @@ impl Tenant {
|
||||
timeline_create_guard,
|
||||
pgdata_lsn,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -5216,13 +5224,14 @@ impl Tenant {
|
||||
create_guard: TimelineCreateGuard,
|
||||
start_lsn: Lsn,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
rel_size_v2_status: Option<RelSizeMigration>,
|
||||
) -> anyhow::Result<UninitializedTimeline<'a>> {
|
||||
let tenant_shard_id = self.tenant_shard_id;
|
||||
|
||||
let resources = self.build_timeline_resources(new_timeline_id);
|
||||
resources
|
||||
.remote_client
|
||||
.init_upload_queue_for_empty_remote(new_metadata)?;
|
||||
.init_upload_queue_for_empty_remote(new_metadata, rel_size_v2_status.clone())?;
|
||||
|
||||
let timeline_struct = self
|
||||
.create_timeline_struct(
|
||||
@@ -5234,7 +5243,7 @@ impl Tenant {
|
||||
CreateTimelineCause::Load,
|
||||
create_guard.idempotency.clone(),
|
||||
None,
|
||||
None,
|
||||
rel_size_v2_status,
|
||||
)
|
||||
.context("Failed to create timeline data structure")?;
|
||||
|
||||
|
||||
@@ -437,9 +437,13 @@ impl RemoteTimelineClient {
|
||||
|
||||
/// Initialize the upload queue for the case where the remote storage is empty,
|
||||
/// i.e., it doesn't have an `IndexPart`.
|
||||
///
|
||||
/// `rel_size_v2_status` needs to be carried over during branching, and that's why
|
||||
/// it's passed in here.
|
||||
pub fn init_upload_queue_for_empty_remote(
|
||||
&self,
|
||||
local_metadata: &TimelineMetadata,
|
||||
rel_size_v2_status: Option<RelSizeMigration>,
|
||||
) -> anyhow::Result<()> {
|
||||
// Set the maximum number of inprogress tasks to the remote storage concurrency. There's
|
||||
// certainly no point in starting more upload tasks than this.
|
||||
@@ -449,7 +453,9 @@ impl RemoteTimelineClient {
|
||||
.as_ref()
|
||||
.map_or(0, |r| r.concurrency_limit());
|
||||
let mut upload_queue = self.upload_queue.lock().unwrap();
|
||||
upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?;
|
||||
let initialized_queue =
|
||||
upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?;
|
||||
initialized_queue.dirty.rel_size_migration = rel_size_v2_status;
|
||||
self.update_remote_physical_size_gauge(None);
|
||||
info!("initialized upload queue as empty");
|
||||
Ok(())
|
||||
|
||||
@@ -15,7 +15,7 @@ use super::{
|
||||
Timeline,
|
||||
};
|
||||
|
||||
use anyhow::{Context, anyhow, bail};
|
||||
use anyhow::{Context, anyhow};
|
||||
use bytes::Bytes;
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
@@ -234,6 +234,12 @@ impl GcCompactionQueue {
|
||||
// it enough in staging yet.
|
||||
return Ok(());
|
||||
}
|
||||
if timeline.get_gc_compaction_watermark() == Lsn::INVALID {
|
||||
// If the gc watermark is not set, we don't need to trigger auto compaction.
|
||||
// This check is the same as in `gc_compaction_split_jobs` but we don't log
|
||||
// here and we can also skip the computation of the trigger condition earlier.
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let Ok(permit) = CONCURRENT_GC_COMPACTION_TASKS.clone().try_acquire_owned() else {
|
||||
// Only allow one compaction run at a time. TODO: As we do `try_acquire_owned`, we cannot ensure
|
||||
@@ -357,8 +363,7 @@ impl GcCompactionQueue {
|
||||
GcCompactJob::from_compact_options(options.clone()),
|
||||
options.sub_compaction_max_job_size_mb,
|
||||
)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
.await?;
|
||||
if jobs.is_empty() {
|
||||
info!("no jobs to run, skipping scheduled compaction task");
|
||||
self.notify_and_unblock(id);
|
||||
@@ -825,9 +830,7 @@ impl Timeline {
|
||||
.flags
|
||||
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
|
||||
{
|
||||
self.compact_with_gc(cancel, options, ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
self.compact_with_gc(cancel, options, ctx).await?;
|
||||
return Ok(CompactionOutcome::Done);
|
||||
}
|
||||
|
||||
@@ -2345,12 +2348,19 @@ impl Timeline {
|
||||
async fn check_compaction_space(
|
||||
self: &Arc<Self>,
|
||||
layer_selection: &[Layer],
|
||||
) -> anyhow::Result<()> {
|
||||
let available_space = self.check_available_space().await?;
|
||||
) -> Result<(), CompactionError> {
|
||||
let available_space = self
|
||||
.check_available_space()
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
let mut remote_layer_size = 0;
|
||||
let mut all_layer_size = 0;
|
||||
for layer in layer_selection {
|
||||
let needs_download = layer.needs_download().await?;
|
||||
let needs_download = layer
|
||||
.needs_download()
|
||||
.await
|
||||
.context("failed to check if layer needs download")
|
||||
.map_err(CompactionError::Other)?;
|
||||
if needs_download.is_some() {
|
||||
remote_layer_size += layer.layer_desc().file_size;
|
||||
}
|
||||
@@ -2359,14 +2369,14 @@ impl Timeline {
|
||||
let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
|
||||
if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
|
||||
{
|
||||
return Err(anyhow!(
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
|
||||
available_space,
|
||||
allocated_space,
|
||||
all_layer_size,
|
||||
remote_layer_size,
|
||||
all_layer_size + remote_layer_size
|
||||
));
|
||||
)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -2397,7 +2407,7 @@ impl Timeline {
|
||||
self: &Arc<Self>,
|
||||
job: GcCompactJob,
|
||||
sub_compaction_max_job_size_mb: Option<u64>,
|
||||
) -> anyhow::Result<Vec<GcCompactJob>> {
|
||||
) -> Result<Vec<GcCompactJob>, CompactionError> {
|
||||
let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
|
||||
job.compact_lsn_range.end
|
||||
} else {
|
||||
@@ -2548,7 +2558,7 @@ impl Timeline {
|
||||
cancel: &CancellationToken,
|
||||
options: CompactOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), CompactionError> {
|
||||
let sub_compaction = options.sub_compaction;
|
||||
let job = GcCompactJob::from_compact_options(options.clone());
|
||||
if sub_compaction {
|
||||
@@ -2580,7 +2590,7 @@ impl Timeline {
|
||||
cancel: &CancellationToken,
|
||||
job: GcCompactJob,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), CompactionError> {
|
||||
// Block other compaction/GC tasks from running for now. GC-compaction could run along
|
||||
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
|
||||
// Note that we already acquired the compaction lock when the outer `compact` function gets called.
|
||||
@@ -2588,8 +2598,7 @@ impl Timeline {
|
||||
let gc_lock = async {
|
||||
tokio::select! {
|
||||
guard = self.gc_lock.lock() => Ok(guard),
|
||||
// TODO: refactor to CompactionError to correctly pass cancelled error
|
||||
_ = cancel.cancelled() => Err(anyhow!("cancelled")),
|
||||
_ = cancel.cancelled() => Err(CompactionError::ShuttingDown),
|
||||
}
|
||||
};
|
||||
|
||||
@@ -2731,6 +2740,25 @@ impl Timeline {
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
if selected_layers.len() == 1 && !cfg!(test) {
|
||||
// In unit tests, we sometimes compact a single layer to test correctness.
|
||||
info!(
|
||||
"skipping gc-compaction: only one layer within the key range, gc_cutoff={}, key_range={}..{}",
|
||||
gc_cutoff, compact_key_range.start, compact_key_range.end
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
if selected_layers.iter().all(|l| !l.layer_desc().is_delta()) && !cfg!(test) {
|
||||
// In unit tests, we sometimes force compact image layers to test correctness.
|
||||
|
||||
// If the image layers overlaps, we could potentially consolidate them into a single level.
|
||||
// But the benefit of doing so is not worth the overhead.
|
||||
info!(
|
||||
"skipping gc-compaction: only image layers within the key range, gc_cutoff={}, key_range={}..{}",
|
||||
gc_cutoff, compact_key_range.start, compact_key_range.end
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
retain_lsns_below_horizon.sort();
|
||||
GcCompactionJobDescription {
|
||||
selected_layers,
|
||||
@@ -2810,10 +2838,10 @@ impl Timeline {
|
||||
.map(|layer| layer.layer_desc().layer_name())
|
||||
.collect_vec();
|
||||
if let Some(err) = check_valid_layermap(&layer_names) {
|
||||
bail!(
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss",
|
||||
err
|
||||
);
|
||||
)));
|
||||
}
|
||||
// The maximum LSN we are processing in this compaction loop
|
||||
let end_lsn = job_desc
|
||||
@@ -2828,11 +2856,24 @@ impl Timeline {
|
||||
let mut total_downloaded_size = 0;
|
||||
let mut total_layer_size = 0;
|
||||
for layer in &job_desc.selected_layers {
|
||||
if layer.needs_download().await?.is_some() {
|
||||
if layer
|
||||
.needs_download()
|
||||
.await
|
||||
.context("failed to check if layer needs download")
|
||||
.map_err(CompactionError::Other)?
|
||||
.is_some()
|
||||
{
|
||||
total_downloaded_size += layer.layer_desc().file_size;
|
||||
}
|
||||
total_layer_size += layer.layer_desc().file_size;
|
||||
let resident_layer = layer.download_and_keep_resident(ctx).await?;
|
||||
if cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
let resident_layer = layer
|
||||
.download_and_keep_resident(ctx)
|
||||
.await
|
||||
.context("failed to download and keep resident layer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
downloaded_layers.push(resident_layer);
|
||||
}
|
||||
info!(
|
||||
@@ -2843,19 +2884,33 @@ impl Timeline {
|
||||
);
|
||||
for resident_layer in &downloaded_layers {
|
||||
if resident_layer.layer_desc().is_delta() {
|
||||
let layer = resident_layer.get_as_delta(ctx).await?;
|
||||
let layer = resident_layer
|
||||
.get_as_delta(ctx)
|
||||
.await
|
||||
.context("failed to get delta layer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
delta_layers.push(layer);
|
||||
} else {
|
||||
let layer = resident_layer.get_as_image(ctx).await?;
|
||||
let layer = resident_layer
|
||||
.get_as_image(ctx)
|
||||
.await
|
||||
.context("failed to get image layer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
image_layers.push(layer);
|
||||
}
|
||||
}
|
||||
let (dense_ks, sparse_ks) = self.collect_gc_compaction_keyspace().await?;
|
||||
let (dense_ks, sparse_ks) = self
|
||||
.collect_gc_compaction_keyspace()
|
||||
.await
|
||||
.context("failed to collect gc compaction keyspace")
|
||||
.map_err(CompactionError::Other)?;
|
||||
let mut merge_iter = FilterIterator::create(
|
||||
MergeIterator::create(&delta_layers, &image_layers, ctx),
|
||||
dense_ks,
|
||||
sparse_ks,
|
||||
)?;
|
||||
)
|
||||
.context("failed to create filter iterator")
|
||||
.map_err(CompactionError::Other)?;
|
||||
|
||||
// Step 2: Produce images+deltas.
|
||||
let mut accumulated_values = Vec::new();
|
||||
@@ -2874,7 +2929,9 @@ impl Timeline {
|
||||
self.get_compaction_target_size(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
.await
|
||||
.context("failed to create image layer writer")
|
||||
.map_err(CompactionError::Other)?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
@@ -2887,7 +2944,9 @@ impl Timeline {
|
||||
lowest_retain_lsn..end_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.context("failed to create delta layer writer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
|
||||
#[derive(Default)]
|
||||
struct RewritingLayers {
|
||||
@@ -2927,9 +2986,14 @@ impl Timeline {
|
||||
// the key and LSN range are determined. However, to keep things simple here, we still
|
||||
// create this writer, and discard the writer in the end.
|
||||
|
||||
while let Some(((key, lsn, val), desc)) = merge_iter.next_with_trace().await? {
|
||||
while let Some(((key, lsn, val), desc)) = merge_iter
|
||||
.next_with_trace()
|
||||
.await
|
||||
.context("failed to get next key-value pair")
|
||||
.map_err(CompactionError::Other)?
|
||||
{
|
||||
if cancel.is_cancelled() {
|
||||
return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
if self.shard_identity.is_key_disposable(&key) {
|
||||
// If this shard does not need to store this key, simply skip it.
|
||||
@@ -2960,7 +3024,9 @@ impl Timeline {
|
||||
desc.lsn_range.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
.await
|
||||
.context("failed to create delta layer writer")
|
||||
.map_err(CompactionError::Other)?,
|
||||
);
|
||||
}
|
||||
rewriter.before.as_mut().unwrap()
|
||||
@@ -2975,14 +3041,20 @@ impl Timeline {
|
||||
desc.lsn_range.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
.await
|
||||
.context("failed to create delta layer writer")
|
||||
.map_err(CompactionError::Other)?,
|
||||
);
|
||||
}
|
||||
rewriter.after.as_mut().unwrap()
|
||||
} else {
|
||||
unreachable!()
|
||||
};
|
||||
rewriter.put_value(key, lsn, val, ctx).await?;
|
||||
rewriter
|
||||
.put_value(key, lsn, val, ctx)
|
||||
.await
|
||||
.context("failed to put value")
|
||||
.map_err(CompactionError::Other)?;
|
||||
continue;
|
||||
}
|
||||
match val {
|
||||
@@ -3005,9 +3077,13 @@ impl Timeline {
|
||||
&job_desc.retain_lsns_below_horizon,
|
||||
COMPACTION_DELTA_THRESHOLD,
|
||||
get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn)
|
||||
.await?,
|
||||
.await
|
||||
.context("failed to get ancestor image")
|
||||
.map_err(CompactionError::Other)?,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.context("failed to generate key retention")
|
||||
.map_err(CompactionError::Other)?;
|
||||
retention
|
||||
.pipe_to(
|
||||
*last_key,
|
||||
@@ -3016,7 +3092,9 @@ impl Timeline {
|
||||
&mut stat,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.context("failed to pipe to delta layer writer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
accumulated_values.clear();
|
||||
*last_key = key;
|
||||
accumulated_values.push((key, lsn, val));
|
||||
@@ -3034,9 +3112,14 @@ impl Timeline {
|
||||
job_desc.gc_cutoff,
|
||||
&job_desc.retain_lsns_below_horizon,
|
||||
COMPACTION_DELTA_THRESHOLD,
|
||||
get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn).await?,
|
||||
get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn)
|
||||
.await
|
||||
.context("failed to get ancestor image")
|
||||
.map_err(CompactionError::Other)?,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.context("failed to generate key retention")
|
||||
.map_err(CompactionError::Other)?;
|
||||
retention
|
||||
.pipe_to(
|
||||
last_key,
|
||||
@@ -3045,7 +3128,9 @@ impl Timeline {
|
||||
&mut stat,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.context("failed to pipe to delta layer writer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
// end: move the above part to the loop body
|
||||
|
||||
let mut rewrote_delta_layers = Vec::new();
|
||||
@@ -3053,13 +3138,23 @@ impl Timeline {
|
||||
if let Some(delta_writer_before) = writers.before {
|
||||
let (desc, path) = delta_writer_before
|
||||
.finish(job_desc.compaction_key_range.start, ctx)
|
||||
.await?;
|
||||
let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
|
||||
.await
|
||||
.context("failed to finish delta layer writer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
let layer = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.context("failed to finish creating delta layer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
rewrote_delta_layers.push(layer);
|
||||
}
|
||||
if let Some(delta_writer_after) = writers.after {
|
||||
let (desc, path) = delta_writer_after.finish(key.key_range.end, ctx).await?;
|
||||
let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
|
||||
let (desc, path) = delta_writer_after
|
||||
.finish(key.key_range.end, ctx)
|
||||
.await
|
||||
.context("failed to finish delta layer writer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
let layer = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.context("failed to finish creating delta layer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
rewrote_delta_layers.push(layer);
|
||||
}
|
||||
}
|
||||
@@ -3074,7 +3169,9 @@ impl Timeline {
|
||||
let end_key = job_desc.compaction_key_range.end;
|
||||
writer
|
||||
.finish_with_discard_fn(self, ctx, end_key, discard)
|
||||
.await?
|
||||
.await
|
||||
.context("failed to finish image layer writer")
|
||||
.map_err(CompactionError::Other)?
|
||||
} else {
|
||||
drop(writer);
|
||||
Vec::new()
|
||||
@@ -3086,7 +3183,9 @@ impl Timeline {
|
||||
let produced_delta_layers = if !dry_run {
|
||||
delta_layer_writer
|
||||
.finish_with_discard_fn(self, ctx, discard)
|
||||
.await?
|
||||
.await
|
||||
.context("failed to finish delta layer writer")
|
||||
.map_err(CompactionError::Other)?
|
||||
} else {
|
||||
drop(delta_layer_writer);
|
||||
Vec::new()
|
||||
@@ -3166,7 +3265,9 @@ impl Timeline {
|
||||
&layer.layer_desc().key_range,
|
||||
&job_desc.compaction_key_range,
|
||||
) {
|
||||
bail!("violated constraint: image layer outside of compaction key range");
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"violated constraint: image layer outside of compaction key range"
|
||||
)));
|
||||
}
|
||||
if !fully_contains(
|
||||
&job_desc.compaction_key_range,
|
||||
@@ -3181,7 +3282,9 @@ impl Timeline {
|
||||
|
||||
info!(
|
||||
"gc-compaction statistics: {}",
|
||||
serde_json::to_string(&stat)?
|
||||
serde_json::to_string(&stat)
|
||||
.context("failed to serialize gc-compaction statistics")
|
||||
.map_err(CompactionError::Other)?
|
||||
);
|
||||
|
||||
if dry_run {
|
||||
@@ -3220,10 +3323,10 @@ impl Timeline {
|
||||
// the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
|
||||
// in the writer before finalizing the persistent layers. Now we would leave some dangling layers on the disk if the check fails.
|
||||
if let Some(err) = check_valid_layermap(&final_layers) {
|
||||
bail!(
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss",
|
||||
err
|
||||
);
|
||||
)));
|
||||
}
|
||||
|
||||
// Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
|
||||
@@ -3275,7 +3378,9 @@ impl Timeline {
|
||||
// find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
|
||||
// be batched into `schedule_compaction_update`.
|
||||
let disk_consistent_lsn = self.disk_consistent_lsn.load();
|
||||
self.schedule_uploads(disk_consistent_lsn, None)?;
|
||||
self.schedule_uploads(disk_consistent_lsn, None)
|
||||
.context("failed to schedule uploads")
|
||||
.map_err(CompactionError::Other)?;
|
||||
// If a layer gets rewritten throughout gc-compaction, we need to keep that layer only in `compact_to` instead
|
||||
// of `compact_from`.
|
||||
let compact_from = {
|
||||
|
||||
@@ -1369,6 +1369,10 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS)
|
||||
if (lfc_ctl)
|
||||
value = lfc_ctl->limit;
|
||||
break;
|
||||
case 8:
|
||||
key = "file_cache_chunk_size_pages";
|
||||
value = BLOCKS_PER_CHUNK;
|
||||
break;
|
||||
default:
|
||||
SRF_RETURN_DONE(funcctx);
|
||||
}
|
||||
|
||||
@@ -1026,6 +1026,19 @@ prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blocknum, n
|
||||
if (!neon_prefetch_response_usable(&lsns[i], slot))
|
||||
continue;
|
||||
|
||||
/*
|
||||
* Ignore errors
|
||||
*/
|
||||
if (slot->response->tag != T_NeonGetPageResponse)
|
||||
{
|
||||
if (slot->response->tag != T_NeonErrorResponse)
|
||||
{
|
||||
NEON_PANIC_CONNECTION_STATE(slot->shard_no, PANIC,
|
||||
"Expected GetPage (0x%02x) or Error (0x%02x) response to GetPageRequest, but got 0x%02x",
|
||||
T_NeonGetPageResponse, T_NeonErrorResponse, slot->response->tag);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
|
||||
prefetch_set_unused(ring_index);
|
||||
BITMAP_SET(mask, i);
|
||||
|
||||
@@ -53,7 +53,7 @@ measured = { workspace = true, features = ["lasso"] }
|
||||
metrics.workspace = true
|
||||
once_cell.workspace = true
|
||||
opentelemetry = { workspace = true, features = ["trace"] }
|
||||
papaya = "0.1.8"
|
||||
papaya = "0.2.0"
|
||||
parking_lot.workspace = true
|
||||
parquet.workspace = true
|
||||
parquet_derive.workspace = true
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
use std::cell::{Cell, RefCell};
|
||||
use std::collections::HashMap;
|
||||
use std::hash::BuildHasher;
|
||||
use std::{env, io};
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::{array, env, fmt, io};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use indexmap::IndexSet;
|
||||
use opentelemetry::trace::TraceContextExt;
|
||||
use scopeguard::defer;
|
||||
use serde::ser::{SerializeMap, Serializer};
|
||||
@@ -17,6 +19,7 @@ use tracing_subscriber::fmt::{FormatEvent, FormatFields};
|
||||
use tracing_subscriber::layer::{Context, Layer};
|
||||
use tracing_subscriber::prelude::*;
|
||||
use tracing_subscriber::registry::{LookupSpan, SpanRef};
|
||||
use try_lock::TryLock;
|
||||
|
||||
/// Initialize logging and OpenTelemetry tracing and exporter.
|
||||
///
|
||||
@@ -46,13 +49,13 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
|
||||
let otlp_layer = tracing_utils::init_tracing("proxy").await;
|
||||
|
||||
let json_log_layer = if logfmt == LogFormat::Json {
|
||||
Some(JsonLoggingLayer {
|
||||
clock: RealClock,
|
||||
skipped_field_indices: papaya::HashMap::default(),
|
||||
writer: StderrWriter {
|
||||
Some(JsonLoggingLayer::new(
|
||||
RealClock,
|
||||
StderrWriter {
|
||||
stderr: std::io::stderr(),
|
||||
},
|
||||
})
|
||||
["request_id", "session_id", "conn_id"],
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -191,13 +194,39 @@ thread_local! {
|
||||
}
|
||||
|
||||
/// Implements tracing layer to handle events specific to logging.
|
||||
struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
|
||||
struct JsonLoggingLayer<C: Clock, W: MakeWriter, const F: usize> {
|
||||
clock: C,
|
||||
skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
|
||||
callsite_ids: papaya::HashMap<callsite::Identifier, CallsiteId>,
|
||||
writer: W,
|
||||
// We use a const generic and arrays to bypass one heap allocation.
|
||||
extract_fields: IndexSet<&'static str>,
|
||||
_marker: std::marker::PhantomData<[&'static str; F]>,
|
||||
}
|
||||
|
||||
impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
|
||||
impl<C: Clock, W: MakeWriter, const F: usize> JsonLoggingLayer<C, W, F> {
|
||||
fn new(clock: C, writer: W, extract_fields: [&'static str; F]) -> Self {
|
||||
JsonLoggingLayer {
|
||||
clock,
|
||||
skipped_field_indices: papaya::HashMap::default(),
|
||||
callsite_ids: papaya::HashMap::default(),
|
||||
writer,
|
||||
extract_fields: IndexSet::from_iter(extract_fields),
|
||||
_marker: std::marker::PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn callsite_id(&self, cs: callsite::Identifier) -> CallsiteId {
|
||||
*self
|
||||
.callsite_ids
|
||||
.pin()
|
||||
.get_or_insert_with(cs, CallsiteId::next)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, C: Clock + 'static, W: MakeWriter + 'static, const F: usize> Layer<S>
|
||||
for JsonLoggingLayer<C, W, F>
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
@@ -211,7 +240,14 @@ where
|
||||
let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
|
||||
if entered.get() {
|
||||
let mut formatter = EventFormatter::new();
|
||||
formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
|
||||
formatter.format::<S, F>(
|
||||
now,
|
||||
event,
|
||||
&ctx,
|
||||
&self.skipped_field_indices,
|
||||
&self.callsite_ids,
|
||||
&self.extract_fields,
|
||||
)?;
|
||||
self.writer.make_writer().write_all(formatter.buffer())
|
||||
} else {
|
||||
entered.set(true);
|
||||
@@ -219,7 +255,14 @@ where
|
||||
|
||||
EVENT_FORMATTER.with_borrow_mut(move |formatter| {
|
||||
formatter.reset();
|
||||
formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
|
||||
formatter.format::<S, F>(
|
||||
now,
|
||||
event,
|
||||
&ctx,
|
||||
&self.skipped_field_indices,
|
||||
&self.callsite_ids,
|
||||
&self.extract_fields,
|
||||
)?;
|
||||
self.writer.make_writer().write_all(formatter.buffer())
|
||||
})
|
||||
}
|
||||
@@ -243,13 +286,17 @@ where
|
||||
|
||||
/// Registers a SpanFields instance as span extension.
|
||||
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
|
||||
let csid = self.callsite_id(attrs.metadata().callsite());
|
||||
let span = ctx.span(id).expect("span must exist");
|
||||
let fields = SpanFields::default();
|
||||
fields.record_fields(attrs);
|
||||
// This could deadlock when there's a panic somewhere in the tracing
|
||||
// event handling and a read or write guard is still held. This includes
|
||||
// the OTel subscriber.
|
||||
span.extensions_mut().insert(fields);
|
||||
let mut exts = span.extensions_mut();
|
||||
|
||||
exts.insert(fields);
|
||||
exts.insert(csid);
|
||||
}
|
||||
|
||||
fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
|
||||
@@ -265,6 +312,7 @@ where
|
||||
/// wins.
|
||||
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
|
||||
if !metadata.is_event() {
|
||||
self.callsite_id(metadata.callsite());
|
||||
// Must not be never because we wouldn't get trace and span data.
|
||||
return Interest::always();
|
||||
}
|
||||
@@ -297,6 +345,26 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, Default)]
|
||||
#[repr(transparent)]
|
||||
struct CallsiteId(u32);
|
||||
|
||||
impl CallsiteId {
|
||||
#[inline]
|
||||
fn next() -> Self {
|
||||
// Start at 1 to reserve 0 for default.
|
||||
static COUNTER: AtomicU32 = AtomicU32::new(1);
|
||||
CallsiteId(COUNTER.fetch_add(1, Ordering::Relaxed))
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for CallsiteId {
|
||||
#[inline]
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.0.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores span field values recorded during the spans lifetime.
|
||||
#[derive(Default)]
|
||||
struct SpanFields {
|
||||
@@ -448,12 +516,14 @@ impl EventFormatter {
|
||||
self.logline_buffer.clear();
|
||||
}
|
||||
|
||||
fn format<S>(
|
||||
fn format<S, const F: usize>(
|
||||
&mut self,
|
||||
now: DateTime<Utc>,
|
||||
event: &Event<'_>,
|
||||
ctx: &Context<'_, S>,
|
||||
skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
|
||||
callsite_ids: &papaya::HashMap<callsite::Identifier, CallsiteId>,
|
||||
extract_fields: &IndexSet<&'static str>,
|
||||
) -> io::Result<()>
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
@@ -485,6 +555,7 @@ impl EventFormatter {
|
||||
event.record(&mut message_extractor);
|
||||
let mut serializer = message_extractor.into_serializer()?;
|
||||
|
||||
// Direct message fields.
|
||||
let mut fields_present = FieldsPresent(false, skipped_field_indices);
|
||||
event.record(&mut fields_present);
|
||||
if fields_present.0 {
|
||||
@@ -494,7 +565,9 @@ impl EventFormatter {
|
||||
)?;
|
||||
}
|
||||
|
||||
// TODO: thread-local cache?
|
||||
let pid = std::process::id();
|
||||
// Skip adding pid 1 to reduce noise for services running in containers.
|
||||
if pid != 1 {
|
||||
serializer.serialize_entry("process_id", &pid)?;
|
||||
}
|
||||
@@ -514,6 +587,7 @@ impl EventFormatter {
|
||||
|
||||
serializer.serialize_entry("target", meta.target())?;
|
||||
|
||||
// Skip adding module if it's the same as target.
|
||||
if let Some(module) = meta.module_path() {
|
||||
if module != meta.target() {
|
||||
serializer.serialize_entry("module", module)?;
|
||||
@@ -540,7 +614,16 @@ impl EventFormatter {
|
||||
}
|
||||
}
|
||||
|
||||
serializer.serialize_entry("spans", &SerializableSpanStack(ctx))?;
|
||||
let stack = SerializableSpans {
|
||||
ctx,
|
||||
callsite_ids,
|
||||
fields: ExtractedSpanFields::<'_, F>::new(extract_fields),
|
||||
};
|
||||
serializer.serialize_entry("spans", &stack)?;
|
||||
|
||||
if stack.fields.has_values() {
|
||||
serializer.serialize_entry("extract", &stack.fields)?;
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
};
|
||||
@@ -818,15 +901,20 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes the span stack from root to leaf (parent of event) enumerated
|
||||
/// inside an object where the keys are just the number padded with zeroes
|
||||
/// to retain sorting order.
|
||||
// The object is necessary because Loki cannot flatten arrays.
|
||||
struct SerializableSpanStack<'a, 'b, Span>(&'b Context<'a, Span>)
|
||||
/// Serializes the span stack from root to leaf (parent of event) as object
|
||||
/// with the span names as keys. To prevent collision we append a numberic value
|
||||
/// to the name. Also, collects any span fields we're interested in. Last one
|
||||
/// wins.
|
||||
struct SerializableSpans<'a, 'ctx, Span, const F: usize>
|
||||
where
|
||||
Span: Subscriber + for<'lookup> LookupSpan<'lookup>;
|
||||
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
ctx: &'a Context<'ctx, Span>,
|
||||
callsite_ids: &'a papaya::HashMap<callsite::Identifier, CallsiteId>,
|
||||
fields: ExtractedSpanFields<'a, F>,
|
||||
}
|
||||
|
||||
impl<Span> serde::ser::Serialize for SerializableSpanStack<'_, '_, Span>
|
||||
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpans<'_, '_, Span, F>
|
||||
where
|
||||
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
@@ -836,9 +924,24 @@ where
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
if let Some(leaf_span) = self.0.lookup_current() {
|
||||
for (i, span) in leaf_span.scope().from_root().enumerate() {
|
||||
serializer.serialize_entry(&format_args!("{i:02}"), &SerializableSpan(&span))?;
|
||||
if let Some(leaf_span) = self.ctx.lookup_current() {
|
||||
for span in leaf_span.scope().from_root() {
|
||||
// Append a numeric callsite ID to the span name to keep the name unique
|
||||
// in the JSON object.
|
||||
let cid = self
|
||||
.callsite_ids
|
||||
.pin()
|
||||
.get(&span.metadata().callsite())
|
||||
.copied()
|
||||
.unwrap_or_default();
|
||||
|
||||
// Loki turns the # into an underscore during field name concatenation.
|
||||
serializer.serialize_key(&format_args!("{}#{}", span.metadata().name(), &cid))?;
|
||||
|
||||
serializer.serialize_value(&SerializableSpanFields {
|
||||
span: &span,
|
||||
fields: &self.fields,
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -846,28 +949,79 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes a single span. Include the span ID, name and its fields as
|
||||
/// recorded up to this point.
|
||||
struct SerializableSpan<'a, 'b, Span>(&'b SpanRef<'a, Span>)
|
||||
where
|
||||
Span: for<'lookup> LookupSpan<'lookup>;
|
||||
|
||||
impl<Span> serde::ser::Serialize for SerializableSpan<'_, '_, Span>
|
||||
/// Serializes the span fields as object.
|
||||
struct SerializableSpanFields<'a, 'span, Span, const F: usize>
|
||||
where
|
||||
Span: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
|
||||
span: &'a SpanRef<'span, Span>,
|
||||
fields: &'a ExtractedSpanFields<'a, F>,
|
||||
}
|
||||
|
||||
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F>
|
||||
where
|
||||
Span: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
Ser: serde::ser::Serializer,
|
||||
S: serde::ser::Serializer,
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
// TODO: the span ID is probably only useful for debugging tracing.
|
||||
serializer.serialize_entry("span_id", &format_args!("{:016x}", self.0.id().into_u64()))?;
|
||||
serializer.serialize_entry("span_name", self.0.metadata().name())?;
|
||||
|
||||
let ext = self.0.extensions();
|
||||
let ext = self.span.extensions();
|
||||
if let Some(data) = ext.get::<SpanFields>() {
|
||||
for (key, value) in &data.fields.pin() {
|
||||
for (name, value) in &data.fields.pin() {
|
||||
serializer.serialize_entry(name, value)?;
|
||||
// TODO: replace clone with reference, if possible.
|
||||
self.fields.set(name, value.clone());
|
||||
}
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
}
|
||||
}
|
||||
|
||||
struct ExtractedSpanFields<'a, const F: usize> {
|
||||
names: &'a IndexSet<&'static str>,
|
||||
// TODO: replace TryLock with something local thread and interior mutability.
|
||||
// serde API doesn't let us use `mut`.
|
||||
values: TryLock<([Option<serde_json::Value>; F], bool)>,
|
||||
}
|
||||
|
||||
impl<'a, const F: usize> ExtractedSpanFields<'a, F> {
|
||||
fn new(names: &'a IndexSet<&'static str>) -> Self {
|
||||
ExtractedSpanFields {
|
||||
names,
|
||||
values: TryLock::new((array::from_fn(|_| Option::default()), false)),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn set(&self, name: &'static str, value: serde_json::Value) {
|
||||
if let Some((index, _)) = self.names.get_full(name) {
|
||||
let mut fields = self.values.try_lock().expect("thread-local use");
|
||||
fields.0[index] = Some(value);
|
||||
fields.1 = true;
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn has_values(&self) -> bool {
|
||||
self.values.try_lock().expect("thread-local use").1
|
||||
}
|
||||
}
|
||||
|
||||
impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::ser::Serializer,
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
let values = self.values.try_lock().expect("thread-local use");
|
||||
for (i, value) in values.0.iter().enumerate() {
|
||||
if let Some(value) = value {
|
||||
let key = self.names[i];
|
||||
serializer.serialize_entry(key, value)?;
|
||||
}
|
||||
}
|
||||
@@ -879,6 +1033,7 @@ where
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
|
||||
use assert_json_diff::assert_json_eq;
|
||||
@@ -927,14 +1082,17 @@ mod tests {
|
||||
let log_layer = JsonLoggingLayer {
|
||||
clock: clock.clone(),
|
||||
skipped_field_indices: papaya::HashMap::default(),
|
||||
callsite_ids: papaya::HashMap::default(),
|
||||
writer: buffer.clone(),
|
||||
extract_fields: IndexSet::from_iter(["x"]),
|
||||
_marker: PhantomData::<[&'static str; 1]>,
|
||||
};
|
||||
|
||||
let registry = tracing_subscriber::Registry::default().with(log_layer);
|
||||
|
||||
tracing::subscriber::with_default(registry, || {
|
||||
info_span!("span1", x = 40, x = 41, x = 42).in_scope(|| {
|
||||
info_span!("span2").in_scope(|| {
|
||||
info_span!("some_span", x = 24).in_scope(|| {
|
||||
info_span!("some_span", x = 40, x = 41, x = 42).in_scope(|| {
|
||||
tracing::error!(
|
||||
a = 1,
|
||||
a = 2,
|
||||
@@ -960,16 +1118,16 @@ mod tests {
|
||||
"a": 3,
|
||||
},
|
||||
"spans": {
|
||||
"00":{
|
||||
"span_id": "0000000000000001",
|
||||
"span_name": "span1",
|
||||
"x": 42,
|
||||
"some_span#1":{
|
||||
"x": 24,
|
||||
},
|
||||
"01": {
|
||||
"span_id": "0000000000000002",
|
||||
"span_name": "span2",
|
||||
"some_span#2": {
|
||||
"x": 42,
|
||||
}
|
||||
},
|
||||
"extract": {
|
||||
"x": 42,
|
||||
},
|
||||
"src": actual.as_object().unwrap().get("src").unwrap().as_str().unwrap(),
|
||||
"target": "proxy::logging::tests",
|
||||
"process_id": actual.as_object().unwrap().get("process_id").unwrap().as_number().unwrap(),
|
||||
|
||||
@@ -143,10 +143,6 @@ struct Cli {
|
||||
// Flag to use https for requests to pageserver API.
|
||||
#[arg(long, default_value = "false")]
|
||||
use_https_pageserver_api: bool,
|
||||
|
||||
/// Whether to load safekeeprs from the database and heartbeat them
|
||||
#[arg(long, default_value = "false")]
|
||||
load_safekeepers: bool,
|
||||
}
|
||||
|
||||
enum StrictMode {
|
||||
@@ -360,7 +356,6 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
start_as_candidate: args.start_as_candidate,
|
||||
http_service_port: args.listen.port() as i32,
|
||||
use_https_pageserver_api: args.use_https_pageserver_api,
|
||||
load_safekeepers: args.load_safekeepers,
|
||||
};
|
||||
|
||||
// Validate that we can connect to the database
|
||||
|
||||
@@ -394,8 +394,6 @@ pub struct Config {
|
||||
pub long_reconcile_threshold: Duration,
|
||||
|
||||
pub use_https_pageserver_api: bool,
|
||||
|
||||
pub load_safekeepers: bool,
|
||||
}
|
||||
|
||||
impl From<DatabaseError> for ApiError {
|
||||
@@ -1412,20 +1410,15 @@ impl Service {
|
||||
.set(nodes.len() as i64);
|
||||
|
||||
tracing::info!("Loading safekeepers from database...");
|
||||
let safekeepers = if config.load_safekeepers {
|
||||
persistence
|
||||
.list_safekeepers()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|skp| Safekeeper::from_persistence(skp, CancellationToken::new()))
|
||||
.collect::<Vec<_>>()
|
||||
} else {
|
||||
tracing::info!("Skipping safekeeper loading");
|
||||
Default::default()
|
||||
};
|
||||
|
||||
let safekeepers = persistence
|
||||
.list_safekeepers()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|skp| Safekeeper::from_persistence(skp, CancellationToken::new()))
|
||||
.collect::<Vec<_>>();
|
||||
let safekeepers: HashMap<NodeId, Safekeeper> =
|
||||
safekeepers.into_iter().map(|n| (n.get_id(), n)).collect();
|
||||
tracing::info!("Loaded {} safekeepers from database.", safekeepers.len());
|
||||
|
||||
tracing::info!("Loading shards from database...");
|
||||
let mut tenant_shard_persistence = persistence.load_active_tenant_shards().await?;
|
||||
@@ -8066,8 +8059,7 @@ impl Service {
|
||||
) -> Result<(), DatabaseError> {
|
||||
let node_id = NodeId(record.id as u64);
|
||||
self.persistence.safekeeper_upsert(record.clone()).await?;
|
||||
|
||||
if self.config.load_safekeepers {
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let mut safekeepers = (*locked.safekeepers).clone();
|
||||
match safekeepers.entry(node_id) {
|
||||
@@ -8099,7 +8091,7 @@ impl Service {
|
||||
.await?;
|
||||
let node_id = NodeId(id as u64);
|
||||
// After the change has been persisted successfully, update the in-memory state
|
||||
if self.config.load_safekeepers {
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let mut safekeepers = (*locked.safekeepers).clone();
|
||||
let sk = safekeepers
|
||||
|
||||
@@ -1133,13 +1133,6 @@ class NeonEnv:
|
||||
if self.storage_controller_config is not None:
|
||||
cfg["storage_controller"] = self.storage_controller_config
|
||||
|
||||
# Disable new storcon flag in compat tests
|
||||
if config.test_may_use_compatibility_snapshot_binaries:
|
||||
if "storage_controller" in cfg:
|
||||
cfg["storage_controller"]["load_safekeepers"] = False
|
||||
else:
|
||||
cfg["storage_controller"] = {"load_safekeepers": False}
|
||||
|
||||
# Create config for pageserver
|
||||
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
@@ -1169,6 +1162,8 @@ class NeonEnv:
|
||||
# Disable pageserver disk syncs in tests: when running tests concurrently, this avoids
|
||||
# the pageserver taking a long time to start up due to syncfs flushing other tests' data
|
||||
"no_sync": True,
|
||||
# Look for gaps in WAL received from safekeepeers
|
||||
"validate_wal_contiguity": True,
|
||||
}
|
||||
|
||||
# Batching (https://github.com/neondatabase/neon/issues/9377):
|
||||
@@ -1181,11 +1176,12 @@ class NeonEnv:
|
||||
|
||||
if config.test_may_use_compatibility_snapshot_binaries:
|
||||
log.info(
|
||||
"Skipping WAL contiguity validation to avoid forward-compatibility related test failures"
|
||||
"Skipping prev heatmap settings to avoid forward-compatibility related test failures"
|
||||
)
|
||||
else:
|
||||
# Look for gaps in WAL received from safekeepeers
|
||||
ps_cfg["validate_wal_contiguity"] = True
|
||||
ps_cfg["load_previous_heatmap"] = True
|
||||
ps_cfg["generate_unarchival_heatmap"] = True
|
||||
|
||||
get_vectored_concurrent_io = self.pageserver_get_vectored_concurrent_io
|
||||
if get_vectored_concurrent_io is not None:
|
||||
@@ -1201,6 +1197,9 @@ class NeonEnv:
|
||||
config.pageserver_default_tenant_config_compaction_algorithm
|
||||
)
|
||||
|
||||
tenant_config = ps_cfg.setdefault("tenant_config", {})
|
||||
tenant_config["rel_size_v2_enabled"] = True # Enable relsize_v2 by default in tests
|
||||
|
||||
if self.pageserver_remote_storage is not None:
|
||||
ps_cfg["remote_storage"] = remote_storage_to_toml_dict(
|
||||
self.pageserver_remote_storage
|
||||
|
||||
@@ -337,6 +337,8 @@ def allure_add_grafana_link(host: str, timeline_id: TimelineId, start_ms: int, e
|
||||
"""
|
||||
# We expect host to be in format like ep-holy-mouse-w2u462gi.us-east-2.aws.neon.build
|
||||
endpoint_id, region_id, _ = host.split(".", 2)
|
||||
# Remove "-pooler" suffix if present
|
||||
endpoint_id = endpoint_id.removesuffix("-pooler")
|
||||
|
||||
params = {
|
||||
"orgId": 1,
|
||||
|
||||
@@ -137,6 +137,8 @@ def test_remote_extensions(
|
||||
metrics = parse_metrics(raw_metrics)
|
||||
remote_ext_requests = metrics.query_all(
|
||||
"compute_ctl_remote_ext_requests_total",
|
||||
# Check that we properly report the filename in the metrics
|
||||
{"filename": "anon.tar.zst"},
|
||||
)
|
||||
assert len(remote_ext_requests) == 1
|
||||
for sample in remote_ext_requests:
|
||||
|
||||
@@ -5,7 +5,7 @@ from __future__ import annotations
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, cast
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
@@ -118,10 +118,20 @@ def post_checks(env: NeonEnv, test_output_dir: Path, db_name: str, endpoint: End
|
||||
pageserver.http_client().timeline_gc(shard, env.initial_timeline, None)
|
||||
|
||||
|
||||
def patch_tenant_conf(tenant_conf: dict[str, Any], reldir_type: str) -> dict[str, Any]:
|
||||
tenant_conf = tenant_conf.copy()
|
||||
if reldir_type == "v2":
|
||||
tenant_conf["rel_size_v2_enabled"] = "true"
|
||||
else:
|
||||
tenant_conf["rel_size_v2_enabled"] = "false"
|
||||
return tenant_conf
|
||||
|
||||
|
||||
# Run the main PostgreSQL regression tests, in src/test/regress.
|
||||
#
|
||||
@pytest.mark.timeout(3000) # Contains many sub-tests, is slow in debug builds
|
||||
@pytest.mark.parametrize("shard_count", [None, 4])
|
||||
@pytest.mark.parametrize("reldir_type", ["v1", "v2"])
|
||||
def test_pg_regress(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
test_output_dir: Path,
|
||||
@@ -130,6 +140,7 @@ def test_pg_regress(
|
||||
base_dir: Path,
|
||||
pg_distrib_dir: Path,
|
||||
shard_count: int | None,
|
||||
reldir_type: str,
|
||||
):
|
||||
DBNAME = "regression"
|
||||
|
||||
@@ -142,7 +153,7 @@ def test_pg_regress(
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf=TENANT_CONF,
|
||||
initial_tenant_conf=patch_tenant_conf(TENANT_CONF, reldir_type),
|
||||
initial_tenant_shard_count=shard_count,
|
||||
)
|
||||
|
||||
@@ -196,6 +207,7 @@ def test_pg_regress(
|
||||
#
|
||||
@pytest.mark.timeout(1500) # Contains many sub-tests, is slow in debug builds
|
||||
@pytest.mark.parametrize("shard_count", [None, 4])
|
||||
@pytest.mark.parametrize("reldir_type", ["v1", "v2"])
|
||||
def test_isolation(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
test_output_dir: Path,
|
||||
@@ -204,6 +216,7 @@ def test_isolation(
|
||||
base_dir: Path,
|
||||
pg_distrib_dir: Path,
|
||||
shard_count: int | None,
|
||||
reldir_type: str,
|
||||
):
|
||||
DBNAME = "isolation_regression"
|
||||
|
||||
@@ -211,7 +224,8 @@ def test_isolation(
|
||||
neon_env_builder.num_pageservers = shard_count
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf=TENANT_CONF, initial_tenant_shard_count=shard_count
|
||||
initial_tenant_conf=patch_tenant_conf(TENANT_CONF, reldir_type),
|
||||
initial_tenant_shard_count=shard_count,
|
||||
)
|
||||
|
||||
# Connect to postgres and create a database called "regression".
|
||||
@@ -267,6 +281,7 @@ def test_isolation(
|
||||
# Run extra Neon-specific pg_regress-based tests. The tests and their
|
||||
# schedule file are in the sql_regress/ directory.
|
||||
@pytest.mark.parametrize("shard_count", [None, 4])
|
||||
@pytest.mark.parametrize("reldir_type", ["v1", "v2"])
|
||||
def test_sql_regress(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
test_output_dir: Path,
|
||||
@@ -275,6 +290,7 @@ def test_sql_regress(
|
||||
base_dir: Path,
|
||||
pg_distrib_dir: Path,
|
||||
shard_count: int | None,
|
||||
reldir_type: str,
|
||||
):
|
||||
DBNAME = "regression"
|
||||
|
||||
@@ -282,7 +298,8 @@ def test_sql_regress(
|
||||
neon_env_builder.num_pageservers = shard_count
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf=TENANT_CONF, initial_tenant_shard_count=shard_count
|
||||
initial_tenant_conf=patch_tenant_conf(TENANT_CONF, reldir_type),
|
||||
initial_tenant_shard_count=shard_count,
|
||||
)
|
||||
|
||||
# Connect to postgres and create a database called "regression".
|
||||
@@ -345,9 +362,7 @@ def test_tx_abort_with_many_relations(
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
"rel_size_v2_enabled": "true" if reldir_type == "v2" else "false",
|
||||
}
|
||||
initial_tenant_conf=patch_tenant_conf({}, reldir_type),
|
||||
)
|
||||
ep = env.endpoints.create_start(
|
||||
"main",
|
||||
@@ -375,12 +390,8 @@ def test_tx_abort_with_many_relations(
|
||||
|
||||
# How many relations: this number is tuned to be long enough to take tens of seconds
|
||||
# if the rollback code path is buggy, tripping the test's timeout.
|
||||
if reldir_type == "v1":
|
||||
n = 4000
|
||||
step = 4000
|
||||
else:
|
||||
n = 20000
|
||||
step = 5000
|
||||
n = 5000
|
||||
step = 2500
|
||||
|
||||
def create():
|
||||
# Create many relations
|
||||
|
||||
@@ -327,9 +327,9 @@ def test_check_visibility_map(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
log.info(f"pgbench run {i+1}/{PGBENCH_RUNS}")
|
||||
endpoint.safe_psql(f"create database {dbname}")
|
||||
connstr = endpoint.connstr(dbname=dbname)
|
||||
# pgbench -i will automatically vacuum the tables. This creates the visibility map.
|
||||
pg_bin.run(["pgbench", "-i", "-s", "10", connstr])
|
||||
# Freeze the tuples to set the initial frozen bit.
|
||||
# Initialize the data set, but don't vacuum yet.
|
||||
pg_bin.run(["pgbench", "-i", "-s", "8", "-n", connstr])
|
||||
# Vacuum to create the visibility map, and freeze the tuples to set the frozen bit.
|
||||
endpoint.safe_psql("vacuum freeze", dbname=dbname)
|
||||
# Run pgbench.
|
||||
pg_bin.run(["pgbench", "-c", "32", "-j", "8", "-T", "10", connstr])
|
||||
@@ -354,19 +354,3 @@ def test_check_visibility_map(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
row = cur.fetchone()
|
||||
assert row is not None
|
||||
assert row[0] == 0, f"{row[0]} inconsistent VM pages (frozen)"
|
||||
|
||||
# Vacuum and freeze the tables, and check that the visibility map is still accurate.
|
||||
for dbname in dbnames:
|
||||
log.info(f"Vacuuming and checking visibility map for {dbname}")
|
||||
with endpoint.cursor(dbname=dbname) as cur:
|
||||
cur.execute("vacuum freeze")
|
||||
|
||||
cur.execute("select count(*) from pg_check_visible('pgbench_accounts')")
|
||||
row = cur.fetchone()
|
||||
assert row is not None
|
||||
assert row[0] == 0, f"{row[0]} inconsistent VM pages (visible)"
|
||||
|
||||
cur.execute("select count(*) from pg_check_frozen('pgbench_accounts')")
|
||||
row = cur.fetchone()
|
||||
assert row is not None
|
||||
assert row[0] == 0, f"{row[0]} inconsistent VM pages (frozen)"
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 6254ab9b44...b1425505c6
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 9b118b1cff...533be42f7d
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 799e7a08dd...78050f965f
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: 517b8dc244...780efda2ef
8
vendor/revisions.json
vendored
8
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.4",
|
||||
"517b8dc244abf3e56f0089849e464af76f70b94e"
|
||||
"780efda2ef8d629495cc289624534ba8cde40779"
|
||||
],
|
||||
"v16": [
|
||||
"16.8",
|
||||
"799e7a08dd171aa06a7395dd326f4243aaeb9f93"
|
||||
"78050f965f2e550fd6e58f837394cb3d080d7d42"
|
||||
],
|
||||
"v15": [
|
||||
"15.12",
|
||||
"9b118b1cffa6e4ca0d63389b57b54d11e207e9a8"
|
||||
"533be42f7da97e614ce1c494fafe3e49f53991b1"
|
||||
],
|
||||
"v14": [
|
||||
"14.17",
|
||||
"6254ab9b4496c3e481bc037ae69d859bbc2bdd7d"
|
||||
"b1425505c6f9a622a5aadf3ee362740519993310"
|
||||
]
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ license.workspace = true
|
||||
ahash = { version = "0.8" }
|
||||
anyhow = { version = "1", features = ["backtrace"] }
|
||||
base64-594e8ee84c453af0 = { package = "base64", version = "0.13", features = ["alloc"] }
|
||||
base64-647d43efb71741da = { package = "base64", version = "0.21", features = ["alloc"] }
|
||||
base64-647d43efb71741da = { package = "base64", version = "0.21" }
|
||||
base64ct = { version = "1", default-features = false, features = ["std"] }
|
||||
bytes = { version = "1", features = ["serde"] }
|
||||
camino = { version = "1", default-features = false, features = ["serde1"] }
|
||||
|
||||
Reference in New Issue
Block a user