mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 19:50:38 +00:00
Compare commits
38 Commits
jcsp/stabi
...
add-pg_tra
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
791f181034 | ||
|
|
486872dd28 | ||
|
|
d37e90f430 | ||
|
|
8eb701d706 | ||
|
|
85a515c176 | ||
|
|
aa88279681 | ||
|
|
b2a670c765 | ||
|
|
ad9655bb01 | ||
|
|
1a87975d95 | ||
|
|
417b2781d9 | ||
|
|
2841f1ffa5 | ||
|
|
aad410c8f1 | ||
|
|
4f94751b75 | ||
|
|
6ee84d985a | ||
|
|
295be03a33 | ||
|
|
8e1b5a9727 | ||
|
|
1ef4258f29 | ||
|
|
65e2aae6e4 | ||
|
|
edc874e1b3 | ||
|
|
181af302b5 | ||
|
|
497116b76d | ||
|
|
a917952b30 | ||
|
|
e581b670f4 | ||
|
|
8ed79ed773 | ||
|
|
381f42519e | ||
|
|
375df517a0 | ||
|
|
9db63fea7a | ||
|
|
bfc767d60d | ||
|
|
109c54a300 | ||
|
|
74920d8cd8 | ||
|
|
131b32ef48 | ||
|
|
581bb5d7d5 | ||
|
|
3c78133477 | ||
|
|
46e046e779 | ||
|
|
d8cee52637 | ||
|
|
2e11d129d0 | ||
|
|
43a7423f72 | ||
|
|
374736a4de |
12
.github/scripts/generate_image_maps.py
vendored
12
.github/scripts/generate_image_maps.py
vendored
@@ -39,12 +39,18 @@ registries = {
|
||||
],
|
||||
}
|
||||
|
||||
release_branches = ["release", "release-proxy", "release-compute"]
|
||||
|
||||
outputs: dict[str, dict[str, list[str]]] = {}
|
||||
|
||||
target_tags = [target_tag, "latest"] if branch == "main" else [target_tag]
|
||||
target_stages = (
|
||||
["dev", "prod"] if branch in ["release", "release-proxy", "release-compute"] else ["dev"]
|
||||
target_tags = (
|
||||
[target_tag, "latest"]
|
||||
if branch == "main"
|
||||
else [target_tag, "released"]
|
||||
if branch in release_branches
|
||||
else [target_tag]
|
||||
)
|
||||
target_stages = ["dev", "prod"] if branch in release_branches else ["dev"]
|
||||
|
||||
for component_name, component_images in components.items():
|
||||
for stage in target_stages:
|
||||
|
||||
12
.github/scripts/push_with_image_map.py
vendored
12
.github/scripts/push_with_image_map.py
vendored
@@ -2,6 +2,9 @@ import json
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
RED = "\033[91m"
|
||||
RESET = "\033[0m"
|
||||
|
||||
image_map = os.getenv("IMAGE_MAP")
|
||||
if not image_map:
|
||||
raise ValueError("IMAGE_MAP environment variable is not set")
|
||||
@@ -29,9 +32,14 @@ while len(pending) > 0:
|
||||
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
|
||||
if result.returncode != 0:
|
||||
failures.append((" ".join(cmd), result.stdout))
|
||||
failures.append((" ".join(cmd), result.stdout, target))
|
||||
pending.append((source, target))
|
||||
print(
|
||||
f"{RED}[RETRY]{RESET} Push failed for {target}. Retrying... (failure count: {len(failures)})"
|
||||
)
|
||||
print(result.stdout)
|
||||
|
||||
if len(failures) > 0 and (github_output := os.getenv("GITHUB_OUTPUT")):
|
||||
failed_targets = [target for _, _, target in failures]
|
||||
with open(github_output, "a") as f:
|
||||
f.write("slack_notify=true\n")
|
||||
f.write(f"push_failures={json.dumps(failed_targets)}\n")
|
||||
|
||||
@@ -110,12 +110,19 @@ jobs:
|
||||
IMAGE_MAP: ${{ inputs.image-map }}
|
||||
|
||||
- name: Notify Slack if container image pushing fails
|
||||
if: steps.push.outputs.slack_notify == 'true' || failure()
|
||||
if: steps.push.outputs.push_failures || failure()
|
||||
uses: slackapi/slack-github-action@485a9d42d3a73031f12ec201c457e2162c45d02d # v2.0.0
|
||||
with:
|
||||
method: chat.postMessage
|
||||
token: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
payload: |
|
||||
channel: ${{ vars.SLACK_ON_CALL_DEVPROD_STREAM }}
|
||||
text: |
|
||||
Pushing container images failed in <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
|
||||
text: >
|
||||
*Container image pushing ${{
|
||||
steps.push.outcome == 'failure' && 'failed completely' || 'succeeded with some retries'
|
||||
}}* in
|
||||
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
|
||||
|
||||
${{ steps.push.outputs.push_failures && format(
|
||||
'*Failed targets:*\n• {0}', join(fromJson(steps.push.outputs.push_failures), '\n• ')
|
||||
) || '' }}
|
||||
|
||||
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -980,7 +980,7 @@ jobs:
|
||||
TEST_EXTENSIONS_TAG: >-
|
||||
${{
|
||||
contains(fromJSON('["storage-rc-pr", "proxy-rc-pr"]'), needs.meta.outputs.run-kind)
|
||||
&& 'latest'
|
||||
&& needs.meta.outputs.previous-compute-release
|
||||
|| needs.meta.outputs.build-tag
|
||||
}}
|
||||
TEST_VERSION_ONLY: ${{ matrix.pg_version }}
|
||||
|
||||
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -4329,6 +4329,7 @@ dependencies = [
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"thiserror 1.0.69",
|
||||
"tracing-utils",
|
||||
"utils",
|
||||
]
|
||||
|
||||
@@ -7115,9 +7116,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.43.0"
|
||||
version = "1.43.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
|
||||
checksum = "492a604e2fd7f814268a378409e6c92b5525d747d10db9a229723f55a417958c"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"bytes",
|
||||
@@ -7603,6 +7604,7 @@ dependencies = [
|
||||
"opentelemetry-otlp",
|
||||
"opentelemetry-semantic-conventions",
|
||||
"opentelemetry_sdk",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
|
||||
@@ -183,7 +183,7 @@ test-context = "0.3"
|
||||
thiserror = "1.0"
|
||||
tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
|
||||
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
|
||||
tokio = { version = "1.41", features = ["macros"] }
|
||||
tokio = { version = "1.43.1", features = ["macros"] }
|
||||
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
|
||||
tokio-io-timeout = "1.2.0"
|
||||
tokio-postgres-rustls = "0.12.0"
|
||||
|
||||
@@ -292,7 +292,7 @@ WORKDIR /home/nonroot
|
||||
|
||||
# Rust
|
||||
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
|
||||
ENV RUSTC_VERSION=1.85.0
|
||||
ENV RUSTC_VERSION=1.86.0
|
||||
ENV RUSTUP_HOME="/home/nonroot/.rustup"
|
||||
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
|
||||
ARG RUSTFILT_VERSION=0.2.1
|
||||
|
||||
@@ -1022,67 +1022,6 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/semver.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg_embedding-build"
|
||||
# compile pg_embedding extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg_embedding-src
|
||||
ARG PG_VERSION
|
||||
|
||||
# This is our extension, support stopped in favor of pgvector
|
||||
# TODO: deprecate it
|
||||
WORKDIR /ext-src
|
||||
RUN case "${PG_VERSION:?}" in \
|
||||
"v14" | "v15") \
|
||||
export PG_EMBEDDING_VERSION=0.3.5 \
|
||||
export PG_EMBEDDING_CHECKSUM=0e95b27b8b6196e2cf0a0c9ec143fe2219b82e54c5bb4ee064e76398cbe69ae9 \
|
||||
;; \
|
||||
*) \
|
||||
echo "pg_embedding not supported on this PostgreSQL version. Use pgvector instead." && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/neondatabase/pg_embedding/archive/refs/tags/${PG_EMBEDDING_VERSION}.tar.gz -O pg_embedding.tar.gz && \
|
||||
echo "${PG_EMBEDDING_CHECKSUM} pg_embedding.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_embedding-src && cd pg_embedding-src && tar xzf ../pg_embedding.tar.gz --strip-components=1 -C .
|
||||
|
||||
FROM pg-build AS pg_embedding-build
|
||||
COPY --from=pg_embedding-src /ext-src/ /ext-src/
|
||||
WORKDIR /ext-src/
|
||||
RUN if [ -d pg_embedding-src ]; then \
|
||||
cd pg_embedding-src && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install; \
|
||||
fi
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg_anon-build"
|
||||
# compile anon extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg_anon-src
|
||||
ARG PG_VERSION
|
||||
|
||||
# This is an experimental extension, never got to real production.
|
||||
# !Do not remove! It can be present in shared_preload_libraries and compute will fail to start if library is not found.
|
||||
WORKDIR /ext-src
|
||||
RUN case "${PG_VERSION:?}" in "v17") \
|
||||
echo "postgresql_anonymizer does not yet support PG17" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/neondatabase/postgresql_anonymizer/archive/refs/tags/neon_1.1.1.tar.gz -O pg_anon.tar.gz && \
|
||||
echo "321ea8d5c1648880aafde850a2c576e4a9e7b9933a34ce272efc839328999fa9 pg_anon.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C .
|
||||
|
||||
FROM pg-build AS pg_anon-build
|
||||
COPY --from=pg_anon-src /ext-src/ /ext-src/
|
||||
WORKDIR /ext-src
|
||||
RUN if [ -d pg_anon-src ]; then \
|
||||
cd pg_anon-src && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/anon.control; \
|
||||
fi
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg build with nonroot user and cargo installed"
|
||||
@@ -1366,8 +1305,8 @@ ARG PG_VERSION
|
||||
# Do not update without approve from proxy team
|
||||
# Make sure the version is reflected in proxy/src/serverless/local_conn_pool.rs
|
||||
WORKDIR /ext-src
|
||||
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.2.0.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "5ace028e591f2e000ca10afa5b1ca62203ebff014c2907c0ec3b29c36f28a1bb pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.3.0.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "19be2dc0b3834d643706ed430af998bb4c2cdf24b3c45e7b102bb3a550e8660c pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.9", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
sed -i 's/version = "0.12.6"/version = "0.12.9"/g' pgrx-tests/Cargo.toml && \
|
||||
@@ -1449,6 +1388,38 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_partman.control
|
||||
|
||||
#########################################################################################
|
||||
# Layer "pg_tracing"
|
||||
# compile pg_tracing extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg_tracing-src
|
||||
ARG PG_VERSION
|
||||
WORKDIR /ext-src
|
||||
RUN case "${PG_VERSION:?}" in \
|
||||
"v14" | "v15") \
|
||||
echo "pg_tracing not supported on this PostgreSQL version." && exit 0 \
|
||||
;; \
|
||||
*) \
|
||||
;; \
|
||||
esac && \
|
||||
wget https://github.com/DataDog/pg_tracing/archive/refs/tags/v0.1.3.tar.gz -O pg_tracing.tar.gz && \
|
||||
echo "d0a7cca7279bb29601ba6c4c1aaeb3a44d71e6afa3b78aae1e3b7269e688f907 pg_tracing.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_tracing-src && cd pg_tracing-src && tar xzf ../pg_tracing.tar.gz --strip-components=1 -C .
|
||||
|
||||
FROM pg-build AS pg_tracing-build
|
||||
COPY --from=pg_tracing-src /ext-src/ /ext-src/
|
||||
WORKDIR /ext-src/pg_tracing-src
|
||||
RUN case "${PG_VERSION:?}" in \
|
||||
"v14" | "v15") \
|
||||
echo "pg_tracing not supported on this PostgreSQL version." && exit 0 \
|
||||
;; \
|
||||
*) \
|
||||
;; \
|
||||
esac && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg_mooncake"
|
||||
@@ -1675,11 +1646,10 @@ COPY --from=rdkit-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_uuidv7-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_roaringbitmap-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_semver-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_embedding-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=wal2json-build /usr/local/pgsql /usr/local/pgsql
|
||||
COPY --from=pg_anon-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_tracing-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
@@ -1853,10 +1823,10 @@ COPY --from=pg_cron-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_uuidv7-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_roaringbitmap-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_semver-src /ext-src/ /ext-src/
|
||||
#COPY --from=pg_embedding-src /ext-src/ /ext-src/
|
||||
#COPY --from=wal2json-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_ivm-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_partman-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_tracing-src /ext-src/ /ext-src/
|
||||
#COPY --from=pg_mooncake-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_repack-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
@@ -33,6 +33,7 @@
|
||||
import 'sql_exporter/lfc_hits.libsonnet',
|
||||
import 'sql_exporter/lfc_misses.libsonnet',
|
||||
import 'sql_exporter/lfc_used.libsonnet',
|
||||
import 'sql_exporter/lfc_used_pages.libsonnet',
|
||||
import 'sql_exporter/lfc_writes.libsonnet',
|
||||
import 'sql_exporter/logical_slot_restart_lsn.libsonnet',
|
||||
import 'sql_exporter/max_cluster_size.libsonnet',
|
||||
|
||||
10
compute/etc/sql_exporter/lfc_used_pages.libsonnet
Normal file
10
compute/etc/sql_exporter/lfc_used_pages.libsonnet
Normal file
@@ -0,0 +1,10 @@
|
||||
{
|
||||
metric_name: 'lfc_used_pages',
|
||||
type: 'gauge',
|
||||
help: 'LFC pages used',
|
||||
key_labels: null,
|
||||
values: [
|
||||
'lfc_used_pages',
|
||||
],
|
||||
query: importstr 'sql_exporter/lfc_used_pages.sql',
|
||||
}
|
||||
1
compute/etc/sql_exporter/lfc_used_pages.sql
Normal file
1
compute/etc/sql_exporter/lfc_used_pages.sql
Normal file
@@ -0,0 +1 @@
|
||||
SELECT lfc_value AS lfc_used_pages FROM neon.neon_lfc_stats WHERE lfc_key = 'file_cache_used_pages';
|
||||
@@ -202,10 +202,10 @@ index cf0b80d616..e8e2a14a4a 100644
|
||||
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
|
||||
ERROR: must be owner of relation constraint_comments_tbl
|
||||
diff --git a/src/test/regress/expected/conversion.out b/src/test/regress/expected/conversion.out
|
||||
index 442e7aff2b..525f732b03 100644
|
||||
index d785f92561..16377e5ac9 100644
|
||||
--- a/src/test/regress/expected/conversion.out
|
||||
+++ b/src/test/regress/expected/conversion.out
|
||||
@@ -8,7 +8,7 @@
|
||||
@@ -15,7 +15,7 @@ SELECT FROM test_enc_setup();
|
||||
CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, result OUT bytea)
|
||||
AS :'regresslib', 'test_enc_conversion'
|
||||
LANGUAGE C STRICT;
|
||||
@@ -587,16 +587,15 @@ index f551624afb..57f1e432d4 100644
|
||||
SELECT *
|
||||
INTO TABLE ramp
|
||||
diff --git a/src/test/regress/expected/database.out b/src/test/regress/expected/database.out
|
||||
index 454db91ec0..01378d7081 100644
|
||||
index 4cbdbdf84d..573362850e 100644
|
||||
--- a/src/test/regress/expected/database.out
|
||||
+++ b/src/test/regress/expected/database.out
|
||||
@@ -1,8 +1,7 @@
|
||||
@@ -1,8 +1,6 @@
|
||||
CREATE DATABASE regression_tbd
|
||||
ENCODING utf8 LC_COLLATE "C" LC_CTYPE "C" TEMPLATE template0;
|
||||
ALTER DATABASE regression_tbd RENAME TO regression_utf8;
|
||||
-ALTER DATABASE regression_utf8 SET TABLESPACE regress_tblspace;
|
||||
-ALTER DATABASE regression_utf8 RESET TABLESPACE;
|
||||
+WARNING: you need to manually restart any running background workers after this command
|
||||
ALTER DATABASE regression_utf8 CONNECTION_LIMIT 123;
|
||||
-- Test PgDatabaseToastTable. Doing this with GRANT would be slow.
|
||||
BEGIN;
|
||||
@@ -700,7 +699,7 @@ index 6ed50fdcfa..caa00a345d 100644
|
||||
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
|
||||
CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
|
||||
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
|
||||
index 6b8c2f2414..8e13b7fa46 100644
|
||||
index 84745b9f60..4883c12351 100644
|
||||
--- a/src/test/regress/expected/foreign_key.out
|
||||
+++ b/src/test/regress/expected/foreign_key.out
|
||||
@@ -1985,7 +1985,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
|
||||
@@ -1112,7 +1111,7 @@ index 8475231735..0653946337 100644
|
||||
DROP ROLE regress_passwd_sha_len1;
|
||||
DROP ROLE regress_passwd_sha_len2;
|
||||
diff --git a/src/test/regress/expected/privileges.out b/src/test/regress/expected/privileges.out
|
||||
index 5b9dba7b32..cc408dad42 100644
|
||||
index 620fbe8c52..0570102357 100644
|
||||
--- a/src/test/regress/expected/privileges.out
|
||||
+++ b/src/test/regress/expected/privileges.out
|
||||
@@ -20,19 +20,19 @@ SELECT lo_unlink(oid) FROM pg_largeobject_metadata WHERE oid >= 1000 AND oid < 3
|
||||
@@ -1174,8 +1173,8 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
+CREATE GROUP regress_priv_group2 WITH ADMIN regress_priv_user1 PASSWORD NEON_PASSWORD_PLACEHOLDER USER regress_priv_user2;
|
||||
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
|
||||
GRANT regress_priv_group2 TO regress_priv_user2 GRANTED BY regress_priv_user1;
|
||||
SET SESSION AUTHORIZATION regress_priv_user1;
|
||||
@@ -239,12 +239,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
|
||||
SET SESSION AUTHORIZATION regress_priv_user3;
|
||||
@@ -246,12 +246,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
|
||||
ERROR: permission denied to grant privileges as role "regress_priv_role"
|
||||
DETAIL: The grantor must have the ADMIN option on role "regress_priv_role".
|
||||
GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY CURRENT_ROLE;
|
||||
@@ -1192,7 +1191,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
DROP ROLE regress_priv_role;
|
||||
SET SESSION AUTHORIZATION regress_priv_user1;
|
||||
SELECT session_user, current_user;
|
||||
@@ -1776,7 +1780,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
@@ -1783,7 +1787,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
|
||||
-- security-restricted operations
|
||||
\c -
|
||||
@@ -1201,7 +1200,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
-- Check that index expressions and predicates are run as the table's owner
|
||||
-- A dummy index function checking current_user
|
||||
CREATE FUNCTION sro_ifun(int) RETURNS int AS $$
|
||||
@@ -2668,8 +2672,8 @@ drop cascades to function testns.priv_testagg(integer)
|
||||
@@ -2675,8 +2679,8 @@ drop cascades to function testns.priv_testagg(integer)
|
||||
drop cascades to function testns.priv_testproc(integer)
|
||||
-- Change owner of the schema & and rename of new schema owner
|
||||
\c -
|
||||
@@ -1212,7 +1211,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
SET SESSION ROLE regress_schemauser1;
|
||||
CREATE SCHEMA testns;
|
||||
SELECT nspname, rolname FROM pg_namespace, pg_roles WHERE pg_namespace.nspname = 'testns' AND pg_namespace.nspowner = pg_roles.oid;
|
||||
@@ -2792,7 +2796,7 @@ DROP USER regress_priv_user7;
|
||||
@@ -2799,7 +2803,7 @@ DROP USER regress_priv_user7;
|
||||
DROP USER regress_priv_user8; -- does not exist
|
||||
ERROR: role "regress_priv_user8" does not exist
|
||||
-- permissions with LOCK TABLE
|
||||
@@ -1221,7 +1220,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
CREATE TABLE lock_table (a int);
|
||||
-- LOCK TABLE and SELECT permission
|
||||
GRANT SELECT ON lock_table TO regress_locktable_user;
|
||||
@@ -2874,7 +2878,7 @@ DROP USER regress_locktable_user;
|
||||
@@ -2881,7 +2885,7 @@ DROP USER regress_locktable_user;
|
||||
-- pg_backend_memory_contexts.
|
||||
-- switch to superuser
|
||||
\c -
|
||||
@@ -1230,7 +1229,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
|
||||
has_table_privilege
|
||||
---------------------
|
||||
@@ -2918,10 +2922,10 @@ RESET ROLE;
|
||||
@@ -2925,10 +2929,10 @@ RESET ROLE;
|
||||
-- clean up
|
||||
DROP ROLE regress_readallstats;
|
||||
-- test role grantor machinery
|
||||
@@ -1245,7 +1244,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
|
||||
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
|
||||
SET SESSION AUTHORIZATION regress_group_direct_manager;
|
||||
@@ -2950,9 +2954,9 @@ DROP ROLE regress_group_direct_manager;
|
||||
@@ -2957,9 +2961,9 @@ DROP ROLE regress_group_direct_manager;
|
||||
DROP ROLE regress_group_indirect_manager;
|
||||
DROP ROLE regress_group_member;
|
||||
-- test SET and INHERIT options with object ownership changes
|
||||
@@ -1841,7 +1840,7 @@ index 09a255649b..15895f0c53 100644
|
||||
CREATE TABLE ruletest_t2 (x int);
|
||||
CREATE VIEW ruletest_v1 WITH (security_invoker=true) AS
|
||||
diff --git a/src/test/regress/expected/security_label.out b/src/test/regress/expected/security_label.out
|
||||
index a8e01a6220..5a9cef4ede 100644
|
||||
index a8e01a6220..83543b250a 100644
|
||||
--- a/src/test/regress/expected/security_label.out
|
||||
+++ b/src/test/regress/expected/security_label.out
|
||||
@@ -6,8 +6,8 @@ SET client_min_messages TO 'warning';
|
||||
@@ -1855,34 +1854,6 @@ index a8e01a6220..5a9cef4ede 100644
|
||||
CREATE TABLE seclabel_tbl1 (a int, b text);
|
||||
CREATE TABLE seclabel_tbl2 (x int, y text);
|
||||
CREATE VIEW seclabel_view1 AS SELECT * FROM seclabel_tbl2;
|
||||
@@ -19,21 +19,21 @@ ALTER TABLE seclabel_tbl2 OWNER TO regress_seclabel_user2;
|
||||
-- Test of SECURITY LABEL statement without a plugin
|
||||
--
|
||||
SECURITY LABEL ON TABLE seclabel_tbl1 IS 'classified'; -- fail
|
||||
-ERROR: no security label providers have been loaded
|
||||
+ERROR: must specify provider when multiple security label providers have been loaded
|
||||
SECURITY LABEL FOR 'dummy' ON TABLE seclabel_tbl1 IS 'classified'; -- fail
|
||||
ERROR: security label provider "dummy" is not loaded
|
||||
SECURITY LABEL ON TABLE seclabel_tbl1 IS '...invalid label...'; -- fail
|
||||
-ERROR: no security label providers have been loaded
|
||||
+ERROR: must specify provider when multiple security label providers have been loaded
|
||||
SECURITY LABEL ON TABLE seclabel_tbl3 IS 'unclassified'; -- fail
|
||||
-ERROR: no security label providers have been loaded
|
||||
+ERROR: must specify provider when multiple security label providers have been loaded
|
||||
SECURITY LABEL ON ROLE regress_seclabel_user1 IS 'classified'; -- fail
|
||||
-ERROR: no security label providers have been loaded
|
||||
+ERROR: must specify provider when multiple security label providers have been loaded
|
||||
SECURITY LABEL FOR 'dummy' ON ROLE regress_seclabel_user1 IS 'classified'; -- fail
|
||||
ERROR: security label provider "dummy" is not loaded
|
||||
SECURITY LABEL ON ROLE regress_seclabel_user1 IS '...invalid label...'; -- fail
|
||||
-ERROR: no security label providers have been loaded
|
||||
+ERROR: must specify provider when multiple security label providers have been loaded
|
||||
SECURITY LABEL ON ROLE regress_seclabel_user3 IS 'unclassified'; -- fail
|
||||
-ERROR: no security label providers have been loaded
|
||||
+ERROR: must specify provider when multiple security label providers have been loaded
|
||||
-- clean up objects
|
||||
DROP FUNCTION seclabel_four();
|
||||
DROP DOMAIN seclabel_domain;
|
||||
diff --git a/src/test/regress/expected/select_into.out b/src/test/regress/expected/select_into.out
|
||||
index b79fe9a1c0..e29fab88ab 100644
|
||||
--- a/src/test/regress/expected/select_into.out
|
||||
@@ -2413,10 +2384,10 @@ index e3e3bea709..fa86ddc326 100644
|
||||
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
|
||||
COMMENT ON CONSTRAINT the_constraint ON DOMAIN constraint_comments_dom IS 'no, another comment';
|
||||
diff --git a/src/test/regress/sql/conversion.sql b/src/test/regress/sql/conversion.sql
|
||||
index 9a65fca91f..58431a3056 100644
|
||||
index b567a1a572..4d1ac2e631 100644
|
||||
--- a/src/test/regress/sql/conversion.sql
|
||||
+++ b/src/test/regress/sql/conversion.sql
|
||||
@@ -12,7 +12,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
|
||||
@@ -17,7 +17,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
|
||||
AS :'regresslib', 'test_enc_conversion'
|
||||
LANGUAGE C STRICT;
|
||||
|
||||
@@ -2780,7 +2751,7 @@ index ae6841308b..47bc792e30 100644
|
||||
|
||||
SELECT *
|
||||
diff --git a/src/test/regress/sql/database.sql b/src/test/regress/sql/database.sql
|
||||
index 0367c0e37a..a23b98c4bd 100644
|
||||
index 46ad263478..eb05584ed5 100644
|
||||
--- a/src/test/regress/sql/database.sql
|
||||
+++ b/src/test/regress/sql/database.sql
|
||||
@@ -1,8 +1,6 @@
|
||||
@@ -2893,7 +2864,7 @@ index aa147b14a9..370e0dd570 100644
|
||||
CREATE FOREIGN DATA WRAPPER dummy;
|
||||
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
|
||||
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
|
||||
index 45c7a534cb..32dd26b8cd 100644
|
||||
index 9f4210b26e..620d3fc87e 100644
|
||||
--- a/src/test/regress/sql/foreign_key.sql
|
||||
+++ b/src/test/regress/sql/foreign_key.sql
|
||||
@@ -1435,7 +1435,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
|
||||
@@ -3246,7 +3217,7 @@ index 53e86b0b6c..0303fdfe96 100644
|
||||
-- Check that the invalid secrets were re-hashed. A re-hashed secret
|
||||
-- should not contain the original salt.
|
||||
diff --git a/src/test/regress/sql/privileges.sql b/src/test/regress/sql/privileges.sql
|
||||
index 249df17a58..b258e7f26a 100644
|
||||
index 259f1aedd1..6e1a3d17b7 100644
|
||||
--- a/src/test/regress/sql/privileges.sql
|
||||
+++ b/src/test/regress/sql/privileges.sql
|
||||
@@ -24,18 +24,18 @@ RESET client_min_messages;
|
||||
@@ -3308,7 +3279,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
|
||||
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
|
||||
|
||||
@@ -1157,7 +1157,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
@@ -1160,7 +1160,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
|
||||
-- security-restricted operations
|
||||
\c -
|
||||
@@ -3317,7 +3288,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
|
||||
-- Check that index expressions and predicates are run as the table's owner
|
||||
|
||||
@@ -1653,8 +1653,8 @@ DROP SCHEMA testns CASCADE;
|
||||
@@ -1656,8 +1656,8 @@ DROP SCHEMA testns CASCADE;
|
||||
-- Change owner of the schema & and rename of new schema owner
|
||||
\c -
|
||||
|
||||
@@ -3328,7 +3299,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
|
||||
SET SESSION ROLE regress_schemauser1;
|
||||
CREATE SCHEMA testns;
|
||||
@@ -1748,7 +1748,7 @@ DROP USER regress_priv_user8; -- does not exist
|
||||
@@ -1751,7 +1751,7 @@ DROP USER regress_priv_user8; -- does not exist
|
||||
|
||||
|
||||
-- permissions with LOCK TABLE
|
||||
@@ -3337,7 +3308,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
CREATE TABLE lock_table (a int);
|
||||
|
||||
-- LOCK TABLE and SELECT permission
|
||||
@@ -1836,7 +1836,7 @@ DROP USER regress_locktable_user;
|
||||
@@ -1839,7 +1839,7 @@ DROP USER regress_locktable_user;
|
||||
-- switch to superuser
|
||||
\c -
|
||||
|
||||
@@ -3346,7 +3317,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
|
||||
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
|
||||
SELECT has_table_privilege('regress_readallstats','pg_shmem_allocations','SELECT'); -- no
|
||||
@@ -1856,10 +1856,10 @@ RESET ROLE;
|
||||
@@ -1859,10 +1859,10 @@ RESET ROLE;
|
||||
DROP ROLE regress_readallstats;
|
||||
|
||||
-- test role grantor machinery
|
||||
@@ -3361,7 +3332,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
|
||||
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
|
||||
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
|
||||
@@ -1881,9 +1881,9 @@ DROP ROLE regress_group_indirect_manager;
|
||||
@@ -1884,9 +1884,9 @@ DROP ROLE regress_group_indirect_manager;
|
||||
DROP ROLE regress_group_member;
|
||||
|
||||
-- test SET and INHERIT options with object ownership changes
|
||||
|
||||
@@ -202,10 +202,10 @@ index cf0b80d616..e8e2a14a4a 100644
|
||||
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
|
||||
ERROR: must be owner of relation constraint_comments_tbl
|
||||
diff --git a/src/test/regress/expected/conversion.out b/src/test/regress/expected/conversion.out
|
||||
index 442e7aff2b..525f732b03 100644
|
||||
index d785f92561..16377e5ac9 100644
|
||||
--- a/src/test/regress/expected/conversion.out
|
||||
+++ b/src/test/regress/expected/conversion.out
|
||||
@@ -8,7 +8,7 @@
|
||||
@@ -15,7 +15,7 @@ SELECT FROM test_enc_setup();
|
||||
CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, result OUT bytea)
|
||||
AS :'regresslib', 'test_enc_conversion'
|
||||
LANGUAGE C STRICT;
|
||||
@@ -587,16 +587,15 @@ index f551624afb..57f1e432d4 100644
|
||||
SELECT *
|
||||
INTO TABLE ramp
|
||||
diff --git a/src/test/regress/expected/database.out b/src/test/regress/expected/database.out
|
||||
index 454db91ec0..01378d7081 100644
|
||||
index 4cbdbdf84d..573362850e 100644
|
||||
--- a/src/test/regress/expected/database.out
|
||||
+++ b/src/test/regress/expected/database.out
|
||||
@@ -1,8 +1,7 @@
|
||||
@@ -1,8 +1,6 @@
|
||||
CREATE DATABASE regression_tbd
|
||||
ENCODING utf8 LC_COLLATE "C" LC_CTYPE "C" TEMPLATE template0;
|
||||
ALTER DATABASE regression_tbd RENAME TO regression_utf8;
|
||||
-ALTER DATABASE regression_utf8 SET TABLESPACE regress_tblspace;
|
||||
-ALTER DATABASE regression_utf8 RESET TABLESPACE;
|
||||
+WARNING: you need to manually restart any running background workers after this command
|
||||
ALTER DATABASE regression_utf8 CONNECTION_LIMIT 123;
|
||||
-- Test PgDatabaseToastTable. Doing this with GRANT would be slow.
|
||||
BEGIN;
|
||||
@@ -700,7 +699,7 @@ index 6ed50fdcfa..caa00a345d 100644
|
||||
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
|
||||
CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
|
||||
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
|
||||
index 69994c98e3..129abcfbe8 100644
|
||||
index fe6a1015f2..614b387b7d 100644
|
||||
--- a/src/test/regress/expected/foreign_key.out
|
||||
+++ b/src/test/regress/expected/foreign_key.out
|
||||
@@ -1985,7 +1985,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
|
||||
@@ -1147,7 +1146,7 @@ index 924d6e001d..7fdda73439 100644
|
||||
DROP ROLE regress_passwd_sha_len1;
|
||||
DROP ROLE regress_passwd_sha_len2;
|
||||
diff --git a/src/test/regress/expected/privileges.out b/src/test/regress/expected/privileges.out
|
||||
index 1296da0d57..f43fffa44c 100644
|
||||
index e8c668e0a1..03be5c2120 100644
|
||||
--- a/src/test/regress/expected/privileges.out
|
||||
+++ b/src/test/regress/expected/privileges.out
|
||||
@@ -20,19 +20,19 @@ SELECT lo_unlink(oid) FROM pg_largeobject_metadata WHERE oid >= 1000 AND oid < 3
|
||||
@@ -1209,8 +1208,8 @@ index 1296da0d57..f43fffa44c 100644
|
||||
+CREATE GROUP regress_priv_group2 WITH ADMIN regress_priv_user1 PASSWORD NEON_PASSWORD_PLACEHOLDER USER regress_priv_user2;
|
||||
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
|
||||
GRANT regress_priv_group2 TO regress_priv_user2 GRANTED BY regress_priv_user1;
|
||||
SET SESSION AUTHORIZATION regress_priv_user1;
|
||||
@@ -239,12 +239,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
|
||||
SET SESSION AUTHORIZATION regress_priv_user3;
|
||||
@@ -246,12 +246,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
|
||||
ERROR: permission denied to grant privileges as role "regress_priv_role"
|
||||
DETAIL: The grantor must have the ADMIN option on role "regress_priv_role".
|
||||
GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY CURRENT_ROLE;
|
||||
@@ -1227,7 +1226,7 @@ index 1296da0d57..f43fffa44c 100644
|
||||
DROP ROLE regress_priv_role;
|
||||
SET SESSION AUTHORIZATION regress_priv_user1;
|
||||
SELECT session_user, current_user;
|
||||
@@ -1776,7 +1780,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
@@ -1783,7 +1787,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
|
||||
-- security-restricted operations
|
||||
\c -
|
||||
@@ -1236,7 +1235,7 @@ index 1296da0d57..f43fffa44c 100644
|
||||
-- Check that index expressions and predicates are run as the table's owner
|
||||
-- A dummy index function checking current_user
|
||||
CREATE FUNCTION sro_ifun(int) RETURNS int AS $$
|
||||
@@ -2668,8 +2672,8 @@ drop cascades to function testns.priv_testagg(integer)
|
||||
@@ -2675,8 +2679,8 @@ drop cascades to function testns.priv_testagg(integer)
|
||||
drop cascades to function testns.priv_testproc(integer)
|
||||
-- Change owner of the schema & and rename of new schema owner
|
||||
\c -
|
||||
@@ -1247,7 +1246,7 @@ index 1296da0d57..f43fffa44c 100644
|
||||
SET SESSION ROLE regress_schemauser1;
|
||||
CREATE SCHEMA testns;
|
||||
SELECT nspname, rolname FROM pg_namespace, pg_roles WHERE pg_namespace.nspname = 'testns' AND pg_namespace.nspowner = pg_roles.oid;
|
||||
@@ -2792,7 +2796,7 @@ DROP USER regress_priv_user7;
|
||||
@@ -2799,7 +2803,7 @@ DROP USER regress_priv_user7;
|
||||
DROP USER regress_priv_user8; -- does not exist
|
||||
ERROR: role "regress_priv_user8" does not exist
|
||||
-- permissions with LOCK TABLE
|
||||
@@ -1256,7 +1255,7 @@ index 1296da0d57..f43fffa44c 100644
|
||||
CREATE TABLE lock_table (a int);
|
||||
-- LOCK TABLE and SELECT permission
|
||||
GRANT SELECT ON lock_table TO regress_locktable_user;
|
||||
@@ -2888,7 +2892,7 @@ DROP USER regress_locktable_user;
|
||||
@@ -2895,7 +2899,7 @@ DROP USER regress_locktable_user;
|
||||
-- pg_backend_memory_contexts.
|
||||
-- switch to superuser
|
||||
\c -
|
||||
@@ -1265,7 +1264,7 @@ index 1296da0d57..f43fffa44c 100644
|
||||
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
|
||||
has_table_privilege
|
||||
---------------------
|
||||
@@ -2932,10 +2936,10 @@ RESET ROLE;
|
||||
@@ -2939,10 +2943,10 @@ RESET ROLE;
|
||||
-- clean up
|
||||
DROP ROLE regress_readallstats;
|
||||
-- test role grantor machinery
|
||||
@@ -1280,7 +1279,7 @@ index 1296da0d57..f43fffa44c 100644
|
||||
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
|
||||
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
|
||||
SET SESSION AUTHORIZATION regress_group_direct_manager;
|
||||
@@ -2964,9 +2968,9 @@ DROP ROLE regress_group_direct_manager;
|
||||
@@ -2971,9 +2975,9 @@ DROP ROLE regress_group_direct_manager;
|
||||
DROP ROLE regress_group_indirect_manager;
|
||||
DROP ROLE regress_group_member;
|
||||
-- test SET and INHERIT options with object ownership changes
|
||||
@@ -1293,7 +1292,7 @@ index 1296da0d57..f43fffa44c 100644
|
||||
CREATE SCHEMA regress_roleoption;
|
||||
GRANT CREATE, USAGE ON SCHEMA regress_roleoption TO PUBLIC;
|
||||
GRANT regress_roleoption_donor TO regress_roleoption_protagonist WITH INHERIT TRUE, SET FALSE;
|
||||
@@ -2995,9 +2999,9 @@ DROP ROLE regress_roleoption_protagonist;
|
||||
@@ -3002,9 +3006,9 @@ DROP ROLE regress_roleoption_protagonist;
|
||||
DROP ROLE regress_roleoption_donor;
|
||||
DROP ROLE regress_roleoption_recipient;
|
||||
-- MAINTAIN
|
||||
@@ -2433,10 +2432,10 @@ index e3e3bea709..fa86ddc326 100644
|
||||
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
|
||||
COMMENT ON CONSTRAINT the_constraint ON DOMAIN constraint_comments_dom IS 'no, another comment';
|
||||
diff --git a/src/test/regress/sql/conversion.sql b/src/test/regress/sql/conversion.sql
|
||||
index 9a65fca91f..58431a3056 100644
|
||||
index b567a1a572..4d1ac2e631 100644
|
||||
--- a/src/test/regress/sql/conversion.sql
|
||||
+++ b/src/test/regress/sql/conversion.sql
|
||||
@@ -12,7 +12,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
|
||||
@@ -17,7 +17,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
|
||||
AS :'regresslib', 'test_enc_conversion'
|
||||
LANGUAGE C STRICT;
|
||||
|
||||
@@ -2800,7 +2799,7 @@ index ae6841308b..47bc792e30 100644
|
||||
|
||||
SELECT *
|
||||
diff --git a/src/test/regress/sql/database.sql b/src/test/regress/sql/database.sql
|
||||
index 0367c0e37a..a23b98c4bd 100644
|
||||
index 46ad263478..eb05584ed5 100644
|
||||
--- a/src/test/regress/sql/database.sql
|
||||
+++ b/src/test/regress/sql/database.sql
|
||||
@@ -1,8 +1,6 @@
|
||||
@@ -2913,7 +2912,7 @@ index aa147b14a9..370e0dd570 100644
|
||||
CREATE FOREIGN DATA WRAPPER dummy;
|
||||
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
|
||||
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
|
||||
index 2e710e419c..89cd481a54 100644
|
||||
index 8c4e4c7c83..e946cd2119 100644
|
||||
--- a/src/test/regress/sql/foreign_key.sql
|
||||
+++ b/src/test/regress/sql/foreign_key.sql
|
||||
@@ -1435,7 +1435,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
|
||||
@@ -3301,7 +3300,7 @@ index bb82aa4aa2..dd8a05e24d 100644
|
||||
-- Check that the invalid secrets were re-hashed. A re-hashed secret
|
||||
-- should not contain the original salt.
|
||||
diff --git a/src/test/regress/sql/privileges.sql b/src/test/regress/sql/privileges.sql
|
||||
index 5880bc018d..27aa952b18 100644
|
||||
index b7e1cb6cdd..6e5a2217f1 100644
|
||||
--- a/src/test/regress/sql/privileges.sql
|
||||
+++ b/src/test/regress/sql/privileges.sql
|
||||
@@ -24,18 +24,18 @@ RESET client_min_messages;
|
||||
@@ -3363,7 +3362,7 @@ index 5880bc018d..27aa952b18 100644
|
||||
|
||||
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
|
||||
|
||||
@@ -1157,7 +1157,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
@@ -1160,7 +1160,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
|
||||
-- security-restricted operations
|
||||
\c -
|
||||
@@ -3372,7 +3371,7 @@ index 5880bc018d..27aa952b18 100644
|
||||
|
||||
-- Check that index expressions and predicates are run as the table's owner
|
||||
|
||||
@@ -1653,8 +1653,8 @@ DROP SCHEMA testns CASCADE;
|
||||
@@ -1656,8 +1656,8 @@ DROP SCHEMA testns CASCADE;
|
||||
-- Change owner of the schema & and rename of new schema owner
|
||||
\c -
|
||||
|
||||
@@ -3383,7 +3382,7 @@ index 5880bc018d..27aa952b18 100644
|
||||
|
||||
SET SESSION ROLE regress_schemauser1;
|
||||
CREATE SCHEMA testns;
|
||||
@@ -1748,7 +1748,7 @@ DROP USER regress_priv_user8; -- does not exist
|
||||
@@ -1751,7 +1751,7 @@ DROP USER regress_priv_user8; -- does not exist
|
||||
|
||||
|
||||
-- permissions with LOCK TABLE
|
||||
@@ -3392,7 +3391,7 @@ index 5880bc018d..27aa952b18 100644
|
||||
CREATE TABLE lock_table (a int);
|
||||
|
||||
-- LOCK TABLE and SELECT permission
|
||||
@@ -1851,7 +1851,7 @@ DROP USER regress_locktable_user;
|
||||
@@ -1854,7 +1854,7 @@ DROP USER regress_locktable_user;
|
||||
-- switch to superuser
|
||||
\c -
|
||||
|
||||
@@ -3401,7 +3400,7 @@ index 5880bc018d..27aa952b18 100644
|
||||
|
||||
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
|
||||
SELECT has_table_privilege('regress_readallstats','pg_shmem_allocations','SELECT'); -- no
|
||||
@@ -1871,10 +1871,10 @@ RESET ROLE;
|
||||
@@ -1874,10 +1874,10 @@ RESET ROLE;
|
||||
DROP ROLE regress_readallstats;
|
||||
|
||||
-- test role grantor machinery
|
||||
@@ -3416,7 +3415,7 @@ index 5880bc018d..27aa952b18 100644
|
||||
|
||||
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
|
||||
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
|
||||
@@ -1896,9 +1896,9 @@ DROP ROLE regress_group_indirect_manager;
|
||||
@@ -1899,9 +1899,9 @@ DROP ROLE regress_group_indirect_manager;
|
||||
DROP ROLE regress_group_member;
|
||||
|
||||
-- test SET and INHERIT options with object ownership changes
|
||||
@@ -3429,7 +3428,7 @@ index 5880bc018d..27aa952b18 100644
|
||||
CREATE SCHEMA regress_roleoption;
|
||||
GRANT CREATE, USAGE ON SCHEMA regress_roleoption TO PUBLIC;
|
||||
GRANT regress_roleoption_donor TO regress_roleoption_protagonist WITH INHERIT TRUE, SET FALSE;
|
||||
@@ -1926,9 +1926,9 @@ DROP ROLE regress_roleoption_donor;
|
||||
@@ -1929,9 +1929,9 @@ DROP ROLE regress_roleoption_donor;
|
||||
DROP ROLE regress_roleoption_recipient;
|
||||
|
||||
-- MAINTAIN
|
||||
|
||||
@@ -2,23 +2,6 @@ diff --git a/expected/ut-A.out b/expected/ut-A.out
|
||||
index da723b8..5328114 100644
|
||||
--- a/expected/ut-A.out
|
||||
+++ b/expected/ut-A.out
|
||||
@@ -9,13 +9,16 @@ SET search_path TO public;
|
||||
----
|
||||
-- No.A-1-1-3
|
||||
CREATE EXTENSION pg_hint_plan;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
-- No.A-1-2-3
|
||||
DROP EXTENSION pg_hint_plan;
|
||||
-- No.A-1-1-4
|
||||
CREATE SCHEMA other_schema;
|
||||
CREATE EXTENSION pg_hint_plan SCHEMA other_schema;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan"
|
||||
CREATE EXTENSION pg_hint_plan;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
DROP SCHEMA other_schema;
|
||||
----
|
||||
---- No. A-5-1 comment pattern
|
||||
@@ -3175,6 +3178,7 @@ SELECT s.query, s.calls
|
||||
FROM public.pg_stat_statements s
|
||||
JOIN pg_catalog.pg_database d
|
||||
@@ -27,18 +10,6 @@ index da723b8..5328114 100644
|
||||
ORDER BY 1;
|
||||
query | calls
|
||||
--------------------------------------+-------
|
||||
diff --git a/expected/ut-fdw.out b/expected/ut-fdw.out
|
||||
index d372459..6282afe 100644
|
||||
--- a/expected/ut-fdw.out
|
||||
+++ b/expected/ut-fdw.out
|
||||
@@ -7,6 +7,7 @@ SET pg_hint_plan.debug_print TO on;
|
||||
SET client_min_messages TO LOG;
|
||||
SET pg_hint_plan.enable_hint TO on;
|
||||
CREATE EXTENSION file_fdw;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/file_fdw
|
||||
CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
|
||||
CREATE USER MAPPING FOR PUBLIC SERVER file_server;
|
||||
CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename');
|
||||
diff --git a/sql/ut-A.sql b/sql/ut-A.sql
|
||||
index 7c7d58a..4fd1a07 100644
|
||||
--- a/sql/ut-A.sql
|
||||
|
||||
@@ -1,24 +1,3 @@
|
||||
diff --git a/expected/ut-A.out b/expected/ut-A.out
|
||||
index e7d68a1..65a056c 100644
|
||||
--- a/expected/ut-A.out
|
||||
+++ b/expected/ut-A.out
|
||||
@@ -9,13 +9,16 @@ SET search_path TO public;
|
||||
----
|
||||
-- No.A-1-1-3
|
||||
CREATE EXTENSION pg_hint_plan;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
-- No.A-1-2-3
|
||||
DROP EXTENSION pg_hint_plan;
|
||||
-- No.A-1-1-4
|
||||
CREATE SCHEMA other_schema;
|
||||
CREATE EXTENSION pg_hint_plan SCHEMA other_schema;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan"
|
||||
CREATE EXTENSION pg_hint_plan;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
DROP SCHEMA other_schema;
|
||||
----
|
||||
---- No. A-5-1 comment pattern
|
||||
diff --git a/expected/ut-J.out b/expected/ut-J.out
|
||||
index 2fa3c70..314e929 100644
|
||||
--- a/expected/ut-J.out
|
||||
@@ -160,15 +139,3 @@ index a09bd34..0ad227c 100644
|
||||
error hint:
|
||||
|
||||
explain_filter
|
||||
diff --git a/expected/ut-fdw.out b/expected/ut-fdw.out
|
||||
index 017fa4b..98d989b 100644
|
||||
--- a/expected/ut-fdw.out
|
||||
+++ b/expected/ut-fdw.out
|
||||
@@ -7,6 +7,7 @@ SET pg_hint_plan.debug_print TO on;
|
||||
SET client_min_messages TO LOG;
|
||||
SET pg_hint_plan.enable_hint TO on;
|
||||
CREATE EXTENSION file_fdw;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/file_fdw
|
||||
CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
|
||||
CREATE USER MAPPING FOR PUBLIC SERVER file_server;
|
||||
CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename');
|
||||
|
||||
@@ -419,7 +419,7 @@ impl ComputeNode {
|
||||
.iter()
|
||||
.filter_map(|val| val.parse::<usize>().ok())
|
||||
.map(|val| if val > 1 { val - 1 } else { 1 })
|
||||
.last()
|
||||
.next_back()
|
||||
.unwrap_or(3)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -545,6 +545,11 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_compaction_ratio_percent' as integer")?,
|
||||
sampling_ratio: settings
|
||||
.remove("sampling_ratio")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("Falied to parse 'sampling_ratio'")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
|
||||
@@ -385,8 +385,6 @@ where
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let cli = Cli::parse();
|
||||
|
||||
let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());
|
||||
|
||||
let ssl_ca_certs = match &cli.ssl_ca_file {
|
||||
Some(ssl_ca_file) => {
|
||||
let buf = tokio::fs::read(ssl_ca_file).await?;
|
||||
@@ -401,9 +399,11 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
|
||||
let storcon_client = Client::new(http_client.clone(), cli.api.clone(), cli.jwt.clone());
|
||||
|
||||
let mut trimmed = cli.api.to_string();
|
||||
trimmed.pop();
|
||||
let vps_client = mgmt_api::Client::new(http_client, trimmed, cli.jwt.as_deref());
|
||||
let vps_client = mgmt_api::Client::new(http_client.clone(), trimmed, cli.jwt.as_deref());
|
||||
|
||||
match cli.command {
|
||||
Command::NodeRegister {
|
||||
@@ -1056,7 +1056,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
const DEFAULT_MIGRATE_CONCURRENCY: usize = 8;
|
||||
let mut stream = futures::stream::iter(moves)
|
||||
.map(|mv| {
|
||||
let client = Client::new(cli.api.clone(), cli.jwt.clone());
|
||||
let client = Client::new(http_client.clone(), cli.api.clone(), cli.jwt.clone());
|
||||
async move {
|
||||
client
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
|
||||
@@ -91,14 +91,14 @@ impl Server {
|
||||
Ok(tls_stream) => tls_stream,
|
||||
Err(err) => {
|
||||
if !suppress_io_error(&err) {
|
||||
info!("Failed to accept TLS connection: {err:#}");
|
||||
info!(%remote_addr, "Failed to accept TLS connection: {err:#}");
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Err(err) = Self::serve_connection(tls_stream, service, cancel).await {
|
||||
if !suppress_hyper_error(&err) {
|
||||
info!("Failed to serve HTTPS connection: {err:#}");
|
||||
info!(%remote_addr, "Failed to serve HTTPS connection: {err:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -106,7 +106,7 @@ impl Server {
|
||||
// Handle HTTP connection.
|
||||
if let Err(err) = Self::serve_connection(tcp_stream, service, cancel).await {
|
||||
if !suppress_hyper_error(&err) {
|
||||
info!("Failed to serve HTTP connection: {err:#}");
|
||||
info!(%remote_addr, "Failed to serve HTTP connection: {err:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ postgres_backend.workspace = true
|
||||
nix = {workspace = true, optional = true}
|
||||
reqwest.workspace = true
|
||||
rand.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
bincode.workspace = true
|
||||
|
||||
@@ -51,9 +51,54 @@ pub struct NodeMetadata {
|
||||
/// If there cannot be a static default value because we need to make runtime
|
||||
/// checks to determine the default, make it an `Option` (which defaults to None).
|
||||
/// The runtime check should be done in the consuming crate, i.e., `pageserver`.
|
||||
///
|
||||
/// Unknown fields are silently ignored during deserialization.
|
||||
/// The alternative, which we used in the past, was to set `deny_unknown_fields`,
|
||||
/// which fails deserialization, and hence pageserver startup, if there is an unknown field.
|
||||
/// The reason we don't do that anymore is that it complicates
|
||||
/// usage of config fields for feature flagging, which we commonly do for
|
||||
/// region-by-region rollouts.
|
||||
/// The complications mainly arise because the `pageserver.toml` contents on a
|
||||
/// prod server have a separate lifecycle from the pageserver binary.
|
||||
/// For instance, `pageserver.toml` contents today are defined in the internal
|
||||
/// infra repo, and thus introducing a new config field to pageserver and
|
||||
/// rolling it out to prod servers are separate commits in separate repos
|
||||
/// that can't be made or rolled back atomically.
|
||||
/// Rollbacks in particular pose a risk with deny_unknown_fields because
|
||||
/// the old pageserver binary may reject a new config field, resulting in
|
||||
/// an outage unless the person doing the pageserver rollback remembers
|
||||
/// to also revert the commit that added the config field in to the
|
||||
/// `pageserver.toml` templates in the internal infra repo.
|
||||
/// (A pre-deploy config check would eliminate this risk during rollbacks,
|
||||
/// cf [here](https://github.com/neondatabase/cloud/issues/24349).)
|
||||
/// In addition to this compatibility problem during emergency rollbacks,
|
||||
/// deny_unknown_fields adds further complications when decomissioning a feature
|
||||
/// flag: with deny_unknown_fields, we can't remove a flag from the [`ConfigToml`]
|
||||
/// until all prod servers' `pageserver.toml` files have been updated to a version
|
||||
/// that doesn't specify the flag. Otherwise new software would fail to start up.
|
||||
/// This adds the requirement for an intermediate step where the new config field
|
||||
/// is accepted but ignored, prolonging the decomissioning process by an entire
|
||||
/// release cycle.
|
||||
/// By contrast with unknown fields silently ignored, decomissioning a feature
|
||||
/// flag is a one-step process: we can skip the intermediate step and straight
|
||||
/// remove the field from the [`ConfigToml`]. We leave the field in the
|
||||
/// `pageserver.toml` files on prod servers until we reach certainty that we
|
||||
/// will not roll back to old software whose behavior was dependent on config.
|
||||
/// Then we can remove the field from the templates in the internal infra repo.
|
||||
/// This process is [documented internally](
|
||||
/// https://docs.neon.build/storage/pageserver_configuration.html).
|
||||
///
|
||||
/// Note that above relaxed compatbility for the config format does NOT APPLY
|
||||
/// TO THE STORAGE FORMAT. As general guidance, when introducing storage format
|
||||
/// changes, ensure that the potential rollback target version will be compatible
|
||||
/// with the new format. This must hold regardless of what flags are set in in the `pageserver.toml`:
|
||||
/// any format version that exists in an environment must be compatible with the software that runs there.
|
||||
/// Use a pageserver.toml flag only to gate whether software _writes_ the new format.
|
||||
/// For more compatibility considerations, refer to [internal docs](
|
||||
/// https://docs.neon.build/storage/compat.html?highlight=compat#format-versions--compatibility)
|
||||
#[serde_as]
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(default, deny_unknown_fields)]
|
||||
#[serde(default)]
|
||||
pub struct ConfigToml {
|
||||
// types mapped 1:1 into the runtime PageServerConfig type
|
||||
pub listen_pg_addr: String,
|
||||
@@ -134,10 +179,10 @@ pub struct ConfigToml {
|
||||
pub load_previous_heatmap: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub generate_unarchival_heatmap: Option<bool>,
|
||||
pub tracing: Option<Tracing>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct DiskUsageEvictionTaskConfig {
|
||||
pub max_usage_pct: utils::serde_percent::Percent,
|
||||
pub min_avail_bytes: u64,
|
||||
@@ -152,13 +197,11 @@ pub struct DiskUsageEvictionTaskConfig {
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case")]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub enum PageServicePipeliningConfig {
|
||||
Serial,
|
||||
Pipelined(PageServicePipeliningConfigPipelined),
|
||||
}
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct PageServicePipeliningConfigPipelined {
|
||||
/// Causes runtime errors if larger than max get_vectored batch size.
|
||||
pub max_batch_size: NonZeroUsize,
|
||||
@@ -174,7 +217,6 @@ pub enum PageServiceProtocolPipelinedExecutionStrategy {
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case")]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub enum GetVectoredConcurrentIo {
|
||||
/// The read path is fully sequential: layers are visited
|
||||
/// one after the other and IOs are issued and waited upon
|
||||
@@ -191,6 +233,54 @@ pub enum GetVectoredConcurrentIo {
|
||||
SidecarTask,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Ratio {
|
||||
pub numerator: usize,
|
||||
pub denominator: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct OtelExporterConfig {
|
||||
pub endpoint: String,
|
||||
pub protocol: OtelExporterProtocol,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum OtelExporterProtocol {
|
||||
Grpc,
|
||||
HttpBinary,
|
||||
HttpJson,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Tracing {
|
||||
pub sampling_ratio: Ratio,
|
||||
pub export_config: OtelExporterConfig,
|
||||
}
|
||||
|
||||
impl From<&OtelExporterConfig> for tracing_utils::ExportConfig {
|
||||
fn from(val: &OtelExporterConfig) -> Self {
|
||||
tracing_utils::ExportConfig {
|
||||
endpoint: Some(val.endpoint.clone()),
|
||||
protocol: val.protocol.into(),
|
||||
timeout: val.timeout,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<OtelExporterProtocol> for tracing_utils::Protocol {
|
||||
fn from(val: OtelExporterProtocol) -> Self {
|
||||
match val {
|
||||
OtelExporterProtocol::Grpc => tracing_utils::Protocol::Grpc,
|
||||
OtelExporterProtocol::HttpJson => tracing_utils::Protocol::HttpJson,
|
||||
OtelExporterProtocol::HttpBinary => tracing_utils::Protocol::HttpBinary,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub mod statvfs {
|
||||
pub mod mock {
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -245,7 +335,7 @@ pub struct MaxVectoredReadBytes(pub NonZeroUsize);
|
||||
|
||||
/// Tenant-level configuration values, used for various purposes.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(deny_unknown_fields, default)]
|
||||
#[serde(default)]
|
||||
pub struct TenantConfigToml {
|
||||
// Flush out an inmemory layer, if it's holding WAL older than this
|
||||
// This puts a backstop on how much WAL needs to be re-digested if the
|
||||
@@ -367,6 +457,9 @@ pub struct TenantConfigToml {
|
||||
/// The ratio that triggers the auto gc-compaction. If (the total size of layers between L2 LSN and gc-horizon) / (size below the L2 LSN)
|
||||
/// is above this ratio, gc-compaction will be triggered.
|
||||
pub gc_compaction_ratio_percent: u64,
|
||||
/// Tenant level performance sampling ratio override. Controls the ratio of get page requests
|
||||
/// that will get perf sampling for the tenant.
|
||||
pub sampling_ratio: Option<Ratio>,
|
||||
}
|
||||
|
||||
pub mod defaults {
|
||||
@@ -537,6 +630,7 @@ impl Default for ConfigToml {
|
||||
validate_wal_contiguity: None,
|
||||
load_previous_heatmap: None,
|
||||
generate_unarchival_heatmap: None,
|
||||
tracing: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -652,6 +746,7 @@ impl Default for TenantConfigToml {
|
||||
gc_compaction_enabled: DEFAULT_GC_COMPACTION_ENABLED,
|
||||
gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB,
|
||||
gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT,
|
||||
sampling_ratio: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ use utils::lsn::Lsn;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
use utils::{completion, serde_system_time};
|
||||
|
||||
use crate::config::Ratio;
|
||||
use crate::key::{CompactKey, Key};
|
||||
use crate::reltag::RelTag;
|
||||
use crate::shard::{ShardCount, ShardStripeSize, TenantShardId};
|
||||
@@ -568,6 +569,8 @@ pub struct TenantConfigPatch {
|
||||
pub gc_compaction_initial_threshold_kb: FieldPatch<u64>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub gc_compaction_ratio_percent: FieldPatch<u64>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub sampling_ratio: FieldPatch<Option<Ratio>>,
|
||||
}
|
||||
|
||||
/// Like [`crate::config::TenantConfigToml`], but preserves the information
|
||||
@@ -688,6 +691,9 @@ pub struct TenantConfig {
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub gc_compaction_ratio_percent: Option<u64>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub sampling_ratio: Option<Option<Ratio>>,
|
||||
}
|
||||
|
||||
impl TenantConfig {
|
||||
@@ -730,6 +736,7 @@ impl TenantConfig {
|
||||
mut gc_compaction_enabled,
|
||||
mut gc_compaction_initial_threshold_kb,
|
||||
mut gc_compaction_ratio_percent,
|
||||
mut sampling_ratio,
|
||||
} = self;
|
||||
|
||||
patch.checkpoint_distance.apply(&mut checkpoint_distance);
|
||||
@@ -824,6 +831,7 @@ impl TenantConfig {
|
||||
patch
|
||||
.gc_compaction_ratio_percent
|
||||
.apply(&mut gc_compaction_ratio_percent);
|
||||
patch.sampling_ratio.apply(&mut sampling_ratio);
|
||||
|
||||
Ok(Self {
|
||||
checkpoint_distance,
|
||||
@@ -860,6 +868,7 @@ impl TenantConfig {
|
||||
gc_compaction_enabled,
|
||||
gc_compaction_initial_threshold_kb,
|
||||
gc_compaction_ratio_percent,
|
||||
sampling_ratio,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -961,6 +970,7 @@ impl TenantConfig {
|
||||
gc_compaction_ratio_percent: self
|
||||
.gc_compaction_ratio_percent
|
||||
.unwrap_or(global_conf.gc_compaction_ratio_percent),
|
||||
sampling_ratio: self.sampling_ratio.unwrap_or(global_conf.sampling_ratio),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1094,7 +1104,7 @@ pub struct CompactionAlgorithmSettings {
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case")]
|
||||
pub enum L0FlushConfig {
|
||||
#[serde(rename_all = "snake_case")]
|
||||
Direct { max_concurrency: NonZeroUsize },
|
||||
|
||||
@@ -558,7 +558,7 @@ async fn upload_large_enough_file(
|
||||
) -> usize {
|
||||
let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
|
||||
let body = bytes::Bytes::from(vec![0u8; 1024]);
|
||||
let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128));
|
||||
let contents = std::iter::once(header).chain(std::iter::repeat_n(body, 128));
|
||||
|
||||
let len = contents.clone().fold(0, |acc, next| acc + next.len());
|
||||
|
||||
|
||||
@@ -71,6 +71,7 @@ pub struct PeerInfo {
|
||||
pub ts: Instant,
|
||||
pub pg_connstr: String,
|
||||
pub http_connstr: String,
|
||||
pub https_connstr: Option<String>,
|
||||
}
|
||||
|
||||
pub type FullTransactionId = u64;
|
||||
@@ -227,6 +228,8 @@ pub struct TimelineDeleteResult {
|
||||
pub dir_existed: bool,
|
||||
}
|
||||
|
||||
pub type TenantDeleteResult = std::collections::HashMap<String, TimelineDeleteResult>;
|
||||
|
||||
fn lsn_invalid() -> Lsn {
|
||||
Lsn::INVALID
|
||||
}
|
||||
@@ -259,6 +262,8 @@ pub struct SkTimelineInfo {
|
||||
pub safekeeper_connstr: Option<String>,
|
||||
#[serde(default)]
|
||||
pub http_connstr: Option<String>,
|
||||
#[serde(default)]
|
||||
pub https_connstr: Option<String>,
|
||||
// Minimum of all active RO replicas flush LSN
|
||||
#[serde(default = "lsn_invalid")]
|
||||
pub standby_horizon: Lsn,
|
||||
|
||||
@@ -14,6 +14,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||
tracing.workspace = true
|
||||
tracing-opentelemetry.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tracing-subscriber.workspace = true # For examples in docs
|
||||
|
||||
@@ -31,10 +31,10 @@
|
||||
//! .init();
|
||||
//! }
|
||||
//! ```
|
||||
#![deny(unsafe_code)]
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
|
||||
pub mod http;
|
||||
pub mod perf_span;
|
||||
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::trace::TracerProvider;
|
||||
|
||||
144
libs/tracing-utils/src/perf_span.rs
Normal file
144
libs/tracing-utils/src/perf_span.rs
Normal file
@@ -0,0 +1,144 @@
|
||||
//! Crutch module to work around tracing infrastructure deficiencies
|
||||
//!
|
||||
//! We wish to collect granular request spans without impacting performance
|
||||
//! by much. Ideally, we should have zero overhead for a sampling rate of 0.
|
||||
//!
|
||||
//! The approach taken by the pageserver crate is to use a completely different
|
||||
//! span hierarchy for the performance spans. Spans are explicitly stored in
|
||||
//! the request context and use a different [`tracing::Subscriber`] in order
|
||||
//! to avoid expensive filtering.
|
||||
//!
|
||||
//! [`tracing::Span`] instances record their [`tracing::Dispatch`] and, implcitly,
|
||||
//! their [`tracing::Subscriber`] at creation time. However, upon exiting the span,
|
||||
//! the global default [`tracing::Dispatch`] is used. This is problematic if one
|
||||
//! wishes to juggle different subscribers.
|
||||
//!
|
||||
//! In order to work around this, this module provides a [`PerfSpan`] type which
|
||||
//! wraps a [`Span`] and sets the default subscriber when exiting the span. This
|
||||
//! achieves the correct routing.
|
||||
//!
|
||||
//! There's also a modified version of [`tracing::Instrument`] which works with
|
||||
//! [`PerfSpan`].
|
||||
|
||||
use core::{
|
||||
future::Future,
|
||||
marker::Sized,
|
||||
mem::ManuallyDrop,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use pin_project_lite::pin_project;
|
||||
use tracing::{Dispatch, span::Span};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PerfSpan {
|
||||
inner: ManuallyDrop<Span>,
|
||||
dispatch: Dispatch,
|
||||
}
|
||||
|
||||
#[must_use = "once a span has been entered, it should be exited"]
|
||||
pub struct PerfSpanEntered<'a> {
|
||||
span: &'a PerfSpan,
|
||||
}
|
||||
|
||||
impl PerfSpan {
|
||||
pub fn new(span: Span, dispatch: Dispatch) -> Self {
|
||||
Self {
|
||||
inner: ManuallyDrop::new(span),
|
||||
dispatch,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn enter(&self) -> PerfSpanEntered {
|
||||
if let Some(ref id) = self.inner.id() {
|
||||
self.dispatch.enter(id);
|
||||
}
|
||||
|
||||
PerfSpanEntered { span: self }
|
||||
}
|
||||
|
||||
pub fn inner(&self) -> &Span {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PerfSpan {
|
||||
fn drop(&mut self) {
|
||||
// Bring the desired dispatch into scope before explicitly calling
|
||||
// the span destructor. This routes the span exit to the correct
|
||||
// [`tracing::Subscriber`].
|
||||
let _dispatch_guard = tracing::dispatcher::set_default(&self.dispatch);
|
||||
// SAFETY: ManuallyDrop in Drop implementation
|
||||
unsafe { ManuallyDrop::drop(&mut self.inner) }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PerfSpanEntered<'_> {
|
||||
fn drop(&mut self) {
|
||||
assert!(self.span.inner.id().is_some());
|
||||
|
||||
let _dispatch_guard = tracing::dispatcher::set_default(&self.span.dispatch);
|
||||
self.span.dispatch.exit(&self.span.inner.id().unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
pub trait PerfInstrument: Sized {
|
||||
fn instrument(self, span: PerfSpan) -> PerfInstrumented<Self> {
|
||||
PerfInstrumented {
|
||||
inner: ManuallyDrop::new(self),
|
||||
span,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
#[project = PerfInstrumentedProj]
|
||||
#[derive(Debug, Clone)]
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub struct PerfInstrumented<T> {
|
||||
// `ManuallyDrop` is used here to to enter instrument `Drop` by entering
|
||||
// `Span` and executing `ManuallyDrop::drop`.
|
||||
#[pin]
|
||||
inner: ManuallyDrop<T>,
|
||||
span: PerfSpan,
|
||||
}
|
||||
|
||||
impl<T> PinnedDrop for PerfInstrumented<T> {
|
||||
fn drop(this: Pin<&mut Self>) {
|
||||
let this = this.project();
|
||||
let _enter = this.span.enter();
|
||||
// SAFETY: 1. `Pin::get_unchecked_mut()` is safe, because this isn't
|
||||
// different from wrapping `T` in `Option` and calling
|
||||
// `Pin::set(&mut this.inner, None)`, except avoiding
|
||||
// additional memory overhead.
|
||||
// 2. `ManuallyDrop::drop()` is safe, because
|
||||
// `PinnedDrop::drop()` is guaranteed to be called only
|
||||
// once.
|
||||
unsafe { ManuallyDrop::drop(this.inner.get_unchecked_mut()) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> PerfInstrumentedProj<'a, T> {
|
||||
/// Get a mutable reference to the [`Span`] a pinned mutable reference to
|
||||
/// the wrapped type.
|
||||
fn span_and_inner_pin_mut(self) -> (&'a mut PerfSpan, Pin<&'a mut T>) {
|
||||
// SAFETY: As long as `ManuallyDrop<T>` does not move, `T` won't move
|
||||
// and `inner` is valid, because `ManuallyDrop::drop` is called
|
||||
// only inside `Drop` of the `Instrumented`.
|
||||
let inner = unsafe { self.inner.map_unchecked_mut(|v| &mut **v) };
|
||||
(self.span, inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Future> Future for PerfInstrumented<T> {
|
||||
type Output = T::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let (span, inner) = self.project().span_and_inner_pin_mut();
|
||||
let _enter = span.enter();
|
||||
inner.poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Sized> PerfInstrument for T {}
|
||||
26
libs/utils/src/elapsed_accum.rs
Normal file
26
libs/utils/src/elapsed_accum.rs
Normal file
@@ -0,0 +1,26 @@
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ElapsedAccum {
|
||||
accum: Duration,
|
||||
}
|
||||
|
||||
impl ElapsedAccum {
|
||||
pub fn get(&self) -> Duration {
|
||||
self.accum
|
||||
}
|
||||
pub fn guard(&mut self) -> impl Drop + '_ {
|
||||
let start = Instant::now();
|
||||
scopeguard::guard(start, |last_wait_at| {
|
||||
self.accum += Instant::now() - last_wait_at;
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn measure<Fut, O>(&mut self, fut: Fut) -> O
|
||||
where
|
||||
Fut: Future<Output = O>,
|
||||
{
|
||||
let _guard = self.guard();
|
||||
fut.await
|
||||
}
|
||||
}
|
||||
@@ -93,6 +93,8 @@ pub mod try_rcu;
|
||||
|
||||
pub mod guard_arc_swap;
|
||||
|
||||
pub mod elapsed_accum;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub mod linux_socket_ioctl;
|
||||
|
||||
|
||||
@@ -111,9 +111,17 @@ impl<T> OnceCell<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Like [`Self::get_or_init_detached_measured`], but without out parameter for time spent waiting.
|
||||
pub async fn get_or_init_detached(&self) -> Result<Guard<'_, T>, InitPermit> {
|
||||
self.get_or_init_detached_measured(None).await
|
||||
}
|
||||
|
||||
/// Returns a guard to an existing initialized value, or returns an unique initialization
|
||||
/// permit which can be used to initialize this `OnceCell` using `OnceCell::set`.
|
||||
pub async fn get_or_init_detached(&self) -> Result<Guard<'_, T>, InitPermit> {
|
||||
pub async fn get_or_init_detached_measured(
|
||||
&self,
|
||||
mut wait_time: Option<&mut crate::elapsed_accum::ElapsedAccum>,
|
||||
) -> Result<Guard<'_, T>, InitPermit> {
|
||||
// It looks like OnceCell::get_or_init could be implemented using this method instead of
|
||||
// duplication. However, that makes the future be !Send due to possibly holding on to the
|
||||
// MutexGuard over an await point.
|
||||
@@ -125,12 +133,16 @@ impl<T> OnceCell<T> {
|
||||
}
|
||||
guard.init_semaphore.clone()
|
||||
};
|
||||
|
||||
{
|
||||
let permit = {
|
||||
// increment the count for the duration of queued
|
||||
let _guard = CountWaitingInitializers::start(self);
|
||||
sem.acquire().await
|
||||
let fut = sem.acquire();
|
||||
if let Some(wait_time) = wait_time.as_mut() {
|
||||
wait_time.measure(fut).await
|
||||
} else {
|
||||
fut.await
|
||||
}
|
||||
};
|
||||
|
||||
let Ok(permit) = permit else {
|
||||
|
||||
@@ -16,7 +16,7 @@ use http_utils::tls_certs::ReloadingCertificateResolver;
|
||||
use metrics::launch_timestamp::{LaunchTimestamp, set_launch_timestamp_metric};
|
||||
use metrics::set_build_info_metric;
|
||||
use nix::sys::socket::{setsockopt, sockopt};
|
||||
use pageserver::config::{PageServerConf, PageserverIdentity};
|
||||
use pageserver::config::{PageServerConf, PageserverIdentity, ignored_fields};
|
||||
use pageserver::controller_upcall_client::StorageControllerUpcallClient;
|
||||
use pageserver::deletion_queue::DeletionQueue;
|
||||
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
|
||||
@@ -35,6 +35,7 @@ use tokio::signal::unix::SignalKind;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use tracing_utils::OtelGuard;
|
||||
use utils::auth::{JwtAuth, SwappableJwtAuth};
|
||||
use utils::crashsafe::syncfs;
|
||||
use utils::logging::TracingErrorLayerEnablement;
|
||||
@@ -97,7 +98,7 @@ fn main() -> anyhow::Result<()> {
|
||||
env::set_current_dir(&workdir)
|
||||
.with_context(|| format!("Failed to set application's current dir to '{workdir}'"))?;
|
||||
|
||||
let conf = initialize_config(&identity_file_path, &cfg_file_path, &workdir)?;
|
||||
let (conf, ignored) = initialize_config(&identity_file_path, &cfg_file_path, &workdir)?;
|
||||
|
||||
// Initialize logging.
|
||||
//
|
||||
@@ -118,6 +119,21 @@ fn main() -> anyhow::Result<()> {
|
||||
logging::Output::Stdout,
|
||||
)?;
|
||||
|
||||
let otel_enablement = match &conf.tracing {
|
||||
Some(cfg) => tracing_utils::OtelEnablement::Enabled {
|
||||
service_name: "pageserver".to_string(),
|
||||
export_config: (&cfg.export_config).into(),
|
||||
runtime: *COMPUTE_REQUEST_RUNTIME,
|
||||
},
|
||||
None => tracing_utils::OtelEnablement::Disabled,
|
||||
};
|
||||
|
||||
let otel_guard = tracing_utils::init_performance_tracing(otel_enablement);
|
||||
|
||||
if otel_guard.is_some() {
|
||||
info!(?conf.tracing, "starting with OTEL tracing enabled");
|
||||
}
|
||||
|
||||
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
|
||||
// disarming this hook on pageserver, because we never tear down tracing.
|
||||
logging::replace_panic_hook_with_tracing_panic_hook().forget();
|
||||
@@ -128,7 +144,17 @@ fn main() -> anyhow::Result<()> {
|
||||
&[("node_id", &conf.id.to_string())],
|
||||
);
|
||||
|
||||
// after setting up logging, log the effective IO engine choice and read path implementations
|
||||
// Warn about ignored config items; see pageserver_api::config::ConfigToml
|
||||
// doc comment for rationale why we prefer this over serde(deny_unknown_fields).
|
||||
{
|
||||
let ignored_fields::Paths { paths } = &ignored;
|
||||
for path in paths {
|
||||
warn!(?path, "ignoring unknown configuration item");
|
||||
}
|
||||
}
|
||||
|
||||
// Log configuration items for feature-flag-like config
|
||||
// (maybe we should automate this with a visitor?).
|
||||
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
|
||||
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
|
||||
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
|
||||
@@ -191,7 +217,7 @@ fn main() -> anyhow::Result<()> {
|
||||
tracing::info!("Initializing page_cache...");
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
|
||||
start_pageserver(launch_ts, conf, ignored, otel_guard).context("Failed to start pageserver")?;
|
||||
|
||||
scenario.teardown();
|
||||
Ok(())
|
||||
@@ -201,7 +227,7 @@ fn initialize_config(
|
||||
identity_file_path: &Utf8Path,
|
||||
cfg_file_path: &Utf8Path,
|
||||
workdir: &Utf8Path,
|
||||
) -> anyhow::Result<&'static PageServerConf> {
|
||||
) -> anyhow::Result<(&'static PageServerConf, ignored_fields::Paths)> {
|
||||
// The deployment orchestrator writes out an indentity file containing the node id
|
||||
// for all pageservers. This file is the source of truth for the node id. In order
|
||||
// to allow for rolling back pageserver releases, the node id is also included in
|
||||
@@ -230,16 +256,36 @@ fn initialize_config(
|
||||
|
||||
let config_file_contents =
|
||||
std::fs::read_to_string(cfg_file_path).context("read config file from filesystem")?;
|
||||
let config_toml = serde_path_to_error::deserialize(
|
||||
toml_edit::de::Deserializer::from_str(&config_file_contents)
|
||||
.context("build toml deserializer")?,
|
||||
)
|
||||
.context("deserialize config toml")?;
|
||||
|
||||
// Deserialize the config file contents into a ConfigToml.
|
||||
let config_toml: pageserver_api::config::ConfigToml = {
|
||||
let deserializer = toml_edit::de::Deserializer::from_str(&config_file_contents)
|
||||
.context("build toml deserializer")?;
|
||||
let mut path_to_error_track = serde_path_to_error::Track::new();
|
||||
let deserializer =
|
||||
serde_path_to_error::Deserializer::new(deserializer, &mut path_to_error_track);
|
||||
serde::Deserialize::deserialize(deserializer).context("deserialize config toml")?
|
||||
};
|
||||
|
||||
// Find unknown fields by re-serializing the parsed ConfigToml and comparing it to the on-disk file.
|
||||
// Any fields that are only in the on-disk version are unknown.
|
||||
// (The assumption here is that the ConfigToml doesn't to skip_serializing_if.)
|
||||
// (Make sure to read the ConfigToml doc comment on why we only want to warn about, but not fail startup, on unknown fields).
|
||||
let ignored = {
|
||||
let ondisk_toml = config_file_contents
|
||||
.parse::<toml_edit::DocumentMut>()
|
||||
.context("parse original config as toml document")?;
|
||||
let parsed_toml = toml_edit::ser::to_document(&config_toml)
|
||||
.context("re-serialize config to toml document")?;
|
||||
pageserver::config::ignored_fields::find(ondisk_toml, parsed_toml)
|
||||
};
|
||||
|
||||
// Construct the runtime god object (it's called PageServerConf but actually is just global shared state).
|
||||
let conf = PageServerConf::parse_and_validate(identity.id, config_toml, workdir)
|
||||
.context("runtime-validation of config toml")?;
|
||||
let conf = Box::leak(Box::new(conf));
|
||||
|
||||
Ok(Box::leak(Box::new(conf)))
|
||||
Ok((conf, ignored))
|
||||
}
|
||||
|
||||
struct WaitForPhaseResult<F: std::future::Future + Unpin> {
|
||||
@@ -290,6 +336,8 @@ fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) {
|
||||
fn start_pageserver(
|
||||
launch_ts: &'static LaunchTimestamp,
|
||||
conf: &'static PageServerConf,
|
||||
ignored: ignored_fields::Paths,
|
||||
otel_guard: Option<OtelGuard>,
|
||||
) -> anyhow::Result<()> {
|
||||
// Monotonic time for later calculating startup duration
|
||||
let started_startup_at = Instant::now();
|
||||
@@ -312,7 +360,7 @@ fn start_pageserver(
|
||||
pageserver::metrics::tokio_epoll_uring::Collector::new(),
|
||||
))
|
||||
.unwrap();
|
||||
pageserver::preinitialize_metrics(conf);
|
||||
pageserver::preinitialize_metrics(conf, ignored);
|
||||
|
||||
// If any failpoints were set from FAILPOINTS environment variable,
|
||||
// print them to the log for debugging purposes
|
||||
@@ -675,13 +723,21 @@ fn start_pageserver(
|
||||
|
||||
// Spawn a task to listen for libpq connections. It will spawn further tasks
|
||||
// for each connection. We created the listener earlier already.
|
||||
let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, {
|
||||
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
|
||||
pageserver_listener
|
||||
.set_nonblocking(true)
|
||||
.context("set listener to nonblocking")?;
|
||||
tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")?
|
||||
});
|
||||
let perf_trace_dispatch = otel_guard.as_ref().map(|g| g.dispatch.clone());
|
||||
let page_service = page_service::spawn(
|
||||
conf,
|
||||
tenant_manager.clone(),
|
||||
pg_auth,
|
||||
perf_trace_dispatch,
|
||||
{
|
||||
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
|
||||
pageserver_listener
|
||||
.set_nonblocking(true)
|
||||
.context("set listener to nonblocking")?;
|
||||
tokio::net::TcpListener::from_std(pageserver_listener)
|
||||
.context("create tokio listener")?
|
||||
},
|
||||
);
|
||||
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
BACKGROUND_RUNTIME.block_on(async move {
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
//! file, or on the command line.
|
||||
//! See also `settings.md` for better description on every parameter.
|
||||
|
||||
pub mod ignored_fields;
|
||||
|
||||
use std::env;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
@@ -215,6 +217,8 @@ pub struct PageServerConf {
|
||||
|
||||
/// When set, include visible layers in the next uploaded heatmaps of an unarchived timeline.
|
||||
pub generate_unarchival_heatmap: bool,
|
||||
|
||||
pub tracing: Option<pageserver_api::config::Tracing>,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -386,6 +390,7 @@ impl PageServerConf {
|
||||
validate_wal_contiguity,
|
||||
load_previous_heatmap,
|
||||
generate_unarchival_heatmap,
|
||||
tracing,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -435,6 +440,7 @@ impl PageServerConf {
|
||||
wal_receiver_protocol,
|
||||
page_service_pipelining,
|
||||
get_vectored_concurrent_io,
|
||||
tracing,
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// fields that require additional validation or custom handling
|
||||
@@ -506,6 +512,17 @@ impl PageServerConf {
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(tracing_config) = conf.tracing.as_ref() {
|
||||
let ratio = &tracing_config.sampling_ratio;
|
||||
ensure!(
|
||||
ratio.denominator != 0 && ratio.denominator >= ratio.numerator,
|
||||
format!(
|
||||
"Invalid sampling ratio: {}/{}",
|
||||
ratio.numerator, ratio.denominator
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
IndexEntry::validate_checkpoint_distance(conf.default_tenant_conf.checkpoint_distance)
|
||||
.map_err(anyhow::Error::msg)
|
||||
.with_context(|| {
|
||||
@@ -545,7 +562,6 @@ impl PageServerConf {
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize, serde::Serialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct PageserverIdentity {
|
||||
pub id: NodeId,
|
||||
}
|
||||
@@ -617,82 +633,4 @@ mod tests {
|
||||
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
|
||||
.expect("parse_and_validate");
|
||||
}
|
||||
|
||||
/// If there's a typo in the pageserver config, we'd rather catch that typo
|
||||
/// and fail pageserver startup than silently ignoring the typo, leaving whoever
|
||||
/// made it in the believe that their config change is effective.
|
||||
///
|
||||
/// The default in serde is to allow unknown fields, so, we rely
|
||||
/// on developer+review discipline to add `deny_unknown_fields` when adding
|
||||
/// new structs to the config, and these tests here as a regression test.
|
||||
///
|
||||
/// The alternative to all of this would be to allow unknown fields in the config.
|
||||
/// To catch them, we could have a config check tool or mgmt API endpoint that
|
||||
/// compares the effective config with the TOML on disk and makes sure that
|
||||
/// the on-disk TOML is a strict subset of the effective config.
|
||||
mod unknown_fields_handling {
|
||||
macro_rules! test {
|
||||
($short_name:ident, $input:expr) => {
|
||||
#[test]
|
||||
fn $short_name() {
|
||||
let input = $input;
|
||||
let err = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(&input)
|
||||
.expect_err("some_invalid_field is an invalid field");
|
||||
dbg!(&err);
|
||||
assert!(err.to_string().contains("some_invalid_field"));
|
||||
}
|
||||
};
|
||||
}
|
||||
use indoc::indoc;
|
||||
|
||||
test!(
|
||||
toplevel,
|
||||
indoc! {r#"
|
||||
some_invalid_field = 23
|
||||
"#}
|
||||
);
|
||||
|
||||
test!(
|
||||
toplevel_nested,
|
||||
indoc! {r#"
|
||||
[some_invalid_field]
|
||||
foo = 23
|
||||
"#}
|
||||
);
|
||||
|
||||
test!(
|
||||
disk_usage_based_eviction,
|
||||
indoc! {r#"
|
||||
[disk_usage_based_eviction]
|
||||
some_invalid_field = 23
|
||||
"#}
|
||||
);
|
||||
|
||||
test!(
|
||||
tenant_config,
|
||||
indoc! {r#"
|
||||
[tenant_config]
|
||||
some_invalid_field = 23
|
||||
"#}
|
||||
);
|
||||
|
||||
test!(
|
||||
l0_flush,
|
||||
indoc! {r#"
|
||||
[l0_flush]
|
||||
mode = "direct"
|
||||
some_invalid_field = 23
|
||||
"#}
|
||||
);
|
||||
|
||||
// TODO: fix this => https://github.com/neondatabase/neon/issues/8915
|
||||
// test!(
|
||||
// remote_storage_config,
|
||||
// indoc! {r#"
|
||||
// [remote_storage_config]
|
||||
// local_path = "/nonexistent"
|
||||
// some_invalid_field = 23
|
||||
// "#}
|
||||
// );
|
||||
}
|
||||
}
|
||||
|
||||
179
pageserver/src/config/ignored_fields.rs
Normal file
179
pageserver/src/config/ignored_fields.rs
Normal file
@@ -0,0 +1,179 @@
|
||||
//! Check for fields in the on-disk config file that were ignored when
|
||||
//! deserializing [`pageserver_api::config::ConfigToml`].
|
||||
//!
|
||||
//! This could have been part of the [`pageserver_api::config`] module,
|
||||
//! but the way we identify unused fields in this module
|
||||
//! is specific to the format (TOML) and the implementation of the
|
||||
//! deserialization for that format ([`toml_edit`]).
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
||||
use itertools::Itertools;
|
||||
|
||||
/// Pass in the user-specified config and the re-serialized [`pageserver_api::config::ConfigToml`].
|
||||
/// The returned [`Paths`] contains the paths to the fields that were ignored by deserialization
|
||||
/// of the [`pageserver_api::config::ConfigToml`].
|
||||
pub fn find(user_specified: toml_edit::DocumentMut, reserialized: toml_edit::DocumentMut) -> Paths {
|
||||
let user_specified = paths(user_specified);
|
||||
let reserialized = paths(reserialized);
|
||||
fn paths(doc: toml_edit::DocumentMut) -> HashSet<String> {
|
||||
let mut out = Vec::new();
|
||||
let mut visitor = PathsVisitor::new(&mut out);
|
||||
visitor.visit_table_like(doc.as_table());
|
||||
HashSet::from_iter(out)
|
||||
}
|
||||
|
||||
let mut ignored = HashSet::new();
|
||||
|
||||
// O(n) because of HashSet
|
||||
for path in user_specified {
|
||||
if !reserialized.contains(&path) {
|
||||
ignored.insert(path);
|
||||
}
|
||||
}
|
||||
|
||||
Paths {
|
||||
paths: ignored
|
||||
.into_iter()
|
||||
// sort lexicographically for deterministic output
|
||||
.sorted()
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Paths {
|
||||
pub paths: Vec<String>,
|
||||
}
|
||||
|
||||
struct PathsVisitor<'a> {
|
||||
stack: Vec<String>,
|
||||
out: &'a mut Vec<String>,
|
||||
}
|
||||
|
||||
impl<'a> PathsVisitor<'a> {
|
||||
fn new(out: &'a mut Vec<String>) -> Self {
|
||||
Self {
|
||||
stack: Vec::new(),
|
||||
out,
|
||||
}
|
||||
}
|
||||
|
||||
fn visit_table_like(&mut self, table_like: &dyn toml_edit::TableLike) {
|
||||
for (entry, item) in table_like.iter() {
|
||||
self.stack.push(entry.to_string());
|
||||
self.visit_item(item);
|
||||
self.stack.pop();
|
||||
}
|
||||
}
|
||||
|
||||
fn visit_item(&mut self, item: &toml_edit::Item) {
|
||||
match item {
|
||||
toml_edit::Item::None => (),
|
||||
toml_edit::Item::Value(value) => self.visit_value(value),
|
||||
toml_edit::Item::Table(table) => {
|
||||
self.visit_table_like(table);
|
||||
}
|
||||
toml_edit::Item::ArrayOfTables(array_of_tables) => {
|
||||
for (i, table) in array_of_tables.iter().enumerate() {
|
||||
self.stack.push(format!("[{i}]"));
|
||||
self.visit_table_like(table);
|
||||
self.stack.pop();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn visit_value(&mut self, value: &toml_edit::Value) {
|
||||
match value {
|
||||
toml_edit::Value::String(_)
|
||||
| toml_edit::Value::Integer(_)
|
||||
| toml_edit::Value::Float(_)
|
||||
| toml_edit::Value::Boolean(_)
|
||||
| toml_edit::Value::Datetime(_) => self.out.push(self.stack.join(".")),
|
||||
toml_edit::Value::Array(array) => {
|
||||
for (i, value) in array.iter().enumerate() {
|
||||
self.stack.push(format!("[{i}]"));
|
||||
self.visit_value(value);
|
||||
self.stack.pop();
|
||||
}
|
||||
}
|
||||
toml_edit::Value::InlineTable(inline_table) => self.visit_table_like(inline_table),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
|
||||
fn test_impl(original: &str, parsed: &str, expect: [&str; 1]) {
|
||||
let original: toml_edit::DocumentMut = original.parse().expect("parse original config");
|
||||
let parsed: toml_edit::DocumentMut = parsed.parse().expect("parse re-serialized config");
|
||||
|
||||
let super::Paths { paths: actual } = super::find(original, parsed);
|
||||
assert_eq!(actual, &expect);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn top_level() {
|
||||
test_impl(
|
||||
r#"
|
||||
[a]
|
||||
b = 1
|
||||
c = 2
|
||||
d = 3
|
||||
"#,
|
||||
r#"
|
||||
[a]
|
||||
b = 1
|
||||
c = 2
|
||||
"#,
|
||||
["a.d"],
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn nested() {
|
||||
test_impl(
|
||||
r#"
|
||||
[a.b.c]
|
||||
d = 23
|
||||
"#,
|
||||
r#"
|
||||
[a]
|
||||
e = 42
|
||||
"#,
|
||||
["a.b.c.d"],
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn array_of_tables() {
|
||||
test_impl(
|
||||
r#"
|
||||
[[a]]
|
||||
b = 1
|
||||
c = 2
|
||||
d = 3
|
||||
"#,
|
||||
r#"
|
||||
[[a]]
|
||||
b = 1
|
||||
c = 2
|
||||
"#,
|
||||
["a.[0].d"],
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn array() {
|
||||
test_impl(
|
||||
r#"
|
||||
foo = [ {bar = 23} ]
|
||||
"#,
|
||||
r#"
|
||||
foo = [ { blup = 42 }]
|
||||
"#,
|
||||
["foo.[0].bar"],
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -89,7 +89,7 @@
|
||||
//! [`RequestContext`] argument. Functions in the middle of the call chain
|
||||
//! only need to pass it on.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use tracing::warn;
|
||||
@@ -100,6 +100,12 @@ use crate::{
|
||||
task_mgr::TaskKind,
|
||||
tenant::Timeline,
|
||||
};
|
||||
use futures::FutureExt;
|
||||
use futures::future::BoxFuture;
|
||||
use std::future::Future;
|
||||
use tracing_utils::perf_span::{PerfInstrument, PerfSpan};
|
||||
|
||||
use tracing::{Dispatch, Span};
|
||||
|
||||
// The main structure of this module, see module-level comment.
|
||||
pub struct RequestContext {
|
||||
@@ -109,6 +115,8 @@ pub struct RequestContext {
|
||||
page_content_kind: PageContentKind,
|
||||
read_path_debug: bool,
|
||||
scope: Scope,
|
||||
perf_span: Option<PerfSpan>,
|
||||
perf_span_dispatch: Option<Dispatch>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -263,22 +271,15 @@ impl RequestContextBuilder {
|
||||
page_content_kind: PageContentKind::Unknown,
|
||||
read_path_debug: false,
|
||||
scope: Scope::new_global(),
|
||||
perf_span: None,
|
||||
perf_span_dispatch: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn extend(original: &RequestContext) -> Self {
|
||||
pub fn from(original: &RequestContext) -> Self {
|
||||
Self {
|
||||
// This is like a Copy, but avoid implementing Copy because ordinary users of
|
||||
// RequestContext should always move or ref it.
|
||||
inner: RequestContext {
|
||||
task_kind: original.task_kind,
|
||||
download_behavior: original.download_behavior,
|
||||
access_stats_behavior: original.access_stats_behavior,
|
||||
page_content_kind: original.page_content_kind,
|
||||
read_path_debug: original.read_path_debug,
|
||||
scope: original.scope.clone(),
|
||||
},
|
||||
inner: original.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -316,12 +317,74 @@ impl RequestContextBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> RequestContext {
|
||||
pub(crate) fn perf_span_dispatch(mut self, dispatch: Option<Dispatch>) -> Self {
|
||||
self.inner.perf_span_dispatch = dispatch;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn root_perf_span<Fn>(mut self, make_span: Fn) -> Self
|
||||
where
|
||||
Fn: FnOnce() -> Span,
|
||||
{
|
||||
assert!(self.inner.perf_span.is_none());
|
||||
assert!(self.inner.perf_span_dispatch.is_some());
|
||||
|
||||
let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
|
||||
let new_span = tracing::dispatcher::with_default(dispatcher, make_span);
|
||||
|
||||
self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub fn perf_span<Fn>(mut self, make_span: Fn) -> Self
|
||||
where
|
||||
Fn: FnOnce(&Span) -> Span,
|
||||
{
|
||||
if let Some(ref perf_span) = self.inner.perf_span {
|
||||
assert!(self.inner.perf_span_dispatch.is_some());
|
||||
let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
|
||||
|
||||
let new_span =
|
||||
tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
|
||||
|
||||
self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
|
||||
}
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub fn root(self) -> RequestContext {
|
||||
self.inner
|
||||
}
|
||||
|
||||
pub fn attached_child(self) -> RequestContext {
|
||||
self.inner
|
||||
}
|
||||
|
||||
pub fn detached_child(self) -> RequestContext {
|
||||
self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl RequestContext {
|
||||
/// Private clone implementation
|
||||
///
|
||||
/// Callers should use the [`RequestContextBuilder`] or child spaning APIs of
|
||||
/// [`RequestContext`].
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
task_kind: self.task_kind,
|
||||
download_behavior: self.download_behavior,
|
||||
access_stats_behavior: self.access_stats_behavior,
|
||||
page_content_kind: self.page_content_kind,
|
||||
read_path_debug: self.read_path_debug,
|
||||
scope: self.scope.clone(),
|
||||
perf_span: self.perf_span.clone(),
|
||||
perf_span_dispatch: self.perf_span_dispatch.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new RequestContext that has no parent.
|
||||
///
|
||||
/// The function is called `new` because, once we add children
|
||||
@@ -337,7 +400,7 @@ impl RequestContext {
|
||||
pub fn new(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
|
||||
RequestContextBuilder::new(task_kind)
|
||||
.download_behavior(download_behavior)
|
||||
.build()
|
||||
.root()
|
||||
}
|
||||
|
||||
/// Create a detached child context for a task that may outlive `self`.
|
||||
@@ -358,7 +421,10 @@ impl RequestContext {
|
||||
///
|
||||
/// We could make new calls to this function fail if `self` is already canceled.
|
||||
pub fn detached_child(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
|
||||
self.child_impl(task_kind, download_behavior)
|
||||
RequestContextBuilder::from(self)
|
||||
.task_kind(task_kind)
|
||||
.download_behavior(download_behavior)
|
||||
.detached_child()
|
||||
}
|
||||
|
||||
/// Create a child of context `self` for a task that shall not outlive `self`.
|
||||
@@ -382,7 +448,7 @@ impl RequestContext {
|
||||
/// The method to wait for child tasks would return an error, indicating
|
||||
/// that the child task was not started because the context was canceled.
|
||||
pub fn attached_child(&self) -> Self {
|
||||
self.child_impl(self.task_kind(), self.download_behavior())
|
||||
RequestContextBuilder::from(self).attached_child()
|
||||
}
|
||||
|
||||
/// Use this function when you should be creating a child context using
|
||||
@@ -397,17 +463,10 @@ impl RequestContext {
|
||||
Self::new(task_kind, download_behavior)
|
||||
}
|
||||
|
||||
fn child_impl(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
|
||||
RequestContextBuilder::extend(self)
|
||||
.task_kind(task_kind)
|
||||
.download_behavior(download_behavior)
|
||||
.build()
|
||||
}
|
||||
|
||||
pub fn with_scope_timeline(&self, timeline: &Arc<Timeline>) -> Self {
|
||||
RequestContextBuilder::extend(self)
|
||||
RequestContextBuilder::from(self)
|
||||
.scope(Scope::new_timeline(timeline))
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
pub(crate) fn with_scope_page_service_pagestream(
|
||||
@@ -416,9 +475,9 @@ impl RequestContext {
|
||||
crate::page_service::TenantManagerTypes,
|
||||
>,
|
||||
) -> Self {
|
||||
RequestContextBuilder::extend(self)
|
||||
RequestContextBuilder::from(self)
|
||||
.scope(Scope::new_page_service_pagestream(timeline_handle))
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
pub fn with_scope_secondary_timeline(
|
||||
@@ -426,28 +485,30 @@ impl RequestContext {
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
) -> Self {
|
||||
RequestContextBuilder::extend(self)
|
||||
RequestContextBuilder::from(self)
|
||||
.scope(Scope::new_secondary_timeline(tenant_shard_id, timeline_id))
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
pub fn with_scope_secondary_tenant(&self, tenant_shard_id: &TenantShardId) -> Self {
|
||||
RequestContextBuilder::extend(self)
|
||||
RequestContextBuilder::from(self)
|
||||
.scope(Scope::new_secondary_tenant(tenant_shard_id))
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn with_scope_unit_test(&self) -> Self {
|
||||
RequestContextBuilder::new(TaskKind::UnitTest)
|
||||
RequestContextBuilder::from(self)
|
||||
.task_kind(TaskKind::UnitTest)
|
||||
.scope(Scope::new_unit_test())
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
pub fn with_scope_debug_tools(&self) -> Self {
|
||||
RequestContextBuilder::new(TaskKind::DebugTool)
|
||||
RequestContextBuilder::from(self)
|
||||
.task_kind(TaskKind::DebugTool)
|
||||
.scope(Scope::new_debug_tools())
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
pub fn task_kind(&self) -> TaskKind {
|
||||
@@ -504,4 +565,76 @@ impl RequestContext {
|
||||
Scope::DebugTools { io_size_metrics } => io_size_metrics,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn ondemand_download_wait_observe(&self, duration: Duration) {
|
||||
if duration == Duration::ZERO {
|
||||
return;
|
||||
}
|
||||
|
||||
match &self.scope {
|
||||
Scope::Timeline { arc_arc } => arc_arc
|
||||
.wait_ondemand_download_time
|
||||
.observe(self.task_kind, duration),
|
||||
_ => {
|
||||
use once_cell::sync::Lazy;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use utils::rate_limit::RateLimit;
|
||||
static LIMIT: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
|
||||
let mut guard = LIMIT.lock().unwrap();
|
||||
guard.call2(|rate_limit_stats| {
|
||||
warn!(
|
||||
%rate_limit_stats,
|
||||
backtrace=%std::backtrace::Backtrace::force_capture(),
|
||||
"ondemand downloads should always happen within timeline scope",
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn perf_follows_from(&self, from: &RequestContext) {
|
||||
if let (Some(span), Some(from_span)) = (&self.perf_span, &from.perf_span) {
|
||||
span.inner().follows_from(from_span.inner());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn has_perf_span(&self) -> bool {
|
||||
self.perf_span.is_some()
|
||||
}
|
||||
}
|
||||
|
||||
/// [`Future`] extension trait that allow for creating performance
|
||||
/// spans on sampled requests
|
||||
pub(crate) trait PerfInstrumentFutureExt<'a>: Future + Send {
|
||||
/// Instrument this future with a new performance span when the
|
||||
/// provided request context indicates the originator request
|
||||
/// was sampled. Otherwise, just box the future and return it as is.
|
||||
fn maybe_perf_instrument<Fn>(
|
||||
self,
|
||||
ctx: &RequestContext,
|
||||
make_span: Fn,
|
||||
) -> BoxFuture<'a, Self::Output>
|
||||
where
|
||||
Self: Sized + 'a,
|
||||
Fn: FnOnce(&Span) -> Span,
|
||||
{
|
||||
match &ctx.perf_span {
|
||||
Some(perf_span) => {
|
||||
assert!(ctx.perf_span_dispatch.is_some());
|
||||
let dispatcher = ctx.perf_span_dispatch.as_ref().unwrap();
|
||||
|
||||
let new_span =
|
||||
tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
|
||||
|
||||
let new_perf_span = PerfSpan::new(new_span, dispatcher.clone());
|
||||
self.instrument(new_perf_span).boxed()
|
||||
}
|
||||
None => self.boxed(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Implement the trait for all types that satisfy the trait bounds
|
||||
impl<'a, T: Future + Send + 'a> PerfInstrumentFutureExt<'a> for T {}
|
||||
|
||||
@@ -2697,11 +2697,12 @@ async fn getpage_at_lsn_handler_inner(
|
||||
let lsn: Option<Lsn> = parse_query_param(&request, "lsn")?;
|
||||
|
||||
async {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
// Enable read path debugging
|
||||
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
|
||||
let ctx = RequestContextBuilder::extend(&ctx).read_path_debug(true)
|
||||
.scope(context::Scope::new_timeline(&timeline)).build();
|
||||
let ctx = RequestContextBuilder::new(TaskKind::MgmtRequest)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.scope(context::Scope::new_timeline(&timeline))
|
||||
.read_path_debug(true)
|
||||
.root();
|
||||
|
||||
// Use last_record_lsn if no lsn is provided
|
||||
let lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
|
||||
@@ -3188,7 +3189,8 @@ async fn list_aux_files(
|
||||
timeline.gate.enter().map_err(|_| ApiError::Cancelled)?,
|
||||
);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download)
|
||||
.with_scope_timeline(&timeline);
|
||||
let files = timeline
|
||||
.list_aux_files(body.lsn, &ctx, io_concurrency)
|
||||
.await?;
|
||||
@@ -3432,14 +3434,15 @@ async fn put_tenant_timeline_import_wal(
|
||||
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
|
||||
let span = info_span!("import_wal", tenant_id=%tenant_id, timeline_id=%timeline_id, start_lsn=%start_lsn, end_lsn=%end_lsn);
|
||||
async move {
|
||||
let state = get_state(&request);
|
||||
|
||||
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, TenantShardId::unsharded(tenant_id), timeline_id).await?;
|
||||
let ctx = RequestContextBuilder::extend(&ctx).scope(context::Scope::new_timeline(&timeline)).build();
|
||||
let ctx = RequestContextBuilder::new(TaskKind::MgmtRequest)
|
||||
.download_behavior(DownloadBehavior::Warn)
|
||||
.scope(context::Scope::new_timeline(&timeline))
|
||||
.root();
|
||||
|
||||
let mut body = StreamReader::new(request.into_body().map(|res| {
|
||||
res.map_err(|error| {
|
||||
|
||||
@@ -55,6 +55,9 @@ pub const DEFAULT_PG_VERSION: u32 = 16;
|
||||
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
|
||||
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
|
||||
|
||||
// Target used for performance traces.
|
||||
pub const PERF_TRACE_TARGET: &str = "P";
|
||||
|
||||
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
|
||||
|
||||
pub use crate::metrics::preinitialize_metrics;
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::os::fd::RawFd;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use enum_map::{Enum as _, EnumMap};
|
||||
@@ -23,13 +21,13 @@ use pageserver_api::config::{
|
||||
};
|
||||
use pageserver_api::models::InMemoryLayerInfo;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pin_project_lite::pin_project;
|
||||
use postgres_backend::{QueryError, is_expected_io_error};
|
||||
use pq_proto::framed::ConnectionError;
|
||||
use strum::{EnumCount, IntoEnumIterator as _, VariantNames};
|
||||
use strum_macros::{IntoStaticStr, VariantNames};
|
||||
use utils::id::TimelineId;
|
||||
|
||||
use crate::config;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{PageContentKind, RequestContext};
|
||||
use crate::pgdatadir_mapping::DatadirModificationStats;
|
||||
@@ -499,6 +497,100 @@ pub(crate) static WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS: Lazy<IntCounter> = Lazy::n
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) mod wait_ondemand_download_time {
|
||||
use super::*;
|
||||
const WAIT_ONDEMAND_DOWNLOAD_TIME_BUCKETS: &[f64] = &[
|
||||
0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, // 10 ms - 100ms
|
||||
0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, // 100ms to 1s
|
||||
1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, // 1s to 10s
|
||||
10.0, 20.0, 30.0, 40.0, 50.0, 60.0, // 10s to 1m
|
||||
];
|
||||
|
||||
/// The task kinds for which we want to track wait times for on-demand downloads.
|
||||
/// Other task kinds' wait times are accumulated in label value `unknown`.
|
||||
pub(crate) const WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS: [TaskKind; 2] = [
|
||||
TaskKind::PageRequestHandler,
|
||||
TaskKind::WalReceiverConnectionHandler,
|
||||
];
|
||||
|
||||
pub(crate) static WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL: Lazy<Vec<Histogram>> = Lazy::new(|| {
|
||||
let histo = register_histogram_vec!(
|
||||
"pageserver_wait_ondemand_download_seconds_global",
|
||||
"Observations are individual tasks' wait times for on-demand downloads. \
|
||||
If N tasks coalesce on an on-demand download, and it takes 10s, than we observe N * 10s.",
|
||||
&["task_kind"],
|
||||
WAIT_ONDEMAND_DOWNLOAD_TIME_BUCKETS.into(),
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
|
||||
.iter()
|
||||
.map(|task_kind| histo.with_label_values(&[task_kind.into()]))
|
||||
.collect::<Vec<_>>()
|
||||
});
|
||||
|
||||
pub(crate) static WAIT_ONDEMAND_DOWNLOAD_TIME_SUM: Lazy<CounterVec> = Lazy::new(|| {
|
||||
register_counter_vec!(
|
||||
// use a name that _could_ be evolved into a per-timeline histogram later
|
||||
"pageserver_wait_ondemand_download_seconds_sum",
|
||||
"Like `pageserver_wait_ondemand_download_seconds_global` but per timeline",
|
||||
&["tenant_id", "shard_id", "timeline_id", "task_kind"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub struct WaitOndemandDownloadTimeSum {
|
||||
counters: [Counter; WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS.len()],
|
||||
}
|
||||
|
||||
impl WaitOndemandDownloadTimeSum {
|
||||
pub(crate) fn new(tenant_id: &str, shard_id: &str, timeline_id: &str) -> Self {
|
||||
let counters = WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
|
||||
.iter()
|
||||
.map(|task_kind| {
|
||||
WAIT_ONDEMAND_DOWNLOAD_TIME_SUM
|
||||
.get_metric_with_label_values(&[
|
||||
tenant_id,
|
||||
shard_id,
|
||||
timeline_id,
|
||||
task_kind.into(),
|
||||
])
|
||||
.unwrap()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
Self {
|
||||
counters: counters.try_into().unwrap(),
|
||||
}
|
||||
}
|
||||
pub(crate) fn observe(&self, task_kind: TaskKind, duration: Duration) {
|
||||
let maybe = WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(_, kind)| **kind == task_kind);
|
||||
let Some((idx, _)) = maybe else {
|
||||
return;
|
||||
};
|
||||
WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL[idx].observe(duration.as_secs_f64());
|
||||
let counter = &self.counters[idx];
|
||||
counter.inc_by(duration.as_secs_f64());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn shutdown_timeline(tenant_id: &str, shard_id: &str, timeline_id: &str) {
|
||||
for task_kind in WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS {
|
||||
let _ = WAIT_ONDEMAND_DOWNLOAD_TIME_SUM.remove_label_values(&[
|
||||
tenant_id,
|
||||
shard_id,
|
||||
timeline_id,
|
||||
task_kind.into(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn preinitialize_global_metrics() {
|
||||
Lazy::force(&WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL);
|
||||
}
|
||||
}
|
||||
|
||||
static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_last_record_lsn",
|
||||
@@ -1248,13 +1340,13 @@ pub(crate) static STORAGE_IO_TIME_METRIC: Lazy<StorageIoTime> = Lazy::new(Storag
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
#[repr(usize)]
|
||||
enum StorageIoSizeOperation {
|
||||
pub(crate) enum StorageIoSizeOperation {
|
||||
Read,
|
||||
Write,
|
||||
}
|
||||
|
||||
impl StorageIoSizeOperation {
|
||||
const VARIANTS: &'static [&'static str] = &["read", "write"];
|
||||
pub(crate) const VARIANTS: &'static [&'static str] = &["read", "write"];
|
||||
|
||||
fn as_str(&self) -> &'static str {
|
||||
Self::VARIANTS[*self as usize]
|
||||
@@ -1262,7 +1354,7 @@ impl StorageIoSizeOperation {
|
||||
}
|
||||
|
||||
// Needed for the https://neonprod.grafana.net/d/5uK9tHL4k/picking-tenant-for-relocation?orgId=1
|
||||
static STORAGE_IO_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
pub(crate) static STORAGE_IO_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_io_operations_bytes_total",
|
||||
"Total amount of bytes read/written in IO operations",
|
||||
@@ -2314,13 +2406,18 @@ impl RemoteOpFileKind {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static REMOTE_OPERATION_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
pub(crate) static REMOTE_TIMELINE_CLIENT_COMPLETION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_remote_operation_seconds",
|
||||
"Time spent on remote storage operations. \
|
||||
Grouped by tenant, timeline, operation_kind and status. \
|
||||
"pageserver_remote_timeline_client_seconds_global",
|
||||
"Time spent on remote timeline client operations. \
|
||||
Grouped by task_kind, file_kind, operation_kind and status. \
|
||||
The task_kind is \
|
||||
- for layer downloads, populated from RequestContext (primary objective of having the label) \
|
||||
- for index downloads, set to 'unknown' \
|
||||
- for any upload operation, set to 'RemoteUploadTask' \
|
||||
This keeps dimensionality at bay. \
|
||||
Does not account for time spent waiting in remote timeline client's queues.",
|
||||
&["file_kind", "op_kind", "status"]
|
||||
&["task_kind", "file_kind", "op_kind", "status"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
@@ -2882,6 +2979,7 @@ pub(crate) struct TimelineMetrics {
|
||||
pub storage_io_size: StorageIoSizeMetrics,
|
||||
pub wait_lsn_in_progress_micros: GlobalAndPerTenantIntCounter,
|
||||
pub wait_lsn_start_finish_counterpair: IntCounterPair,
|
||||
pub wait_ondemand_download_time: wait_ondemand_download_time::WaitOndemandDownloadTimeSum,
|
||||
shutdown: std::sync::atomic::AtomicBool,
|
||||
}
|
||||
|
||||
@@ -3027,6 +3125,13 @@ impl TimelineMetrics {
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
let wait_ondemand_download_time =
|
||||
wait_ondemand_download_time::WaitOndemandDownloadTimeSum::new(
|
||||
&tenant_id,
|
||||
&shard_id,
|
||||
&timeline_id,
|
||||
);
|
||||
|
||||
TimelineMetrics {
|
||||
tenant_id,
|
||||
shard_id,
|
||||
@@ -3060,6 +3165,7 @@ impl TimelineMetrics {
|
||||
wal_records_received,
|
||||
wait_lsn_in_progress_micros,
|
||||
wait_lsn_start_finish_counterpair,
|
||||
wait_ondemand_download_time,
|
||||
shutdown: std::sync::atomic::AtomicBool::default(),
|
||||
}
|
||||
}
|
||||
@@ -3252,6 +3358,8 @@ impl TimelineMetrics {
|
||||
.remove_label_values(&mut res, &[tenant_id, shard_id, timeline_id]);
|
||||
}
|
||||
|
||||
wait_ondemand_download_time::shutdown_timeline(tenant_id, shard_id, timeline_id);
|
||||
|
||||
let _ = SMGR_QUERY_STARTED_PER_TENANT_TIMELINE.remove_label_values(&[
|
||||
SmgrQueryType::GetPageAtLsn.into(),
|
||||
tenant_id,
|
||||
@@ -3373,13 +3481,18 @@ impl RemoteTimelineClientMetrics {
|
||||
|
||||
pub fn remote_operation_time(
|
||||
&self,
|
||||
task_kind: Option<TaskKind>,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
status: &'static str,
|
||||
) -> Histogram {
|
||||
let key = (file_kind.as_str(), op_kind.as_str(), status);
|
||||
REMOTE_OPERATION_TIME
|
||||
.get_metric_with_label_values(&[key.0, key.1, key.2])
|
||||
REMOTE_TIMELINE_CLIENT_COMPLETION_LATENCY
|
||||
.get_metric_with_label_values(&[
|
||||
task_kind.as_ref().map(|tk| tk.into()).unwrap_or("unknown"),
|
||||
file_kind.as_str(),
|
||||
op_kind.as_str(),
|
||||
status,
|
||||
])
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
@@ -3624,54 +3737,26 @@ impl Drop for RemoteTimelineClientMetrics {
|
||||
|
||||
/// Wrapper future that measures the time spent by a remote storage operation,
|
||||
/// and records the time and success/failure as a prometheus metric.
|
||||
pub(crate) trait MeasureRemoteOp: Sized {
|
||||
fn measure_remote_op(
|
||||
pub(crate) trait MeasureRemoteOp<O, E>: Sized + Future<Output = Result<O, E>> {
|
||||
async fn measure_remote_op(
|
||||
self,
|
||||
task_kind: Option<TaskKind>, // not all caller contexts have a RequestContext / TaskKind handy
|
||||
file_kind: RemoteOpFileKind,
|
||||
op: RemoteOpKind,
|
||||
metrics: Arc<RemoteTimelineClientMetrics>,
|
||||
) -> MeasuredRemoteOp<Self> {
|
||||
) -> Result<O, E> {
|
||||
let start = Instant::now();
|
||||
MeasuredRemoteOp {
|
||||
inner: self,
|
||||
file_kind,
|
||||
op,
|
||||
start,
|
||||
metrics,
|
||||
}
|
||||
let res = self.await;
|
||||
let duration = start.elapsed();
|
||||
let status = if res.is_ok() { &"success" } else { &"failure" };
|
||||
metrics
|
||||
.remote_operation_time(task_kind, &file_kind, &op, status)
|
||||
.observe(duration.as_secs_f64());
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Sized> MeasureRemoteOp for T {}
|
||||
|
||||
pin_project! {
|
||||
pub(crate) struct MeasuredRemoteOp<F>
|
||||
{
|
||||
#[pin]
|
||||
inner: F,
|
||||
file_kind: RemoteOpFileKind,
|
||||
op: RemoteOpKind,
|
||||
start: Instant,
|
||||
metrics: Arc<RemoteTimelineClientMetrics>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
|
||||
type Output = Result<O, E>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
let poll_result = this.inner.poll(cx);
|
||||
if let Poll::Ready(ref res) = poll_result {
|
||||
let duration = this.start.elapsed();
|
||||
let status = if res.is_ok() { &"success" } else { &"failure" };
|
||||
this.metrics
|
||||
.remote_operation_time(this.file_kind, this.op, status)
|
||||
.observe(duration.as_secs_f64());
|
||||
}
|
||||
poll_result
|
||||
}
|
||||
}
|
||||
impl<Fut, O, E> MeasureRemoteOp<O, E> for Fut where Fut: Sized + Future<Output = Result<O, E>> {}
|
||||
|
||||
pub mod tokio_epoll_uring {
|
||||
use std::collections::HashMap;
|
||||
@@ -4107,9 +4192,33 @@ pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
|
||||
.set(u64::try_from(num_threads.get()).unwrap());
|
||||
}
|
||||
|
||||
pub fn preinitialize_metrics(conf: &'static PageServerConf) {
|
||||
static PAGESERVER_CONFIG_IGNORED_ITEMS: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_config_ignored_items",
|
||||
"TOML items present in the on-disk configuration file but ignored by the pageserver config parser.\
|
||||
The `item` label is the dot-separated path of the ignored item in the on-disk configuration file.\
|
||||
The value for an unknown config item is always 1.\
|
||||
There is a special label value \"\", which is 0, so that there is always a metric exposed (simplifies dashboards).",
|
||||
&["item"]
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub fn preinitialize_metrics(
|
||||
conf: &'static PageServerConf,
|
||||
ignored: config::ignored_fields::Paths,
|
||||
) {
|
||||
set_page_service_config_max_batch_size(&conf.page_service_pipelining);
|
||||
|
||||
PAGESERVER_CONFIG_IGNORED_ITEMS
|
||||
.with_label_values(&[""])
|
||||
.set(0);
|
||||
for path in &ignored.paths {
|
||||
PAGESERVER_CONFIG_IGNORED_ITEMS
|
||||
.with_label_values(&[path])
|
||||
.set(1);
|
||||
}
|
||||
|
||||
// Python tests need these and on some we do alerting.
|
||||
//
|
||||
// FIXME(4813): make it so that we have no top level metrics as this fn will easily fall out of
|
||||
@@ -4195,4 +4304,5 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) {
|
||||
Lazy::force(&tokio_epoll_uring::THREAD_LOCAL_METRICS_STORAGE);
|
||||
|
||||
tenant_throttling::preinitialize_global_metrics();
|
||||
wait_ondemand_download_time::preinitialize_global_metrics();
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::{io, str};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, bail};
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Buf;
|
||||
@@ -53,7 +54,9 @@ use utils::sync::spsc_fold;
|
||||
use crate::auth::check_permission;
|
||||
use crate::basebackup::BasebackupError;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::metrics::{
|
||||
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer,
|
||||
TimelineMetrics,
|
||||
@@ -100,6 +103,7 @@ pub fn spawn(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
pg_auth: Option<Arc<SwappableJwtAuth>>,
|
||||
perf_trace_dispatch: Option<Dispatch>,
|
||||
tcp_listener: tokio::net::TcpListener,
|
||||
) -> Listener {
|
||||
let cancel = CancellationToken::new();
|
||||
@@ -117,6 +121,7 @@ pub fn spawn(
|
||||
conf,
|
||||
tenant_manager,
|
||||
pg_auth,
|
||||
perf_trace_dispatch,
|
||||
tcp_listener,
|
||||
conf.pg_auth_type,
|
||||
conf.page_service_pipelining.clone(),
|
||||
@@ -173,6 +178,7 @@ pub async fn libpq_listener_main(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
perf_trace_dispatch: Option<Dispatch>,
|
||||
listener: tokio::net::TcpListener,
|
||||
auth_type: AuthType,
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
@@ -205,8 +211,12 @@ pub async fn libpq_listener_main(
|
||||
// Connection established. Spawn a new task to handle it.
|
||||
debug!("accepted connection from {}", peer_addr);
|
||||
let local_auth = auth.clone();
|
||||
let connection_ctx = listener_ctx
|
||||
.detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
|
||||
let connection_ctx = RequestContextBuilder::from(&listener_ctx)
|
||||
.task_kind(TaskKind::PageRequestHandler)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.perf_span_dispatch(perf_trace_dispatch.clone())
|
||||
.detached_child();
|
||||
|
||||
connection_handler_tasks.spawn(page_service_conn_main(
|
||||
conf,
|
||||
tenant_manager.clone(),
|
||||
@@ -237,6 +247,15 @@ pub async fn libpq_listener_main(
|
||||
|
||||
type ConnectionHandlerResult = anyhow::Result<()>;
|
||||
|
||||
/// Perf root spans start at the per-request level, after shard routing.
|
||||
/// This struct carries connection-level information to the root perf span definition.
|
||||
#[derive(Clone)]
|
||||
struct ConnectionPerfSpanFields {
|
||||
peer_addr: String,
|
||||
application_name: Option<String>,
|
||||
compute_mode: Option<String>,
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn page_service_conn_main(
|
||||
@@ -261,6 +280,12 @@ async fn page_service_conn_main(
|
||||
let socket_fd = socket.as_raw_fd();
|
||||
|
||||
let peer_addr = socket.peer_addr().context("get peer address")?;
|
||||
|
||||
let perf_span_fields = ConnectionPerfSpanFields {
|
||||
peer_addr: peer_addr.to_string(),
|
||||
application_name: None, // filled in later
|
||||
compute_mode: None, // filled in later
|
||||
};
|
||||
tracing::Span::current().record("peer_addr", field::display(peer_addr));
|
||||
|
||||
// setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements:
|
||||
@@ -304,6 +329,7 @@ async fn page_service_conn_main(
|
||||
tenant_manager,
|
||||
auth,
|
||||
pipelining_config,
|
||||
perf_span_fields,
|
||||
connection_ctx,
|
||||
cancel.clone(),
|
||||
gate_guard,
|
||||
@@ -348,6 +374,8 @@ struct PageServerHandler {
|
||||
/// `process_query` creates a child context from this one.
|
||||
connection_ctx: RequestContext,
|
||||
|
||||
perf_span_fields: ConnectionPerfSpanFields,
|
||||
|
||||
cancel: CancellationToken,
|
||||
|
||||
/// None only while pagestream protocol is being processed.
|
||||
@@ -607,6 +635,7 @@ impl std::fmt::Display for BatchedPageStreamError {
|
||||
struct BatchedGetPageRequest {
|
||||
req: PagestreamGetPageRequest,
|
||||
timer: SmgrOpTimer,
|
||||
ctx: RequestContext,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -692,11 +721,13 @@ impl BatchedFeMessage {
|
||||
}
|
||||
|
||||
impl PageServerHandler {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
perf_span_fields: ConnectionPerfSpanFields,
|
||||
connection_ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
gate_guard: GateGuard,
|
||||
@@ -706,6 +737,7 @@ impl PageServerHandler {
|
||||
auth,
|
||||
claims: None,
|
||||
connection_ctx,
|
||||
perf_span_fields,
|
||||
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
|
||||
cancel,
|
||||
pipelining_config,
|
||||
@@ -743,6 +775,7 @@ impl PageServerHandler {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
timeline_handles: &mut TimelineHandles,
|
||||
conn_perf_span_fields: &ConnectionPerfSpanFields,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
protocol_version: PagestreamProtocolVersion,
|
||||
@@ -902,10 +935,12 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
let key = rel_block_to_key(req.rel, req.blkno);
|
||||
let shard = match timeline_handles
|
||||
|
||||
let res = timeline_handles
|
||||
.get(tenant_id, timeline_id, ShardSelector::Page(key))
|
||||
.await
|
||||
{
|
||||
.await;
|
||||
|
||||
let shard = match res {
|
||||
Ok(tl) => tl,
|
||||
Err(e) => {
|
||||
let span = mkspan!(before shard routing);
|
||||
@@ -932,6 +967,41 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let ctx = if shard.is_get_page_request_sampled() {
|
||||
RequestContextBuilder::from(ctx)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"GET_PAGE",
|
||||
peer_addr = conn_perf_span_fields.peer_addr,
|
||||
application_name = conn_perf_span_fields.application_name,
|
||||
compute_mode = conn_perf_span_fields.compute_mode,
|
||||
tenant_id = %tenant_id,
|
||||
shard_id = %shard.get_shard_identity().shard_slug(),
|
||||
timeline_id = %timeline_id,
|
||||
lsn = %req.hdr.request_lsn,
|
||||
request_id = %req.hdr.reqid,
|
||||
key = %key,
|
||||
)
|
||||
})
|
||||
.attached_child()
|
||||
} else {
|
||||
ctx.attached_child()
|
||||
};
|
||||
|
||||
// This ctx travels as part of the BatchedFeMessage through
|
||||
// batching into the request handler.
|
||||
// The request handler needs to do some per-request work
|
||||
// (relsize check) before dispatching the batch as a single
|
||||
// get_vectored call to the Timeline.
|
||||
// This ctx will be used for the reslize check, whereas the
|
||||
// get_vectored call will be a different ctx with separate
|
||||
// perf span.
|
||||
let ctx = ctx.with_scope_page_service_pagestream(&shard);
|
||||
|
||||
// Similar game for this `span`: we funnel it through so that
|
||||
// request handler log messages contain the request-specific fields.
|
||||
let span = mkspan!(shard.tenant_shard_id.shard_slug());
|
||||
|
||||
let timer = record_op_start_and_throttle(
|
||||
@@ -939,19 +1009,34 @@ impl PageServerHandler {
|
||||
metrics::SmgrQueryType::GetPageAtLsn,
|
||||
received_at,
|
||||
)
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: current_perf_span,
|
||||
"THROTTLE",
|
||||
)
|
||||
})
|
||||
.await?;
|
||||
|
||||
// We're holding the Handle
|
||||
let effective_request_lsn = match Self::wait_or_get_last_lsn(
|
||||
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
|
||||
let res = Self::wait_or_get_last_lsn(
|
||||
&shard,
|
||||
req.hdr.request_lsn,
|
||||
req.hdr.not_modified_since,
|
||||
&shard.get_applied_gc_cutoff_lsn(),
|
||||
ctx,
|
||||
&ctx,
|
||||
)
|
||||
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
|
||||
.await
|
||||
{
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: current_perf_span,
|
||||
"WAIT_LSN",
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
let effective_request_lsn = match res {
|
||||
Ok(lsn) => lsn,
|
||||
Err(e) => {
|
||||
return respond_error!(span, e);
|
||||
@@ -961,7 +1046,7 @@ impl PageServerHandler {
|
||||
span,
|
||||
shard: shard.downgrade(),
|
||||
effective_request_lsn,
|
||||
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
|
||||
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer, ctx }],
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -1514,12 +1599,14 @@ impl PageServerHandler {
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
|
||||
{
|
||||
let cancel = self.cancel.clone();
|
||||
|
||||
let err = loop {
|
||||
let msg = Self::pagestream_read_message(
|
||||
&mut pgb_reader,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&mut timeline_handles,
|
||||
&self.perf_span_fields,
|
||||
&cancel,
|
||||
ctx,
|
||||
protocol_version,
|
||||
@@ -1653,6 +1740,8 @@ impl PageServerHandler {
|
||||
// Batcher
|
||||
//
|
||||
|
||||
let perf_span_fields = self.perf_span_fields.clone();
|
||||
|
||||
let cancel_batcher = self.cancel.child_token();
|
||||
let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
|
||||
let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
|
||||
@@ -1666,6 +1755,7 @@ impl PageServerHandler {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&mut timeline_handles,
|
||||
&perf_span_fields,
|
||||
&cancel_batcher,
|
||||
&ctx,
|
||||
protocol_version,
|
||||
@@ -2004,7 +2094,9 @@ impl PageServerHandler {
|
||||
|
||||
let results = timeline
|
||||
.get_rel_page_at_lsn_batched(
|
||||
requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
|
||||
requests
|
||||
.iter()
|
||||
.map(|p| (&p.req.rel, &p.req.blkno, p.ctx.attached_child())),
|
||||
effective_lsn,
|
||||
io_concurrency,
|
||||
ctx,
|
||||
@@ -2606,12 +2698,14 @@ where
|
||||
|
||||
if let FeStartupPacket::StartupMessage { params, .. } = sm {
|
||||
if let Some(app_name) = params.get("application_name") {
|
||||
self.perf_span_fields.application_name = Some(app_name.to_string());
|
||||
Span::current().record("application_name", field::display(app_name));
|
||||
}
|
||||
if let Some(options) = params.get("options") {
|
||||
let (config, _) = parse_options(options);
|
||||
for (key, value) in config {
|
||||
if key == "neon.compute_mode" {
|
||||
self.perf_span_fields.compute_mode = Some(value.clone());
|
||||
Span::current().record("compute_mode", field::display(value));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
|
||||
use std::ops::{ControlFlow, Range};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, ensure};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
@@ -31,7 +32,7 @@ use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, trace, warn};
|
||||
use tracing::{debug, info, info_span, trace, warn};
|
||||
use utils::bin_ser::{BeSer, DeserializeError};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pausable_failpoint;
|
||||
@@ -39,7 +40,7 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
|
||||
use super::tenant::{PageReconstructError, Timeline};
|
||||
use crate::aux_file;
|
||||
use crate::context::RequestContext;
|
||||
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::metrics::{
|
||||
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
|
||||
@@ -209,7 +210,9 @@ impl Timeline {
|
||||
let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
|
||||
let res = self
|
||||
.get_rel_page_at_lsn_batched(
|
||||
pages.iter().map(|(tag, blknum)| (tag, blknum)),
|
||||
pages
|
||||
.iter()
|
||||
.map(|(tag, blknum)| (tag, blknum, ctx.attached_child())),
|
||||
effective_lsn,
|
||||
io_concurrency.clone(),
|
||||
ctx,
|
||||
@@ -248,7 +251,7 @@ impl Timeline {
|
||||
/// The ordering of the returned vec corresponds to the ordering of `pages`.
|
||||
pub(crate) async fn get_rel_page_at_lsn_batched(
|
||||
&self,
|
||||
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
|
||||
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, RequestContext)>,
|
||||
effective_lsn: Lsn,
|
||||
io_concurrency: IoConcurrency,
|
||||
ctx: &RequestContext,
|
||||
@@ -262,8 +265,11 @@ impl Timeline {
|
||||
let mut result = Vec::with_capacity(pages.len());
|
||||
let result_slots = result.spare_capacity_mut();
|
||||
|
||||
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
|
||||
for (response_slot_idx, (tag, blknum)) in pages.enumerate() {
|
||||
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
|
||||
BTreeMap::default();
|
||||
|
||||
let mut perf_instrument = false;
|
||||
for (response_slot_idx, (tag, blknum, ctx)) in pages.enumerate() {
|
||||
if tag.relnode == 0 {
|
||||
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
|
||||
RelationError::InvalidRelnode.into(),
|
||||
@@ -274,7 +280,16 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let nblocks = match self
|
||||
.get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
|
||||
.get_rel_size(*tag, Version::Lsn(effective_lsn), &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_REL_SIZE",
|
||||
reltag=%tag,
|
||||
lsn=%effective_lsn,
|
||||
)
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(nblocks) => nblocks,
|
||||
@@ -297,8 +312,12 @@ impl Timeline {
|
||||
|
||||
let key = rel_block_to_key(*tag, *blknum);
|
||||
|
||||
if ctx.has_perf_span() {
|
||||
perf_instrument = true;
|
||||
}
|
||||
|
||||
let key_slots = keys_slots.entry(key).or_default();
|
||||
key_slots.push(response_slot_idx);
|
||||
key_slots.push((response_slot_idx, ctx));
|
||||
}
|
||||
|
||||
let keyspace = {
|
||||
@@ -314,16 +333,34 @@ impl Timeline {
|
||||
acc.to_keyspace()
|
||||
};
|
||||
|
||||
match self
|
||||
.get_vectored(keyspace, effective_lsn, io_concurrency, ctx)
|
||||
.await
|
||||
{
|
||||
let ctx = match perf_instrument {
|
||||
true => RequestContextBuilder::from(ctx)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"GET_VECTORED",
|
||||
tenant_id = %self.tenant_shard_id.tenant_id,
|
||||
timeline_id = %self.timeline_id,
|
||||
lsn = %effective_lsn,
|
||||
shard = %self.tenant_shard_id.shard_slug(),
|
||||
)
|
||||
})
|
||||
.attached_child(),
|
||||
false => ctx.attached_child(),
|
||||
};
|
||||
|
||||
let res = self
|
||||
.get_vectored(keyspace, effective_lsn, io_concurrency, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(results) => {
|
||||
for (key, res) in results {
|
||||
let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
|
||||
let first_slot = key_slots.next().unwrap();
|
||||
let (first_slot, first_req_ctx) = key_slots.next().unwrap();
|
||||
|
||||
for slot in key_slots {
|
||||
for (slot, req_ctx) in key_slots {
|
||||
let clone = match &res {
|
||||
Ok(buf) => Ok(buf.clone()),
|
||||
Err(err) => Err(match err {
|
||||
@@ -341,17 +378,22 @@ impl Timeline {
|
||||
};
|
||||
|
||||
result_slots[slot].write(clone);
|
||||
// There is no standardized way to express that the batched span followed from N request spans.
|
||||
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
|
||||
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
|
||||
req_ctx.perf_follows_from(&ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
|
||||
result_slots[first_slot].write(res);
|
||||
first_req_ctx.perf_follows_from(&ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
|
||||
// (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
|
||||
for slot in keys_slots.values().flatten() {
|
||||
for (slot, req_ctx) in keys_slots.values().flatten() {
|
||||
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
|
||||
// but without taking ownership of the GetVectoredError
|
||||
let err = match &err {
|
||||
@@ -383,6 +425,7 @@ impl Timeline {
|
||||
}
|
||||
};
|
||||
|
||||
req_ctx.perf_follows_from(&ctx);
|
||||
result_slots[*slot].write(err);
|
||||
}
|
||||
|
||||
|
||||
@@ -219,8 +219,7 @@ pageserver_runtime!(MGMT_REQUEST_RUNTIME, "mgmt request worker");
|
||||
pageserver_runtime!(WALRECEIVER_RUNTIME, "walreceiver worker");
|
||||
pageserver_runtime!(BACKGROUND_RUNTIME, "background op worker");
|
||||
// Bump this number when adding a new pageserver_runtime!
|
||||
// SAFETY: it's obviously correct
|
||||
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(4) };
|
||||
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = NonZeroUsize::new(4).unwrap();
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct PageserverTaskId(u64);
|
||||
|
||||
@@ -3689,7 +3689,7 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
}
|
||||
TenantState::Active { .. } => {
|
||||
TenantState::Active => {
|
||||
return Ok(());
|
||||
}
|
||||
TenantState::Broken { reason, .. } => {
|
||||
@@ -4205,9 +4205,9 @@ impl Tenant {
|
||||
self.cancel.child_token(),
|
||||
);
|
||||
|
||||
let timeline_ctx = RequestContextBuilder::extend(ctx)
|
||||
let timeline_ctx = RequestContextBuilder::from(ctx)
|
||||
.scope(context::Scope::new_timeline(&timeline))
|
||||
.build();
|
||||
.detached_child();
|
||||
|
||||
Ok((timeline, timeline_ctx))
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ impl<Value: Clone> LayerCoverage<Value> {
|
||||
///
|
||||
/// Complexity: O(log N)
|
||||
fn add_node(&mut self, key: i128) {
|
||||
let value = match self.nodes.range(..=key).last() {
|
||||
let value = match self.nodes.range(..=key).next_back() {
|
||||
Some((_, Some(v))) => Some(v.clone()),
|
||||
Some((_, None)) => None,
|
||||
None => None,
|
||||
|
||||
@@ -58,7 +58,7 @@ use crate::{InitializationOrder, TEMP_FILE_SUFFIX};
|
||||
|
||||
/// For a tenant that appears in TenantsMap, it may either be
|
||||
/// - `Attached`: has a full Tenant object, is elegible to service
|
||||
/// reads and ingest WAL.
|
||||
/// reads and ingest WAL.
|
||||
/// - `Secondary`: is only keeping a local cache warm.
|
||||
///
|
||||
/// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
|
||||
|
||||
@@ -642,6 +642,7 @@ impl RemoteTimelineClient {
|
||||
cancel,
|
||||
)
|
||||
.measure_remote_op(
|
||||
Option::<TaskKind>::None,
|
||||
RemoteOpFileKind::Index,
|
||||
RemoteOpKind::Download,
|
||||
Arc::clone(&self.metrics),
|
||||
@@ -739,6 +740,7 @@ impl RemoteTimelineClient {
|
||||
ctx,
|
||||
)
|
||||
.measure_remote_op(
|
||||
Some(ctx.task_kind()),
|
||||
RemoteOpFileKind::Layer,
|
||||
RemoteOpKind::Download,
|
||||
Arc::clone(&self.metrics),
|
||||
@@ -2175,6 +2177,7 @@ impl RemoteTimelineClient {
|
||||
&self.cancel,
|
||||
)
|
||||
.measure_remote_op(
|
||||
Some(TaskKind::RemoteUploadTask),
|
||||
RemoteOpFileKind::Layer,
|
||||
RemoteOpKind::Upload,
|
||||
Arc::clone(&self.metrics),
|
||||
@@ -2191,6 +2194,7 @@ impl RemoteTimelineClient {
|
||||
&self.cancel,
|
||||
)
|
||||
.measure_remote_op(
|
||||
Some(TaskKind::RemoteUploadTask),
|
||||
RemoteOpFileKind::Index,
|
||||
RemoteOpKind::Upload,
|
||||
Arc::clone(&self.metrics),
|
||||
|
||||
@@ -130,7 +130,7 @@ impl IndexPart {
|
||||
/// Version history
|
||||
/// - 2: added `deleted_at`
|
||||
/// - 3: no longer deserialize `timeline_layers` (serialized format is the same, but timeline_layers
|
||||
/// is always generated from the keys of `layer_metadata`)
|
||||
/// is always generated from the keys of `layer_metadata`)
|
||||
/// - 4: timeline_layers is fully removed.
|
||||
/// - 5: lineage was added
|
||||
/// - 6: last_aux_file_policy is added.
|
||||
|
||||
@@ -167,10 +167,17 @@ impl SecondaryTenant {
|
||||
|
||||
self.validate_metrics();
|
||||
|
||||
// Metrics are subtracted from and/or removed eagerly.
|
||||
// Deletions are done in the background via [`BackgroundPurges::spawn`].
|
||||
let tenant_id = self.tenant_shard_id.tenant_id.to_string();
|
||||
let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
|
||||
let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
|
||||
let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
|
||||
|
||||
self.detail
|
||||
.lock()
|
||||
.unwrap()
|
||||
.drain_timelines(&self.tenant_shard_id, &self.resident_size_metric);
|
||||
}
|
||||
|
||||
pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::metrics::{STORAGE_IO_SIZE, StorageIoSizeOperation};
|
||||
use camino::Utf8PathBuf;
|
||||
use chrono::format::{DelayedFormat, StrftimeItems};
|
||||
use futures::Future;
|
||||
@@ -124,15 +125,53 @@ impl OnDiskState {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(super) struct SecondaryDetailTimeline {
|
||||
on_disk_layers: HashMap<LayerName, OnDiskState>,
|
||||
|
||||
/// We remember when layers were evicted, to prevent re-downloading them.
|
||||
pub(super) evicted_at: HashMap<LayerName, SystemTime>,
|
||||
|
||||
ctx: RequestContext,
|
||||
}
|
||||
|
||||
impl Clone for SecondaryDetailTimeline {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
on_disk_layers: self.on_disk_layers.clone(),
|
||||
evicted_at: self.evicted_at.clone(),
|
||||
// This is a bit awkward. The downloader code operates on a snapshot
|
||||
// of the secondary list to avoid locking it for extended periods of time.
|
||||
// No particularly strong reason to chose [`RequestContext::detached_child`],
|
||||
// but makes more sense than [`RequestContext::attached_child`].
|
||||
ctx: self
|
||||
.ctx
|
||||
.detached_child(self.ctx.task_kind(), self.ctx.download_behavior()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for SecondaryDetailTimeline {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("SecondaryDetailTimeline")
|
||||
.field("on_disk_layers", &self.on_disk_layers)
|
||||
.field("evicted_at", &self.evicted_at)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl SecondaryDetailTimeline {
|
||||
pub(super) fn empty(ctx: RequestContext) -> Self {
|
||||
SecondaryDetailTimeline {
|
||||
on_disk_layers: Default::default(),
|
||||
evicted_at: Default::default(),
|
||||
ctx,
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn context(&self) -> &RequestContext {
|
||||
&self.ctx
|
||||
}
|
||||
|
||||
pub(super) fn remove_layer(
|
||||
&mut self,
|
||||
name: &LayerName,
|
||||
@@ -258,18 +297,50 @@ impl SecondaryDetail {
|
||||
|
||||
pub(super) fn remove_timeline(
|
||||
&mut self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
resident_metric: &UIntGauge,
|
||||
) {
|
||||
let removed = self.timelines.remove(timeline_id);
|
||||
if let Some(removed) = removed {
|
||||
resident_metric.sub(
|
||||
removed
|
||||
.on_disk_layers
|
||||
.values()
|
||||
.map(|l| l.metadata.file_size)
|
||||
.sum(),
|
||||
);
|
||||
Self::clear_timeline_metrics(tenant_shard_id, timeline_id, removed, resident_metric);
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn drain_timelines(
|
||||
&mut self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
resident_metric: &UIntGauge,
|
||||
) {
|
||||
for (timeline_id, removed) in self.timelines.drain() {
|
||||
Self::clear_timeline_metrics(tenant_shard_id, &timeline_id, removed, resident_metric);
|
||||
}
|
||||
}
|
||||
|
||||
fn clear_timeline_metrics(
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
detail: SecondaryDetailTimeline,
|
||||
resident_metric: &UIntGauge,
|
||||
) {
|
||||
resident_metric.sub(
|
||||
detail
|
||||
.on_disk_layers
|
||||
.values()
|
||||
.map(|l| l.metadata.file_size)
|
||||
.sum(),
|
||||
);
|
||||
|
||||
let shard_id = format!("{}", tenant_shard_id.shard_slug());
|
||||
let tenant_id = tenant_shard_id.tenant_id.to_string();
|
||||
let timeline_id = timeline_id.to_string();
|
||||
for op in StorageIoSizeOperation::VARIANTS {
|
||||
let _ = STORAGE_IO_SIZE.remove_label_values(&[
|
||||
op,
|
||||
tenant_id.as_str(),
|
||||
shard_id.as_str(),
|
||||
timeline_id.as_str(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -727,6 +798,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
last_heatmap,
|
||||
timeline,
|
||||
&self.secondary_state.resident_size_metric,
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -774,7 +846,6 @@ impl<'a> TenantDownloader<'a> {
|
||||
|
||||
// Download the layers in the heatmap
|
||||
for timeline in heatmap.timelines {
|
||||
let ctx = &ctx.with_scope_secondary_timeline(tenant_shard_id, &timeline.timeline_id);
|
||||
let timeline_state = timeline_states
|
||||
.remove(&timeline.timeline_id)
|
||||
.expect("Just populated above");
|
||||
@@ -917,7 +988,11 @@ impl<'a> TenantDownloader<'a> {
|
||||
for delete_timeline in &delete_timelines {
|
||||
// We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
|
||||
// from disk fails that will be a fatal error.
|
||||
detail.remove_timeline(delete_timeline, &self.secondary_state.resident_size_metric);
|
||||
detail.remove_timeline(
|
||||
self.secondary_state.get_tenant_shard_id(),
|
||||
delete_timeline,
|
||||
&self.secondary_state.resident_size_metric,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1013,7 +1088,6 @@ impl<'a> TenantDownloader<'a> {
|
||||
timeline: HeatMapTimeline,
|
||||
timeline_state: SecondaryDetailTimeline,
|
||||
deadline: Instant,
|
||||
ctx: &RequestContext,
|
||||
) -> (Result<(), UpdateError>, Vec<HeatMapLayer>) {
|
||||
// Accumulate updates to the state
|
||||
let mut touched = Vec::new();
|
||||
@@ -1044,7 +1118,12 @@ impl<'a> TenantDownloader<'a> {
|
||||
}
|
||||
|
||||
match self
|
||||
.download_layer(tenant_shard_id, &timeline_id, layer, ctx)
|
||||
.download_layer(
|
||||
tenant_shard_id,
|
||||
&timeline_id,
|
||||
layer,
|
||||
timeline_state.context(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(layer)) => touched.push(layer),
|
||||
@@ -1155,13 +1234,16 @@ impl<'a> TenantDownloader<'a> {
|
||||
tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.hot_layers().count());
|
||||
|
||||
let (result, touched) = self
|
||||
.download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx)
|
||||
.download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline)
|
||||
.await;
|
||||
|
||||
// Write updates to state to record layers we just downloaded or touched, irrespective of whether the overall result was successful
|
||||
{
|
||||
let mut detail = self.secondary_state.detail.lock().unwrap();
|
||||
let timeline_detail = detail.timelines.entry(timeline_id).or_default();
|
||||
let timeline_detail = detail.timelines.entry(timeline_id).or_insert_with(|| {
|
||||
let ctx = ctx.with_scope_secondary_timeline(tenant_shard_id, &timeline_id);
|
||||
SecondaryDetailTimeline::empty(ctx)
|
||||
});
|
||||
|
||||
tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
|
||||
touched.into_iter().for_each(|t| {
|
||||
@@ -1295,10 +1377,12 @@ async fn init_timeline_state(
|
||||
last_heatmap: Option<&HeatMapTimeline>,
|
||||
heatmap: &HeatMapTimeline,
|
||||
resident_metric: &UIntGauge,
|
||||
ctx: &RequestContext,
|
||||
) -> SecondaryDetailTimeline {
|
||||
let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
|
||||
let mut detail = SecondaryDetailTimeline::default();
|
||||
let ctx = ctx.with_scope_secondary_timeline(tenant_shard_id, &heatmap.timeline_id);
|
||||
let mut detail = SecondaryDetailTimeline::empty(ctx);
|
||||
|
||||
let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
|
||||
let mut dir = match tokio::fs::read_dir(&timeline_path).await {
|
||||
Ok(d) => d,
|
||||
Err(e) => {
|
||||
|
||||
@@ -13,13 +13,13 @@ pub mod merge_iterator;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BinaryHeap, HashMap};
|
||||
use std::future::Future;
|
||||
use std::ops::Range;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
pub use batch_split_writer::{BatchLayerWriter, SplitDeltaLayerWriter, SplitImageLayerWriter};
|
||||
use bytes::Bytes;
|
||||
pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
|
||||
@@ -34,7 +34,7 @@ use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::value::Value;
|
||||
use tracing::{Instrument, trace};
|
||||
use tracing::{Instrument, info_span, trace};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::gate::GateGuard;
|
||||
|
||||
@@ -43,7 +43,9 @@ use super::PageReconstructError;
|
||||
use super::layer_map::InMemoryLayerDesc;
|
||||
use super::timeline::{GetVectoredError, ReadPath};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{AccessStatsBehavior, RequestContext};
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
|
||||
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
|
||||
where
|
||||
@@ -874,13 +876,37 @@ impl ReadableLayer {
|
||||
) -> Result<(), GetVectoredError> {
|
||||
match self {
|
||||
ReadableLayer::PersistentLayer(layer) => {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_LAYER",
|
||||
layer = %layer
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
}
|
||||
ReadableLayer::InMemoryLayer(layer) => {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_LAYER",
|
||||
layer = %layer
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -896,9 +896,9 @@ impl DeltaLayerInner {
|
||||
where
|
||||
Reader: BlockReader + Clone,
|
||||
{
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
@@ -1105,9 +1105,9 @@ impl DeltaLayerInner {
|
||||
all_keys.push(entry);
|
||||
true
|
||||
},
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
&RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build(),
|
||||
.attached_child(),
|
||||
)
|
||||
.await?;
|
||||
if let Some(last) = all_keys.last_mut() {
|
||||
|
||||
@@ -481,9 +481,9 @@ impl ImageLayerInner {
|
||||
let tree_reader =
|
||||
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
|
||||
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
|
||||
@@ -421,9 +421,9 @@ impl InMemoryLayer {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
let inner = self.inner.read().await;
|
||||
|
||||
|
||||
@@ -3,12 +3,13 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::Context;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::HistoricLayerInfo;
|
||||
use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
|
||||
use tracing::Instrument;
|
||||
use tracing::{Instrument, info_span};
|
||||
use utils::generation::Generation;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -18,7 +19,7 @@ use super::delta_layer::{self};
|
||||
use super::image_layer::{self};
|
||||
use super::{
|
||||
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
|
||||
LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState,
|
||||
LayerVisibilityHint, PerfInstrumentFutureExt, PersistentLayerDesc, ValuesReconstructState,
|
||||
};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
|
||||
@@ -324,16 +325,29 @@ impl Layer {
|
||||
reconstruct_data: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let downloaded =
|
||||
let downloaded = {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_LAYER",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
self.0
|
||||
.get_or_maybe_download(true, ctx)
|
||||
.get_or_maybe_download(true, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_context| crnt_perf_context.clone())
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
|
||||
GetVectoredError::Cancelled
|
||||
}
|
||||
other => GetVectoredError::Other(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
})?
|
||||
};
|
||||
|
||||
let this = ResidentLayer {
|
||||
downloaded: downloaded.clone(),
|
||||
owner: self.clone(),
|
||||
@@ -341,9 +355,20 @@ impl Layer {
|
||||
|
||||
self.record_access(ctx);
|
||||
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"VISIT_LAYER",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
downloaded
|
||||
.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, ctx)
|
||||
.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, &ctx)
|
||||
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
GetVectoredError::Other(err) => GetVectoredError::Other(
|
||||
@@ -950,6 +975,10 @@ impl LayerInner {
|
||||
allow_download: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<DownloadedLayer>, DownloadError> {
|
||||
let mut wait_for_download_recorder =
|
||||
scopeguard::guard(utils::elapsed_accum::ElapsedAccum::default(), |accum| {
|
||||
ctx.ondemand_download_wait_observe(accum.get());
|
||||
});
|
||||
let (weak, permit) = {
|
||||
// get_or_init_detached can:
|
||||
// - be fast (mutex lock) OR uncontested semaphore permit acquire
|
||||
@@ -958,7 +987,7 @@ impl LayerInner {
|
||||
|
||||
let locked = self
|
||||
.inner
|
||||
.get_or_init_detached()
|
||||
.get_or_init_detached_measured(Some(&mut wait_for_download_recorder))
|
||||
.await
|
||||
.map(|mut guard| guard.get_and_upgrade().ok_or(guard));
|
||||
|
||||
@@ -988,6 +1017,7 @@ impl LayerInner {
|
||||
Err(permit) => (None, permit),
|
||||
}
|
||||
};
|
||||
let _guard = wait_for_download_recorder.guard();
|
||||
|
||||
if let Some(weak) = weak {
|
||||
// only drop the weak after dropping the heavier_once_cell guard
|
||||
@@ -1045,15 +1075,34 @@ impl LayerInner {
|
||||
return Err(DownloadError::DownloadRequired);
|
||||
}
|
||||
|
||||
let download_ctx = ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download);
|
||||
let ctx = if ctx.has_perf_span() {
|
||||
let dl_ctx = RequestContextBuilder::from(ctx)
|
||||
.task_kind(TaskKind::LayerDownload)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"DOWNLOAD_LAYER",
|
||||
layer = %self,
|
||||
reason = %reason
|
||||
)
|
||||
})
|
||||
.detached_child();
|
||||
ctx.perf_follows_from(&dl_ctx);
|
||||
dl_ctx
|
||||
} else {
|
||||
ctx.attached_child()
|
||||
};
|
||||
|
||||
async move {
|
||||
tracing::info!(%reason, "downloading on-demand");
|
||||
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
let res = self
|
||||
.download_init_and_wait(timeline, permit, download_ctx)
|
||||
.download_init_and_wait(timeline, permit, ctx.attached_child())
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await?;
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
Ok(res)
|
||||
}
|
||||
@@ -1158,6 +1207,7 @@ impl LayerInner {
|
||||
permit: heavier_once_cell::InitPermit,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<DownloadedLayer>, remote_storage::DownloadError> {
|
||||
let start = std::time::Instant::now();
|
||||
let result = timeline
|
||||
.remote_client
|
||||
.download_layer_file(
|
||||
@@ -1169,7 +1219,8 @@ impl LayerInner {
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
|
||||
let latency = start.elapsed();
|
||||
let latency_millis = u64::try_from(latency.as_millis()).unwrap();
|
||||
match result {
|
||||
Ok(size) => {
|
||||
assert_eq!(size, self.desc.file_size);
|
||||
@@ -1185,9 +1236,8 @@ impl LayerInner {
|
||||
Err(e) => {
|
||||
panic!("post-condition failed: needs_download errored: {e:?}");
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(size=%self.desc.file_size, "on-demand download successful");
|
||||
};
|
||||
tracing::info!(size=%self.desc.file_size, %latency_millis, "on-demand download successful");
|
||||
timeline
|
||||
.metrics
|
||||
.resident_physical_size_add(self.desc.file_size);
|
||||
@@ -1216,7 +1266,7 @@ impl LayerInner {
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
|
||||
tracing::error!(consecutive_failures, %latency_millis, "layer file download failed: {e:#}");
|
||||
|
||||
let backoff = utils::backoff::exponential_backoff_duration_seconds(
|
||||
consecutive_failures.min(u32::MAX as usize) as u32,
|
||||
@@ -1720,9 +1770,9 @@ impl DownloadedLayer {
|
||||
);
|
||||
|
||||
let res = if owner.desc.is_delta {
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(crate::context::PageContentKind::DeltaLayerSummary)
|
||||
.build();
|
||||
.attached_child();
|
||||
let summary = Some(delta_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
owner.desc.timeline_id,
|
||||
@@ -1738,9 +1788,9 @@ impl DownloadedLayer {
|
||||
.await
|
||||
.map(LayerKind::Delta)
|
||||
} else {
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(crate::context::PageContentKind::ImageLayerSummary)
|
||||
.build();
|
||||
.attached_child();
|
||||
let lsn = owner.desc.image_layer_lsn();
|
||||
let summary = Some(image_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
|
||||
@@ -119,6 +119,10 @@ async fn smoke_test() {
|
||||
let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
|
||||
assert!(matches!(e, EvictionError::NotFound));
|
||||
|
||||
let dl_ctx = RequestContextBuilder::from(ctx)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.attached_child();
|
||||
|
||||
// on accesses when the layer is evicted, it will automatically be downloaded.
|
||||
let img_after = {
|
||||
let mut data = ValuesReconstructState::new(io_concurrency.clone());
|
||||
@@ -127,7 +131,7 @@ async fn smoke_test() {
|
||||
controlfile_keyspace.clone(),
|
||||
Lsn(0x10)..Lsn(0x11),
|
||||
&mut data,
|
||||
ctx,
|
||||
&dl_ctx,
|
||||
)
|
||||
.instrument(download_span.clone())
|
||||
.await
|
||||
@@ -177,7 +181,7 @@ async fn smoke_test() {
|
||||
|
||||
// plain downloading is rarely needed
|
||||
layer
|
||||
.download_and_keep_resident(ctx)
|
||||
.download_and_keep_resident(&dl_ctx)
|
||||
.instrument(download_span)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -645,9 +649,10 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
|
||||
let ctx = ctx.with_scope_timeline(&timeline);
|
||||
|
||||
// This test does downloads
|
||||
let ctx = RequestContextBuilder::extend(&ctx)
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
let layer = {
|
||||
let mut layers = {
|
||||
let layers = timeline.layers.read().await;
|
||||
@@ -730,9 +735,9 @@ async fn evict_and_wait_does_not_wait_for_download() {
|
||||
let ctx = ctx.with_scope_timeline(&timeline);
|
||||
|
||||
// This test does downloads
|
||||
let ctx = RequestContextBuilder::extend(&ctx)
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
let layer = {
|
||||
let mut layers = {
|
||||
|
||||
@@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
|
||||
use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, Result, anyhow, bail, ensure};
|
||||
use arc_swap::{ArcSwap, ArcSwapOption};
|
||||
use bytes::Bytes;
|
||||
@@ -96,7 +97,9 @@ use super::{
|
||||
};
|
||||
use crate::aux_file::AuxFileSizeEstimator;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::l0_flush::{self, L0FlushGlobalState};
|
||||
@@ -1289,9 +1292,22 @@ impl Timeline {
|
||||
};
|
||||
reconstruct_state.read_path = read_path;
|
||||
|
||||
let traversal_res: Result<(), _> = self
|
||||
.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, ctx)
|
||||
.await;
|
||||
let traversal_res: Result<(), _> = {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_IO",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
self.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
};
|
||||
|
||||
if let Err(err) = traversal_res {
|
||||
// Wait for all the spawned IOs to complete.
|
||||
// See comments on `spawn_io` inside `storage_layer` for more details.
|
||||
@@ -1305,14 +1321,46 @@ impl Timeline {
|
||||
|
||||
let layers_visited = reconstruct_state.get_layers_visited();
|
||||
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"RECONSTRUCT",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
let futs = FuturesUnordered::new();
|
||||
for (key, state) in std::mem::take(&mut reconstruct_state.keys) {
|
||||
futs.push({
|
||||
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"RECONSTRUCT_KEY",
|
||||
key = %key,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
async move {
|
||||
assert_eq!(state.situation, ValueReconstructSituation::Complete);
|
||||
|
||||
let converted = match state.collect_pending_ios().await {
|
||||
let res = state
|
||||
.collect_pending_ios()
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"WAIT_FOR_IO_COMPLETIONS",
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
let converted = match res {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => {
|
||||
return (key, Err(err));
|
||||
@@ -1329,16 +1377,27 @@ impl Timeline {
|
||||
"{converted:?}"
|
||||
);
|
||||
|
||||
(
|
||||
key,
|
||||
walredo_self.reconstruct_value(key, lsn, converted).await,
|
||||
)
|
||||
let walredo_deltas = converted.num_deltas();
|
||||
let walredo_res = walredo_self
|
||||
.reconstruct_value(key, lsn, converted)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"WALREDO",
|
||||
deltas = %walredo_deltas,
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
(key, walredo_res)
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let results = futs
|
||||
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await;
|
||||
|
||||
// For aux file keys (v1 or v2) the vectored read path does not return an error
|
||||
@@ -2247,7 +2306,7 @@ impl Timeline {
|
||||
.await
|
||||
.expect("holding a reference to self");
|
||||
}
|
||||
TimelineState::Active { .. } => {
|
||||
TimelineState::Active => {
|
||||
return Ok(());
|
||||
}
|
||||
TimelineState::Broken { .. } | TimelineState::Stopping => {
|
||||
@@ -2417,6 +2476,31 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
|
||||
}
|
||||
|
||||
/// Checks if a get page request should get perf tracing
|
||||
///
|
||||
/// The configuration priority is: tenant config override, default tenant config,
|
||||
/// pageserver config.
|
||||
pub(crate) fn is_get_page_request_sampled(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
let ratio = tenant_conf
|
||||
.tenant_conf
|
||||
.sampling_ratio
|
||||
.flatten()
|
||||
.or(self.conf.default_tenant_conf.sampling_ratio)
|
||||
.or(self.conf.tracing.as_ref().map(|t| t.sampling_ratio));
|
||||
|
||||
match ratio {
|
||||
Some(r) => {
|
||||
if r.numerator == 0 {
|
||||
false
|
||||
} else {
|
||||
rand::thread_rng().gen_range(0..r.denominator) < r.numerator
|
||||
}
|
||||
}
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn get_checkpoint_distance(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
@@ -3875,15 +3959,30 @@ impl Timeline {
|
||||
let TimelineVisitOutcome {
|
||||
completed_keyspace: completed,
|
||||
image_covered_keyspace,
|
||||
} = Self::get_vectored_reconstruct_data_timeline(
|
||||
timeline,
|
||||
keyspace.clone(),
|
||||
cont_lsn,
|
||||
reconstruct_state,
|
||||
&self.cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
} = {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_IO_TIMELINE",
|
||||
timeline = %timeline.timeline_id,
|
||||
lsn = %cont_lsn,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
Self::get_vectored_reconstruct_data_timeline(
|
||||
timeline,
|
||||
keyspace.clone(),
|
||||
cont_lsn,
|
||||
reconstruct_state,
|
||||
&self.cancel,
|
||||
&ctx,
|
||||
)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await?
|
||||
};
|
||||
|
||||
keyspace.remove_overlapping_with(&completed);
|
||||
|
||||
@@ -3927,8 +4026,24 @@ impl Timeline {
|
||||
|
||||
// Take the min to avoid reconstructing a page with data newer than request Lsn.
|
||||
cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1));
|
||||
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_ANCESTOR",
|
||||
timeline = %timeline.timeline_id,
|
||||
lsn = %cont_lsn,
|
||||
ancestor = %ancestor_timeline.timeline_id,
|
||||
ancestor_lsn = %timeline.ancestor_lsn
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
timeline_owned = timeline
|
||||
.get_ready_ancestor_timeline(ancestor_timeline, ctx)
|
||||
.get_ready_ancestor_timeline(ancestor_timeline, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await?;
|
||||
timeline = &*timeline_owned;
|
||||
};
|
||||
@@ -7259,9 +7374,9 @@ mod tests {
|
||||
|
||||
eprintln!("Downloading {layer} and re-generating heatmap");
|
||||
|
||||
let ctx = &RequestContextBuilder::extend(ctx)
|
||||
let ctx = &RequestContextBuilder::from(ctx)
|
||||
.download_behavior(crate::context::DownloadBehavior::Download)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
let _resident = layer
|
||||
.download_and_keep_resident(ctx)
|
||||
|
||||
@@ -26,7 +26,7 @@ use once_cell::sync::Lazy;
|
||||
use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
|
||||
use pageserver_api::key::{KEY_SIZE, Key};
|
||||
use pageserver_api::keyspace::{KeySpace, ShardedRange};
|
||||
use pageserver_api::models::CompactInfoResponse;
|
||||
use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
|
||||
use pageserver_api::value::Value;
|
||||
@@ -61,7 +61,7 @@ use crate::tenant::timeline::{
|
||||
DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
|
||||
ResidentLayer, drop_rlock,
|
||||
};
|
||||
use crate::tenant::{DeltaLayer, MaybeOffloaded, gc_block};
|
||||
use crate::tenant::{DeltaLayer, MaybeOffloaded};
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
|
||||
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
|
||||
@@ -123,7 +123,6 @@ impl GcCompactionQueueItem {
|
||||
#[derive(Default)]
|
||||
struct GcCompactionGuardItems {
|
||||
notify: Option<tokio::sync::oneshot::Sender<()>>,
|
||||
gc_guard: Option<gc_block::Guard>,
|
||||
permit: Option<OwnedSemaphorePermit>,
|
||||
}
|
||||
|
||||
@@ -279,7 +278,7 @@ impl GcCompactionQueue {
|
||||
gc_compaction_ratio_percent: u64,
|
||||
) -> bool {
|
||||
const AUTO_TRIGGER_LIMIT: u64 = 150 * 1024 * 1024 * 1024; // 150GB
|
||||
if l1_size >= AUTO_TRIGGER_LIMIT || l2_size >= AUTO_TRIGGER_LIMIT {
|
||||
if l1_size + l2_size >= AUTO_TRIGGER_LIMIT {
|
||||
// Do not auto-trigger when physical size >= 150GB
|
||||
return false;
|
||||
}
|
||||
@@ -319,7 +318,12 @@ impl GcCompactionQueue {
|
||||
flags
|
||||
},
|
||||
sub_compaction: true,
|
||||
compact_key_range: None,
|
||||
// Only auto-trigger gc-compaction over the data keyspace due to concerns in
|
||||
// https://github.com/neondatabase/neon/issues/11318.
|
||||
compact_key_range: Some(CompactKeyRange {
|
||||
start: Key::MIN,
|
||||
end: Key::metadata_key_range().start,
|
||||
}),
|
||||
compact_lsn_range: None,
|
||||
sub_compaction_max_job_size_mb: None,
|
||||
},
|
||||
@@ -343,44 +347,45 @@ impl GcCompactionQueue {
|
||||
info!("compaction job id={} finished", id);
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
if let Some(items) = guard.guards.remove(&id) {
|
||||
drop(items.gc_guard);
|
||||
if let Some(tx) = items.notify {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn clear_running_job(&self) {
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.running = None;
|
||||
}
|
||||
|
||||
async fn handle_sub_compaction(
|
||||
&self,
|
||||
id: GcCompactionJobId,
|
||||
options: CompactOptions,
|
||||
timeline: &Arc<Timeline>,
|
||||
gc_block: &GcBlock,
|
||||
auto: bool,
|
||||
) -> Result<(), CompactionError> {
|
||||
info!(
|
||||
"running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
|
||||
);
|
||||
let jobs = timeline
|
||||
let res = timeline
|
||||
.gc_compaction_split_jobs(
|
||||
GcCompactJob::from_compact_options(options.clone()),
|
||||
options.sub_compaction_max_job_size_mb,
|
||||
)
|
||||
.await?;
|
||||
.await;
|
||||
let jobs = match res {
|
||||
Ok(jobs) => jobs,
|
||||
Err(err) => {
|
||||
warn!("cannot split gc-compaction jobs: {}, unblocked gc", err);
|
||||
self.notify_and_unblock(id);
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
if jobs.is_empty() {
|
||||
info!("no jobs to run, skipping scheduled compaction task");
|
||||
self.notify_and_unblock(id);
|
||||
} else {
|
||||
let gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"cannot run gc-compaction because gc is blocked: {}",
|
||||
e
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let jobs_len = jobs.len();
|
||||
let mut pending_tasks = Vec::new();
|
||||
// gc-compaction might pick more layers or fewer layers to compact. The L2 LSN does not need to be accurate.
|
||||
@@ -415,7 +420,6 @@ impl GcCompactionQueue {
|
||||
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
|
||||
let mut tasks = Vec::new();
|
||||
for task in pending_tasks {
|
||||
let id = guard.next_id();
|
||||
@@ -446,7 +450,18 @@ impl GcCompactionQueue {
|
||||
if let Err(err) = &res {
|
||||
log_compaction_error(err, None, cancel.is_cancelled());
|
||||
}
|
||||
res
|
||||
match res {
|
||||
Ok(res) => Ok(res),
|
||||
Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
|
||||
Err(_) => {
|
||||
// There are some cases where traditional gc might collect some layer
|
||||
// files causing gc-compaction cannot read the full history of the key.
|
||||
// This needs to be resolved in the long-term by improving the compaction
|
||||
// process. For now, let's simply avoid such errors triggering the
|
||||
// circuit breaker.
|
||||
Ok(CompactionOutcome::Skipped)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn iteration_inner(
|
||||
@@ -494,27 +509,32 @@ impl GcCompactionQueue {
|
||||
info!(
|
||||
"running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
|
||||
);
|
||||
self.handle_sub_compaction(id, options, timeline, gc_block, auto)
|
||||
self.handle_sub_compaction(id, options, timeline, auto)
|
||||
.await?;
|
||||
} else {
|
||||
// Auto compaction always enables sub-compaction so we don't need to handle update_l2_lsn
|
||||
// in this branch.
|
||||
let gc_guard = match gc_block.start().await {
|
||||
let _gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
self.notify_and_unblock(id);
|
||||
self.clear_running_job();
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"cannot run gc-compaction because gc is blocked: {}",
|
||||
e
|
||||
)));
|
||||
}
|
||||
};
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
|
||||
}
|
||||
let compaction_result =
|
||||
timeline.compact_with_options(cancel, options, ctx).await?;
|
||||
self.notify_and_unblock(id);
|
||||
let res = timeline.compact_with_options(cancel, options, ctx).await;
|
||||
let compaction_result = match res {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
warn!(%err, "failed to run gc-compaction");
|
||||
self.notify_and_unblock(id);
|
||||
self.clear_running_job();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
if compaction_result == CompactionOutcome::YieldForL0 {
|
||||
yield_for_l0 = true;
|
||||
}
|
||||
@@ -522,7 +542,25 @@ impl GcCompactionQueue {
|
||||
}
|
||||
GcCompactionQueueItem::SubCompactionJob(options) => {
|
||||
// TODO: error handling, clear the queue if any task fails?
|
||||
let compaction_result = timeline.compact_with_options(cancel, options, ctx).await?;
|
||||
let _gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
self.clear_running_job();
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"cannot run gc-compaction because gc is blocked: {}",
|
||||
e
|
||||
)));
|
||||
}
|
||||
};
|
||||
let res = timeline.compact_with_options(cancel, options, ctx).await;
|
||||
let compaction_result = match res {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
warn!(%err, "failed to run gc-compaction subcompaction job");
|
||||
self.clear_running_job();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
if compaction_result == CompactionOutcome::YieldForL0 {
|
||||
// We will permenantly give up a task if we yield for L0 compaction: the preempted subcompaction job won't be running
|
||||
// again. This ensures that we don't keep doing duplicated work within gc-compaction. Not directly returning here because
|
||||
@@ -553,10 +591,7 @@ impl GcCompactionQueue {
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.running = None;
|
||||
}
|
||||
self.clear_running_job();
|
||||
Ok(if yield_for_l0 {
|
||||
tracing::info!("give up gc-compaction: yield for L0 compaction");
|
||||
CompactionOutcome::YieldForL0
|
||||
@@ -1001,9 +1036,9 @@ impl Timeline {
|
||||
{
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
|
||||
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
|
||||
let image_ctx = RequestContextBuilder::extend(ctx)
|
||||
let image_ctx = RequestContextBuilder::from(ctx)
|
||||
.access_stats_behavior(AccessStatsBehavior::Skip)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
let mut partitioning = dense_partitioning;
|
||||
partitioning
|
||||
@@ -1209,6 +1244,10 @@ impl Timeline {
|
||||
let mut replace_image_layers = Vec::new();
|
||||
|
||||
for layer in layers_to_rewrite {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
|
||||
tracing::info!(layer=%layer, "Rewriting layer after shard split...");
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
|
||||
@@ -2,10 +2,14 @@ use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use http_utils::error::ApiError;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::DetachBehavior;
|
||||
use pageserver_api::models::detach_ancestor::AncestorDetached;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use pageserver_compaction::helpers::overlaps_with;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
@@ -22,7 +26,10 @@ use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::Tenant;
|
||||
use crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor;
|
||||
use crate::tenant::storage_layer::layer::local_layer_path;
|
||||
use crate::tenant::storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer};
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc as _, DeltaLayerWriter, ImageLayerWriter, IoConcurrency, Layer, ResidentLayer,
|
||||
ValuesReconstructState,
|
||||
};
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -170,6 +177,92 @@ impl Attempt {
|
||||
}
|
||||
}
|
||||
|
||||
async fn generate_tombstone_image_layer(
|
||||
detached: &Arc<Timeline>,
|
||||
ancestor: &Arc<Timeline>,
|
||||
ancestor_lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Option<ResidentLayer>, Error> {
|
||||
tracing::info!(
|
||||
"removing non-inherited keys by writing an image layer with tombstones at the detach LSN"
|
||||
);
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
detached.conf,
|
||||
detached.gate.enter().map_err(|_| Error::ShuttingDown)?,
|
||||
);
|
||||
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
|
||||
// Directly use `get_vectored_impl` to skip the max_vectored_read_key limit check. Note that the keyspace should
|
||||
// not contain too many keys, otherwise this takes a lot of memory. Currently we limit it to 10k keys in the compute.
|
||||
let key_range = Key::sparse_non_inherited_keyspace();
|
||||
// avoid generating a "future layer" which will then be removed
|
||||
let image_lsn = ancestor_lsn;
|
||||
|
||||
{
|
||||
let layers = detached.layers.read().await;
|
||||
for layer in layers.all_persistent_layers() {
|
||||
if !layer.is_delta
|
||||
&& layer.lsn_range.start == image_lsn
|
||||
&& overlaps_with(&key_range, &layer.key_range)
|
||||
{
|
||||
tracing::warn!(
|
||||
layer=%layer, "image layer at the detach LSN already exists, skipping removing aux files"
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let data = ancestor
|
||||
.get_vectored_impl(
|
||||
KeySpace::single(key_range.clone()),
|
||||
image_lsn,
|
||||
&mut reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.context("failed to retrieve aux keys")
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
if !data.is_empty() {
|
||||
// TODO: is it possible that we can have an image at `image_lsn`? Unlikely because image layers are only generated
|
||||
// upon compaction but theoretically possible.
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
detached.conf,
|
||||
detached.timeline_id,
|
||||
detached.tenant_shard_id,
|
||||
&key_range,
|
||||
image_lsn,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.context("failed to create image layer writer")
|
||||
.map_err(Error::Prepare)?;
|
||||
for key in data.keys() {
|
||||
image_layer_writer
|
||||
.put_image(*key, Bytes::new(), ctx)
|
||||
.await
|
||||
.context("failed to write key")
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
}
|
||||
let (desc, path) = image_layer_writer
|
||||
.finish(ctx)
|
||||
.await
|
||||
.context("failed to finish image layer writer for removing the metadata keys")
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
let generated = Layer::finish_creating(detached.conf, detached, desc, &path)
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
detached
|
||||
.remote_client
|
||||
.upload_layer_file(&generated, &detached.cancel)
|
||||
.await
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
tracing::info!(layer=%generated, "wrote image layer");
|
||||
Ok(Some(generated))
|
||||
} else {
|
||||
tracing::info!("no aux keys found in ancestor");
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`Timeline::prepare_to_detach_from_ancestor`]
|
||||
pub(super) async fn prepare(
|
||||
detached: &Arc<Timeline>,
|
||||
@@ -352,10 +445,16 @@ pub(super) async fn prepare(
|
||||
|
||||
// TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
|
||||
let mut new_layers: Vec<Layer> =
|
||||
Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());
|
||||
Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len() + 1);
|
||||
|
||||
if let Some(tombstone_layer) =
|
||||
generate_tombstone_image_layer(detached, &ancestor, ancestor_lsn, ctx).await?
|
||||
{
|
||||
new_layers.push(tombstone_layer.into());
|
||||
}
|
||||
|
||||
{
|
||||
tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
|
||||
tracing::info!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
|
||||
|
||||
let mut tasks = tokio::task::JoinSet::new();
|
||||
|
||||
|
||||
@@ -32,9 +32,15 @@ impl Client {
|
||||
let Some(ref base_url) = conf.import_pgdata_upcall_api else {
|
||||
anyhow::bail!("import_pgdata_upcall_api is not configured")
|
||||
};
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
http_client = http_client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
|
||||
Ok(Self {
|
||||
base_url: base_url.to_string(),
|
||||
client: reqwest::Client::new(),
|
||||
client: http_client,
|
||||
cancel,
|
||||
authorization_header: conf
|
||||
.import_pgdata_upcall_api_token
|
||||
|
||||
@@ -25,8 +25,8 @@ impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
|
||||
/// * `align` must be a power of two,
|
||||
///
|
||||
/// * `capacity`, when rounded up to the nearest multiple of `align`,
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
AlignedBufferMut {
|
||||
raw: RawAlignedBuffer::with_capacity(capacity),
|
||||
|
||||
@@ -37,8 +37,8 @@ impl<const A: usize> RawAlignedBuffer<ConstAlign<A>> {
|
||||
/// * `align` must be a power of two,
|
||||
///
|
||||
/// * `capacity`, when rounded up to the nearest multiple of `align`,
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
let align = ConstAlign::<A>;
|
||||
let layout = Layout::from_size_align(capacity, align.align()).expect("Invalid layout");
|
||||
|
||||
@@ -9,4 +9,4 @@
|
||||
#define BITMAP_SET(bm, bit) (bm)[(bit) >> 3] |= (1 << ((bit) & 7))
|
||||
#define BITMAP_CLR(bm, bit) (bm)[(bit) >> 3] &= ~(1 << ((bit) & 7))
|
||||
|
||||
#endif //NEON_BITMAP_H
|
||||
#endif /* NEON_BITMAP_H */
|
||||
|
||||
@@ -13,9 +13,6 @@
|
||||
* accumulate changes. On subtransaction commit, the top of the stack
|
||||
* is merged with the table below it.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/control_plane_connector.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
|
||||
@@ -3,9 +3,6 @@
|
||||
* extension_server.c
|
||||
* Request compute_ctl to download extension files.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/extension_server.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
@@ -3,9 +3,6 @@
|
||||
* extension_server.h
|
||||
* Request compute_ctl to download extension files.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/extension_server.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
/*
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* file_cache.c
|
||||
*
|
||||
@@ -6,10 +6,6 @@
|
||||
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* pgxn/neon/file_cache.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
|
||||
@@ -6,10 +6,6 @@
|
||||
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/libpqpagestore.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
@@ -34,6 +30,7 @@
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/pg_shmem.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
#include "neon.h"
|
||||
#include "neon_perf_counters.h"
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
#include "postgres.h"
|
||||
|
||||
#include <dirent.h>
|
||||
#include <limits.h>
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
#include <sys/stat.h>
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "miscadmin.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
|
||||
@@ -1,10 +1,7 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* neon.c
|
||||
* Utility functions to expose neon specific information to user
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/neon.c
|
||||
* Main entry point into the neon exension
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
@@ -3,15 +3,13 @@
|
||||
* neon.h
|
||||
* Functions used in the initialization of this extension.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/neon.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef NEON_H
|
||||
#define NEON_H
|
||||
#include "access/xlogreader.h"
|
||||
|
||||
#include "access/xlogdefs.h"
|
||||
#include "utils/wait_event.h"
|
||||
|
||||
/* GUCs */
|
||||
@@ -58,8 +56,8 @@ extern void SetNeonCurrentClusterSize(uint64 size);
|
||||
extern uint64 GetNeonCurrentClusterSize(void);
|
||||
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
|
||||
|
||||
extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);
|
||||
extern void PGDLLEXPORT WalProposerMain(Datum main_arg);
|
||||
PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg);
|
||||
extern PGDLLEXPORT void WalProposerSync(int argc, char *argv[]);
|
||||
extern PGDLLEXPORT void WalProposerMain(Datum main_arg);
|
||||
extern PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg);
|
||||
|
||||
#endif /* NEON_H */
|
||||
|
||||
@@ -12,8 +12,8 @@
|
||||
#include "storage/procnumber.h"
|
||||
#else
|
||||
#include "storage/backendid.h"
|
||||
#include "storage/proc.h"
|
||||
#endif
|
||||
#include "storage/proc.h"
|
||||
|
||||
static const uint64 io_wait_bucket_thresholds[] = {
|
||||
2, 3, 6, 10, /* 0 us - 10 us */
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
#include "access/xlogreader.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "storage/fd.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/wait_event.h"
|
||||
|
||||
#include "libpq-fe.h"
|
||||
|
||||
@@ -8,8 +8,8 @@
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef pageserver_h
|
||||
#define pageserver_h
|
||||
#ifndef PAGESTORE_CLIENT_h
|
||||
#define PAGESTORE_CLIENT_h
|
||||
|
||||
#include "neon_pgversioncompat.h"
|
||||
|
||||
@@ -17,11 +17,8 @@
|
||||
#include "access/xlogdefs.h"
|
||||
#include RELFILEINFO_HDR
|
||||
#include "lib/stringinfo.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "storage/block.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
#define MAX_SHARDS 128
|
||||
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
|
||||
@@ -277,13 +274,8 @@ typedef struct
|
||||
XLogRecPtr effective_request_lsn;
|
||||
} neon_request_lsns;
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, char *buffer);
|
||||
#else
|
||||
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, void *buffer);
|
||||
#endif
|
||||
extern int64 neon_dbsize(Oid dbNode);
|
||||
|
||||
/* utils for neon relsize cache */
|
||||
@@ -326,4 +318,4 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
return lfc_writev(rinfo, forkNum, blkno, &buffer, 1);
|
||||
}
|
||||
|
||||
#endif
|
||||
#endif /* PAGESTORE_CLIENT_H */
|
||||
|
||||
@@ -37,10 +37,6 @@
|
||||
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/pagestore_smgr.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
@@ -55,6 +51,7 @@
|
||||
#include "catalog/pg_class.h"
|
||||
#include "common/hashfn.h"
|
||||
#include "executor/instrument.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "pgstat.h"
|
||||
#include "postmaster/autovacuum.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
@@ -1903,7 +1900,6 @@ neon_wallog_pagev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
log_pages = true;
|
||||
}
|
||||
else if (XLogInsertAllowed() &&
|
||||
!ShutdownRequestPending &&
|
||||
(forknum == FSM_FORKNUM || forknum == VISIBILITYMAP_FORKNUM))
|
||||
{
|
||||
log_pages = true;
|
||||
@@ -3157,13 +3153,8 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
|
||||
* The offsets in request_lsns, buffers, and mask are linked.
|
||||
*/
|
||||
static void
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_blockno, neon_request_lsns *request_lsns,
|
||||
char **buffers, BlockNumber nblocks, const bits8 *mask)
|
||||
#else
|
||||
neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_blockno, neon_request_lsns *request_lsns,
|
||||
void **buffers, BlockNumber nblocks, const bits8 *mask)
|
||||
#endif
|
||||
{
|
||||
NeonResponse *resp;
|
||||
uint64 ring_index;
|
||||
@@ -3359,13 +3350,8 @@ Retry:
|
||||
* To avoid breaking tests in the runtime please keep function signature in sync.
|
||||
*/
|
||||
void
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, char *buffer)
|
||||
#else
|
||||
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, void *buffer)
|
||||
#endif
|
||||
{
|
||||
neon_read_at_lsnv(rinfo, forkNum, blkno, &request_lsns, &buffer, 1, NULL);
|
||||
}
|
||||
|
||||
@@ -6,10 +6,6 @@
|
||||
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/relsize_cache.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
|
||||
#include <stdio.h>
|
||||
|
||||
#include "libpq/pqformat.h"
|
||||
#include "miscadmin.h"
|
||||
#include "utils/datetime.h"
|
||||
#include "walproposer.h"
|
||||
|
||||
@@ -50,13 +50,8 @@ PG_FUNCTION_INFO_V1(trigger_segfault);
|
||||
* Linkage to functions in neon module.
|
||||
* The signature here would need to be updated whenever function parameters change in pagestore_smgr.c
|
||||
*/
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, char *buffer);
|
||||
#else
|
||||
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, void *buffer);
|
||||
#endif
|
||||
|
||||
static neon_read_at_lsn_type neon_read_at_lsn_ptr;
|
||||
|
||||
|
||||
18
poetry.lock
generated
18
poetry.lock
generated
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "aiohappyeyeballs"
|
||||
@@ -1286,24 +1286,20 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "4.1.0"
|
||||
version = "4.2.0"
|
||||
description = "Pure-Python HTTP/2 protocol implementation"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
files = []
|
||||
develop = false
|
||||
files = [
|
||||
{file = "h2-4.2.0-py3-none-any.whl", hash = "sha256:479a53ad425bb29af087f3458a61d30780bc818e4ebcf01f0b536ba916462ed0"},
|
||||
{file = "h2-4.2.0.tar.gz", hash = "sha256:c8a52129695e88b1a0578d8d2cc6842bbd79128ac685463b887ee278126ad01f"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
hpack = ">=4.1,<5"
|
||||
hyperframe = ">=6.1,<7"
|
||||
|
||||
[package.source]
|
||||
type = "git"
|
||||
url = "https://github.com/python-hyper/h2"
|
||||
reference = "HEAD"
|
||||
resolved_reference = "0b98b244b5fd1fe96100ac14905417a3b70a4286"
|
||||
|
||||
[[package]]
|
||||
name = "hpack"
|
||||
version = "4.1.0"
|
||||
@@ -3844,4 +3840,4 @@ cffi = ["cffi (>=1.11)"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = "^3.11"
|
||||
content-hash = "fb50cb6b291169dce3188560cdb31a14af95647318f8f0f0d718131dbaf1817a"
|
||||
content-hash = "7ab1e7b975af34b3271b7c6018fa22a261d3f73c7c0a0403b6b2bb86b5fbd36e"
|
||||
|
||||
@@ -41,7 +41,7 @@ use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
|
||||
use crate::metrics::Metrics;
|
||||
|
||||
pub(crate) const EXT_NAME: &str = "pg_session_jwt";
|
||||
pub(crate) const EXT_VERSION: &str = "0.2.0";
|
||||
pub(crate) const EXT_VERSION: &str = "0.3.0";
|
||||
pub(crate) const EXT_SCHEMA: &str = "auth";
|
||||
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -43,7 +43,7 @@ websockets = "^12.0"
|
||||
clickhouse-connect = "^0.7.16"
|
||||
kafka-python = "^2.0.2"
|
||||
jwcrypto = "^1.5.6"
|
||||
h2 = {git = "https://github.com/python-hyper/h2"}
|
||||
h2 = "^4.2.0"
|
||||
types-jwcrypto = "^1.5.0.20240925"
|
||||
pyyaml = "^6.0.2"
|
||||
types-pyyaml = "^6.0.12.20240917"
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[toolchain]
|
||||
channel = "1.85.0"
|
||||
channel = "1.86.0"
|
||||
profile = "default"
|
||||
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
|
||||
# https://rust-lang.github.io/rustup/concepts/profiles.html
|
||||
|
||||
@@ -115,13 +115,17 @@ impl Client {
|
||||
"{}/v1/tenant/{}/timeline/{}",
|
||||
self.mgmt_api_endpoint, tenant_id, timeline_id
|
||||
);
|
||||
let resp = self.request(Method::DELETE, &uri, ()).await?;
|
||||
let resp = self
|
||||
.request_maybe_body(Method::DELETE, &uri, None::<()>)
|
||||
.await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TimelineDeleteResult> {
|
||||
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TenantDeleteResult> {
|
||||
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
|
||||
let resp = self.request(Method::DELETE, &uri, ()).await?;
|
||||
let resp = self
|
||||
.request_maybe_body(Method::DELETE, &uri, None::<()>)
|
||||
.await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
@@ -197,6 +201,16 @@ impl Client {
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: B,
|
||||
) -> Result<reqwest::Response> {
|
||||
self.request_maybe_body(method, uri, Some(body)).await
|
||||
}
|
||||
|
||||
/// Send the request and check that the status code is good, with an optional body.
|
||||
async fn request_maybe_body<B: serde::Serialize, U: reqwest::IntoUrl>(
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: Option<B>,
|
||||
) -> Result<reqwest::Response> {
|
||||
let res = self.request_noerror(method, uri, body).await?;
|
||||
let response = res.error_from_body().await?;
|
||||
@@ -208,12 +222,15 @@ impl Client {
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: B,
|
||||
body: Option<B>,
|
||||
) -> Result<reqwest::Response> {
|
||||
let mut req = self.client.request(method, uri);
|
||||
if let Some(value) = &self.authorization_header {
|
||||
req = req.header(reqwest::header::AUTHORIZATION, value.get_contents())
|
||||
}
|
||||
req.json(&body).send().await.map_err(Error::ReceiveBody)
|
||||
if let Some(body) = body {
|
||||
req = req.json(&body);
|
||||
}
|
||||
req.send().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -219,7 +219,13 @@ struct Args {
|
||||
pub ssl_cert_reload_period: Duration,
|
||||
/// Trusted root CA certificates to use in https APIs.
|
||||
#[arg(long)]
|
||||
ssl_ca_file: Option<Utf8PathBuf>,
|
||||
pub ssl_ca_file: Option<Utf8PathBuf>,
|
||||
/// Flag to use https for requests to peer's safekeeper API.
|
||||
#[arg(long)]
|
||||
pub use_https_safekeeper_api: bool,
|
||||
/// Path to the JWT auth token used to authenticate with other safekeepers.
|
||||
#[arg(long)]
|
||||
auth_token_path: Option<Utf8PathBuf>,
|
||||
}
|
||||
|
||||
// Like PathBufValueParser, but allows empty string.
|
||||
@@ -338,14 +344,24 @@ async fn main() -> anyhow::Result<()> {
|
||||
};
|
||||
|
||||
// Load JWT auth token to connect to other safekeepers for pull_timeline.
|
||||
// First check if the env var is present, then check the arg with the path.
|
||||
// We want to deprecate and remove the env var method in the future.
|
||||
let sk_auth_token = match var("SAFEKEEPER_AUTH_TOKEN") {
|
||||
Ok(v) => {
|
||||
info!("loaded JWT token for authentication with safekeepers");
|
||||
Some(SecretString::from(v))
|
||||
}
|
||||
Err(VarError::NotPresent) => {
|
||||
info!("no JWT token for authentication with safekeepers detected");
|
||||
None
|
||||
if let Some(auth_token_path) = args.auth_token_path.as_ref() {
|
||||
info!(
|
||||
"loading JWT token for authentication with safekeepers from {auth_token_path}"
|
||||
);
|
||||
let auth_token = tokio::fs::read_to_string(auth_token_path).await?;
|
||||
Some(SecretString::from(auth_token.trim().to_owned()))
|
||||
} else {
|
||||
info!("no JWT token for authentication with safekeepers detected");
|
||||
None
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("JWT token for authentication with safekeepers is not unicode");
|
||||
@@ -399,6 +415,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
ssl_cert_file: args.ssl_cert_file,
|
||||
ssl_cert_reload_period: args.ssl_cert_reload_period,
|
||||
ssl_ca_certs,
|
||||
use_https_safekeeper_api: args.use_https_safekeeper_api,
|
||||
});
|
||||
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
|
||||
@@ -16,9 +16,9 @@ use http_utils::{RequestExt, RouterBuilder};
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::models::{
|
||||
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TermSwitchApiEntry,
|
||||
TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus,
|
||||
TimelineTermBumpRequest,
|
||||
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
|
||||
TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
|
||||
TimelineStatus, TimelineTermBumpRequest,
|
||||
};
|
||||
use safekeeper_api::{ServerInfo, membership, models};
|
||||
use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
|
||||
@@ -83,13 +83,11 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
.delete_all_for_tenant(&tenant_id, action)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
delete_info
|
||||
.iter()
|
||||
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
|
||||
.collect::<HashMap<String, TimelineDeleteResult>>(),
|
||||
)
|
||||
let response_body: TenantDeleteResult = delete_info
|
||||
.iter()
|
||||
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
|
||||
.collect::<HashMap<String, TimelineDeleteResult>>();
|
||||
json_response(StatusCode::OK, response_body)
|
||||
}
|
||||
|
||||
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
@@ -538,6 +536,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
|
||||
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
https_connstr: sk_info.https_connstr,
|
||||
backup_lsn: sk_info.backup_lsn.0,
|
||||
local_start_lsn: sk_info.local_start_lsn.0,
|
||||
availability_zone: None,
|
||||
|
||||
@@ -121,6 +121,7 @@ pub struct SafeKeeperConf {
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
pub ssl_cert_reload_period: Duration,
|
||||
pub ssl_ca_certs: Vec<Certificate>,
|
||||
pub use_https_safekeeper_api: bool,
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
@@ -170,6 +171,7 @@ impl SafeKeeperConf {
|
||||
ssl_cert_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_CERT_FILE),
|
||||
ssl_cert_reload_period: Duration::from_secs(60),
|
||||
ssl_ca_certs: Vec::new(),
|
||||
use_https_safekeeper_api: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,10 +94,10 @@ impl WalReceivers {
|
||||
|
||||
/// Get reference to locked slot contents. Slot must exist (registered
|
||||
/// earlier).
|
||||
fn get_slot<'a>(
|
||||
self: &'a Arc<WalReceivers>,
|
||||
fn get_slot(
|
||||
self: &Arc<WalReceivers>,
|
||||
id: WalReceiverId,
|
||||
) -> MappedMutexGuard<'a, WalReceiverState> {
|
||||
) -> MappedMutexGuard<'_, WalReceiverState> {
|
||||
MutexGuard::map(self.mutex.lock(), |locked| {
|
||||
locked.slots[id]
|
||||
.as_mut()
|
||||
|
||||
@@ -176,6 +176,7 @@ pub struct Donor {
|
||||
pub flush_lsn: Lsn,
|
||||
pub pg_connstr: String,
|
||||
pub http_connstr: String,
|
||||
pub https_connstr: Option<String>,
|
||||
}
|
||||
|
||||
impl From<&PeerInfo> for Donor {
|
||||
@@ -186,6 +187,7 @@ impl From<&PeerInfo> for Donor {
|
||||
flush_lsn: p.flush_lsn,
|
||||
pg_connstr: p.pg_connstr.clone(),
|
||||
http_connstr: p.http_connstr.clone(),
|
||||
https_connstr: p.https_connstr.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -236,11 +238,33 @@ async fn recover(
|
||||
conf: &SafeKeeperConf,
|
||||
) -> anyhow::Result<String> {
|
||||
// Learn donor term switch history to figure out starting point.
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let mut client = reqwest::Client::builder();
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
client = client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let client = client
|
||||
.build()
|
||||
.context("Failed to build http client for recover")?;
|
||||
|
||||
let url = if conf.use_https_safekeeper_api {
|
||||
if let Some(https_connstr) = donor.https_connstr.as_ref() {
|
||||
format!("https://{https_connstr}")
|
||||
} else {
|
||||
anyhow::bail!(
|
||||
"cannot recover from donor {}: \
|
||||
https is enabled, but https_connstr is not specified",
|
||||
donor.sk_id
|
||||
);
|
||||
}
|
||||
} else {
|
||||
format!("http://{}", donor.http_connstr)
|
||||
};
|
||||
|
||||
let timeline_info: TimelineStatus = client
|
||||
.get(format!(
|
||||
"http://{}/v1/tenant/{}/timeline/{}",
|
||||
donor.http_connstr, tli.ttid.tenant_id, tli.ttid.timeline_id
|
||||
"{}/v1/tenant/{}/timeline/{}",
|
||||
url, tli.ttid.tenant_id, tli.ttid.timeline_id
|
||||
))
|
||||
.send()
|
||||
.await?
|
||||
|
||||
@@ -50,6 +50,7 @@ fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> Peer
|
||||
local_start_lsn: Lsn(sk_info.local_start_lsn),
|
||||
pg_connstr: sk_info.safekeeper_connstr.clone(),
|
||||
http_connstr: sk_info.http_connstr.clone(),
|
||||
https_connstr: sk_info.https_connstr.clone(),
|
||||
ts,
|
||||
}
|
||||
}
|
||||
@@ -363,6 +364,7 @@ impl SharedState {
|
||||
.to_owned()
|
||||
.unwrap_or(conf.listen_pg_addr.clone()),
|
||||
http_connstr: conf.listen_http_addr.to_owned(),
|
||||
https_connstr: conf.listen_https_addr.to_owned(),
|
||||
backup_lsn: self.sk.state().inmem.backup_lsn.0,
|
||||
local_start_lsn: self.sk.state().local_start_lsn.0,
|
||||
availability_zone: conf.availability_zone.clone(),
|
||||
@@ -699,7 +701,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Take a writing mutual exclusive lock on timeline shared_state.
|
||||
pub async fn write_shared_state<'a>(self: &'a Arc<Self>) -> WriteGuardSharedState<'a> {
|
||||
pub async fn write_shared_state(self: &Arc<Self>) -> WriteGuardSharedState<'_> {
|
||||
WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
|
||||
}
|
||||
|
||||
|
||||
@@ -116,7 +116,7 @@ fn test_many_tx() -> anyhow::Result<()> {
|
||||
}
|
||||
None
|
||||
})
|
||||
.last()
|
||||
.next_back()
|
||||
.unwrap();
|
||||
|
||||
let initdb_lsn = 21623024;
|
||||
|
||||
@@ -184,6 +184,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
ssl_cert_file: Utf8PathBuf::from(""),
|
||||
ssl_cert_reload_period: Duration::ZERO,
|
||||
ssl_ca_certs: Vec::new(),
|
||||
use_https_safekeeper_api: false,
|
||||
};
|
||||
|
||||
let mut global = GlobalMap::new(disk, conf.clone())?;
|
||||
|
||||
@@ -141,6 +141,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
|
||||
http_connstr: "zenith-1-sk-1.local:7677".to_owned(),
|
||||
https_connstr: Some("zenith-1-sk-1.local:7678".to_owned()),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
standby_horizon: 0,
|
||||
|
||||
@@ -45,8 +45,10 @@ message SafekeeperTimelineInfo {
|
||||
uint64 standby_horizon = 14;
|
||||
// A connection string to use for WAL receiving.
|
||||
string safekeeper_connstr = 10;
|
||||
// HTTP endpoint connection string
|
||||
// HTTP endpoint connection string.
|
||||
string http_connstr = 13;
|
||||
// HTTPS endpoint connection string.
|
||||
optional string https_connstr = 15;
|
||||
// Availability zone of a safekeeper.
|
||||
optional string availability_zone = 11;
|
||||
}
|
||||
|
||||
@@ -764,6 +764,7 @@ mod tests {
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
|
||||
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
|
||||
https_connstr: Some("neon-1-sk-1.local:7678".to_owned()),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
standby_horizon: 0,
|
||||
|
||||
@@ -10,13 +10,11 @@ pub struct Client {
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(base_url: Url, jwt_token: Option<String>) -> Self {
|
||||
pub fn new(http_client: reqwest::Client, base_url: Url, jwt_token: Option<String>) -> Self {
|
||||
Self {
|
||||
base_url,
|
||||
jwt_token,
|
||||
client: reqwest::ClientBuilder::new()
|
||||
.build()
|
||||
.expect("Failed to construct http client"),
|
||||
client: http_client,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::error::Error as _;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use futures::StreamExt;
|
||||
@@ -364,25 +365,28 @@ pub(crate) struct ShardUpdate<'a> {
|
||||
}
|
||||
|
||||
impl ComputeHook {
|
||||
pub(super) fn new(config: Config) -> Self {
|
||||
pub(super) fn new(config: Config) -> anyhow::Result<Self> {
|
||||
let authorization_header = config
|
||||
.control_plane_jwt_token
|
||||
.clone()
|
||||
.map(|jwt| format!("Bearer {}", jwt));
|
||||
|
||||
let client = reqwest::ClientBuilder::new()
|
||||
.timeout(NOTIFY_REQUEST_TIMEOUT)
|
||||
let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT);
|
||||
for cert in &config.ssl_ca_certs {
|
||||
client = client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let client = client
|
||||
.build()
|
||||
.expect("Failed to construct HTTP client");
|
||||
.context("Failed to build http client for compute hook")?;
|
||||
|
||||
Self {
|
||||
Ok(Self {
|
||||
state: Default::default(),
|
||||
config,
|
||||
authorization_header,
|
||||
neon_local_lock: Default::default(),
|
||||
api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
|
||||
client,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// For test environments: use neon_local's LocalEnv to update compute
|
||||
|
||||
@@ -12,6 +12,7 @@ use safekeeper_api::models::SafekeeperUtilization;
|
||||
use safekeeper_client::mgmt_api;
|
||||
use thiserror::Error;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
use utils::id::NodeId;
|
||||
use utils::logging::SecretString;
|
||||
|
||||
@@ -227,6 +228,7 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
|
||||
|
||||
Some((*node_id, status))
|
||||
}
|
||||
.instrument(tracing::info_span!("heartbeat_ps", %node_id))
|
||||
});
|
||||
}
|
||||
|
||||
@@ -253,7 +255,7 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
|
||||
PageserverState::WarmingUp { .. } => {
|
||||
warming_up += 1;
|
||||
}
|
||||
PageserverState::Offline { .. } => offline += 1,
|
||||
PageserverState::Offline => offline += 1,
|
||||
PageserverState::Available { .. } => {}
|
||||
}
|
||||
}
|
||||
@@ -369,6 +371,7 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
|
||||
|
||||
Some((*node_id, status))
|
||||
}
|
||||
.instrument(tracing::info_span!("heartbeat_sk", %node_id))
|
||||
});
|
||||
}
|
||||
|
||||
@@ -391,7 +394,7 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
|
||||
let mut offline = 0;
|
||||
for state in new_state.values() {
|
||||
match state {
|
||||
SafekeeperState::Offline { .. } => offline += 1,
|
||||
SafekeeperState::Offline => offline += 1,
|
||||
SafekeeperState::Available { .. } => {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -899,7 +899,7 @@ async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiErr
|
||||
let state = get_state(&req);
|
||||
let node_id: NodeId = parse_request_param(&req, "node_id")?;
|
||||
|
||||
let node_status = state.service.get_node(node_id).await?;
|
||||
let node_status = state.service.get_node(node_id).await?.describe();
|
||||
|
||||
json_response(StatusCode::OK, node_status)
|
||||
}
|
||||
@@ -1733,9 +1733,9 @@ async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
|
||||
};
|
||||
|
||||
if *self_addr == leader_addr {
|
||||
return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Leader is stepped down instance"
|
||||
))));
|
||||
return ForwardOutcome::Forwarded(Err(ApiError::ResourceUnavailable(
|
||||
"Leader is stepped down instance".into(),
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1744,19 +1744,17 @@ async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
|
||||
// Use [`RECONCILE_TIMEOUT`] as the max amount of time a request should block for and
|
||||
// include some leeway to get the timeout for proxied requests.
|
||||
const PROXIED_REQUEST_TIMEOUT: Duration = Duration::from_secs(RECONCILE_TIMEOUT.as_secs() + 10);
|
||||
let client = reqwest::ClientBuilder::new()
|
||||
.timeout(PROXIED_REQUEST_TIMEOUT)
|
||||
.build();
|
||||
let client = match client {
|
||||
Ok(client) => client,
|
||||
Err(err) => {
|
||||
return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Failed to build leader client for forwarding while in stepped down state: {err}"
|
||||
))));
|
||||
}
|
||||
};
|
||||
|
||||
let request: reqwest::Request = match convert_request(req, &client, leader.address).await {
|
||||
let client = state.service.get_http_client().clone();
|
||||
|
||||
let request: reqwest::Request = match convert_request(
|
||||
req,
|
||||
&client,
|
||||
leader.address,
|
||||
PROXIED_REQUEST_TIMEOUT,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(r) => r,
|
||||
Err(err) => {
|
||||
return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
@@ -1814,6 +1812,7 @@ async fn convert_request(
|
||||
req: hyper::Request<Body>,
|
||||
client: &reqwest::Client,
|
||||
to_address: String,
|
||||
timeout: Duration,
|
||||
) -> Result<reqwest::Request, ApiError> {
|
||||
use std::str::FromStr;
|
||||
|
||||
@@ -1868,6 +1867,7 @@ async fn convert_request(
|
||||
.request(method, uri)
|
||||
.headers(headers)
|
||||
.body(body)
|
||||
.timeout(timeout)
|
||||
.build()
|
||||
.map_err(|err| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
|
||||
|
||||
@@ -110,7 +110,20 @@ impl Leadership {
|
||||
) -> Option<GlobalObservedState> {
|
||||
tracing::info!("Sending step down request to {leader:?}");
|
||||
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
for cert in &self.config.ssl_ca_certs {
|
||||
http_client = http_client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let http_client = match http_client.build() {
|
||||
Ok(http_client) => http_client,
|
||||
Err(err) => {
|
||||
tracing::error!("Failed to build client for leader step-down request: {err}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let client = PeerClient::new(
|
||||
http_client,
|
||||
Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
|
||||
self.config.peer_jwt_token.clone(),
|
||||
);
|
||||
|
||||
@@ -283,10 +283,8 @@ impl Secrets {
|
||||
fn load_secret(cli: &Option<String>, env_name: &str) -> Option<String> {
|
||||
if let Some(v) = cli {
|
||||
Some(v.clone())
|
||||
} else if let Ok(v) = std::env::var(env_name) {
|
||||
Some(v)
|
||||
} else {
|
||||
None
|
||||
std::env::var(env_name).ok()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,11 +59,11 @@ impl ResponseErrorMessageExt for reqwest::Response {
|
||||
pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
|
||||
|
||||
impl PeerClient {
|
||||
pub(crate) fn new(uri: Uri, jwt: Option<String>) -> Self {
|
||||
pub(crate) fn new(http_client: reqwest::Client, uri: Uri, jwt: Option<String>) -> Self {
|
||||
Self {
|
||||
uri,
|
||||
jwt,
|
||||
client: reqwest::Client::new(),
|
||||
client: http_client,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1524,25 +1524,14 @@ impl Persistence {
|
||||
/// Load pending operations from db.
|
||||
pub(crate) async fn list_pending_ops(
|
||||
&self,
|
||||
filter_for_sk: Option<NodeId>,
|
||||
) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
|
||||
use crate::schema::safekeeper_timeline_pending_ops::dsl;
|
||||
|
||||
const FILTER_VAL_1: i64 = 1;
|
||||
const FILTER_VAL_2: i64 = 2;
|
||||
let filter_opt = filter_for_sk.map(|id| id.0 as i64);
|
||||
let timeline_from_db = self
|
||||
.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
|
||||
Box::pin(async move {
|
||||
let from_db: Vec<TimelinePendingOpPersistence> =
|
||||
dsl::safekeeper_timeline_pending_ops
|
||||
.filter(
|
||||
dsl::sk_id
|
||||
.eq(filter_opt.unwrap_or(FILTER_VAL_1))
|
||||
.and(dsl::sk_id.eq(filter_opt.unwrap_or(FILTER_VAL_2))),
|
||||
)
|
||||
.load(conn)
|
||||
.await?;
|
||||
dsl::safekeeper_timeline_pending_ops.load(conn).await?;
|
||||
Ok(from_db)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -686,6 +686,8 @@ impl Reconciler {
|
||||
.await?,
|
||||
);
|
||||
|
||||
pausable_failpoint!("reconciler-live-migrate-post-generation-inc");
|
||||
|
||||
let dest_conf = build_location_config(
|
||||
&self.shard,
|
||||
&self.config,
|
||||
@@ -760,7 +762,9 @@ impl Reconciler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn maybe_refresh_observed(&mut self) -> Result<(), ReconcileError> {
|
||||
/// Returns true if the observed state of the attached location was refreshed
|
||||
/// and false otherwise.
|
||||
async fn maybe_refresh_observed(&mut self) -> Result<bool, ReconcileError> {
|
||||
// If the attached node has uncertain state, read it from the pageserver before proceeding: this
|
||||
// is important to avoid spurious generation increments.
|
||||
//
|
||||
@@ -770,7 +774,7 @@ impl Reconciler {
|
||||
|
||||
let Some(attached_node) = self.intent.attached.as_ref() else {
|
||||
// Nothing to do
|
||||
return Ok(());
|
||||
return Ok(false);
|
||||
};
|
||||
|
||||
if matches!(
|
||||
@@ -815,7 +819,7 @@ impl Reconciler {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Reconciling a tenant makes API calls to pageservers until the observed state
|
||||
@@ -831,7 +835,7 @@ impl Reconciler {
|
||||
/// state where it still requires later reconciliation.
|
||||
pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
|
||||
// Prepare: if we have uncertain `observed` state for our would-be attachement location, then refresh it
|
||||
self.maybe_refresh_observed().await?;
|
||||
let refreshed = self.maybe_refresh_observed().await?;
|
||||
|
||||
// Special case: live migration
|
||||
self.maybe_live_migrate().await?;
|
||||
@@ -855,8 +859,14 @@ impl Reconciler {
|
||||
);
|
||||
match self.observed.locations.get(&node.get_id()) {
|
||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
|
||||
// Nothing to do
|
||||
tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
|
||||
if refreshed {
|
||||
tracing::info!(
|
||||
node_id=%node.get_id(), "Observed configuration correct after refresh. Notifying compute.");
|
||||
self.compute_notify().await?;
|
||||
} else {
|
||||
// Nothing to do
|
||||
tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.");
|
||||
}
|
||||
}
|
||||
observed => {
|
||||
// In all cases other than a matching observed configuration, we will
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user