mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-02 10:10:37 +00:00
Compare commits
38 Commits
conrad/dyn
...
cloneable/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
357795e0c5 | ||
|
|
f105ddb778 | ||
|
|
8e1b5a9727 | ||
|
|
1ef4258f29 | ||
|
|
65e2aae6e4 | ||
|
|
edc874e1b3 | ||
|
|
181af302b5 | ||
|
|
497116b76d | ||
|
|
a917952b30 | ||
|
|
e581b670f4 | ||
|
|
8ed79ed773 | ||
|
|
381f42519e | ||
|
|
375df517a0 | ||
|
|
9db63fea7a | ||
|
|
bfc767d60d | ||
|
|
109c54a300 | ||
|
|
74920d8cd8 | ||
|
|
131b32ef48 | ||
|
|
581bb5d7d5 | ||
|
|
3c78133477 | ||
|
|
46e046e779 | ||
|
|
d8cee52637 | ||
|
|
2e11d129d0 | ||
|
|
43a7423f72 | ||
|
|
374736a4de | ||
|
|
5e507776bc | ||
|
|
4e8e0951be | ||
|
|
64a8d0c2e6 | ||
|
|
7602e6ffc0 | ||
|
|
17193d6a33 | ||
|
|
03ae57236f | ||
|
|
e3d27b2f68 | ||
|
|
dd1299f337 | ||
|
|
cb19e4e05d | ||
|
|
9df230c837 | ||
|
|
3c2bc5baba | ||
|
|
6667810800 | ||
|
|
47f5bcf2bc |
12
.github/scripts/generate_image_maps.py
vendored
12
.github/scripts/generate_image_maps.py
vendored
@@ -39,12 +39,18 @@ registries = {
|
||||
],
|
||||
}
|
||||
|
||||
release_branches = ["release", "release-proxy", "release-compute"]
|
||||
|
||||
outputs: dict[str, dict[str, list[str]]] = {}
|
||||
|
||||
target_tags = [target_tag, "latest"] if branch == "main" else [target_tag]
|
||||
target_stages = (
|
||||
["dev", "prod"] if branch in ["release", "release-proxy", "release-compute"] else ["dev"]
|
||||
target_tags = (
|
||||
[target_tag, "latest"]
|
||||
if branch == "main"
|
||||
else [target_tag, "released"]
|
||||
if branch in release_branches
|
||||
else [target_tag]
|
||||
)
|
||||
target_stages = ["dev", "prod"] if branch in release_branches else ["dev"]
|
||||
|
||||
for component_name, component_images in components.items():
|
||||
for stage in target_stages:
|
||||
|
||||
31
.github/scripts/push_with_image_map.py
vendored
31
.github/scripts/push_with_image_map.py
vendored
@@ -11,12 +11,27 @@ try:
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValueError("Failed to parse IMAGE_MAP as JSON") from e
|
||||
|
||||
for source, targets in parsed_image_map.items():
|
||||
for target in targets:
|
||||
cmd = ["docker", "buildx", "imagetools", "create", "-t", target, source]
|
||||
print(f"Running: {' '.join(cmd)}")
|
||||
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
failures = []
|
||||
|
||||
if result.returncode != 0:
|
||||
print(f"Error: {result.stdout}")
|
||||
raise RuntimeError(f"Command failed: {' '.join(cmd)}")
|
||||
pending = [(source, target) for source, targets in parsed_image_map.items() for target in targets]
|
||||
|
||||
while len(pending) > 0:
|
||||
if len(failures) > 10:
|
||||
print("Error: more than 10 failures!")
|
||||
for failure in failures:
|
||||
print(f'"{failure[0]}" failed with the following output:')
|
||||
print(failure[1])
|
||||
raise RuntimeError("Retry limit reached.")
|
||||
|
||||
source, target = pending.pop(0)
|
||||
cmd = ["docker", "buildx", "imagetools", "create", "-t", target, source]
|
||||
print(f"Running: {' '.join(cmd)}")
|
||||
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
|
||||
if result.returncode != 0:
|
||||
failures.append((" ".join(cmd), result.stdout))
|
||||
pending.append((source, target))
|
||||
|
||||
if len(failures) > 0 and (github_output := os.getenv("GITHUB_OUTPUT")):
|
||||
with open(github_output, "a") as f:
|
||||
f.write("slack_notify=true\n")
|
||||
|
||||
@@ -104,6 +104,18 @@ jobs:
|
||||
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
|
||||
- name: Copy docker images to target registries
|
||||
id: push
|
||||
run: python3 .github/scripts/push_with_image_map.py
|
||||
env:
|
||||
IMAGE_MAP: ${{ inputs.image-map }}
|
||||
|
||||
- name: Notify Slack if container image pushing fails
|
||||
if: steps.push.outputs.slack_notify == 'true' || failure()
|
||||
uses: slackapi/slack-github-action@485a9d42d3a73031f12ec201c457e2162c45d02d # v2.0.0
|
||||
with:
|
||||
method: chat.postMessage
|
||||
token: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
payload: |
|
||||
channel: ${{ vars.SLACK_ON_CALL_DEVPROD_STREAM }}
|
||||
text: |
|
||||
Pushing container images failed in <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
|
||||
|
||||
27
.github/workflows/build_and_test.yml
vendored
27
.github/workflows/build_and_test.yml
vendored
@@ -89,8 +89,8 @@ jobs:
|
||||
|
||||
check-codestyle-python:
|
||||
needs: [ meta, check-permissions, build-build-tools-image ]
|
||||
# No need to run on `main` because we this in the merge queue
|
||||
if: ${{ needs.meta.outputs.run-kind == 'pr' }}
|
||||
# No need to run on `main` because we do this in the merge queue. We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
if: ${{ contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
|
||||
uses: ./.github/workflows/_check-codestyle-python.yml
|
||||
with:
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
@@ -98,7 +98,8 @@ jobs:
|
||||
|
||||
check-codestyle-jsonnet:
|
||||
needs: [ meta, check-permissions, build-build-tools-image ]
|
||||
if: ${{ contains(fromJSON('["pr", "push-main"]'), needs.meta.outputs.run-kind) }}
|
||||
# We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
|
||||
runs-on: [ self-hosted, small ]
|
||||
container:
|
||||
image: ${{ needs.build-build-tools-image.outputs.image }}
|
||||
@@ -181,8 +182,8 @@ jobs:
|
||||
|
||||
check-codestyle-rust:
|
||||
needs: [ meta, check-permissions, build-build-tools-image ]
|
||||
# No need to run on `main` because we this in the merge queue
|
||||
if: ${{ needs.meta.outputs.run-kind == 'pr' }}
|
||||
# No need to run on `main` because we do this in the merge queue. We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
if: ${{ contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
|
||||
uses: ./.github/workflows/_check-codestyle-rust.yml
|
||||
with:
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
@@ -191,7 +192,8 @@ jobs:
|
||||
|
||||
check-dependencies-rust:
|
||||
needs: [ meta, files-changed, build-build-tools-image ]
|
||||
if: ${{ needs.files-changed.outputs.check-rust-dependencies == 'true' && needs.meta.outputs.run-kind == 'pr' }}
|
||||
# No need to run on `main` because we do this in the merge queue. We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
if: ${{ needs.files-changed.outputs.check-rust-dependencies == 'true' && contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
|
||||
uses: ./.github/workflows/cargo-deny.yml
|
||||
with:
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
@@ -199,7 +201,8 @@ jobs:
|
||||
|
||||
build-and-test-locally:
|
||||
needs: [ meta, build-build-tools-image ]
|
||||
if: ${{ contains(fromJSON('["pr", "push-main"]'), needs.meta.outputs.run-kind) }}
|
||||
# We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
@@ -977,7 +980,7 @@ jobs:
|
||||
TEST_EXTENSIONS_TAG: >-
|
||||
${{
|
||||
contains(fromJSON('["storage-rc-pr", "proxy-rc-pr"]'), needs.meta.outputs.run-kind)
|
||||
&& 'latest'
|
||||
&& needs.meta.outputs.previous-compute-release
|
||||
|| needs.meta.outputs.build-tag
|
||||
}}
|
||||
TEST_VERSION_ONLY: ${{ matrix.pg_version }}
|
||||
@@ -1565,10 +1568,10 @@ jobs:
|
||||
if: |
|
||||
contains(needs.*.result, 'failure')
|
||||
|| contains(needs.*.result, 'cancelled')
|
||||
|| (needs.check-dependencies-rust.result == 'skipped' && needs.files-changed.outputs.check-rust-dependencies == 'true' && needs.meta.outputs.run-kind == 'pr')
|
||||
|| (needs.build-and-test-locally.result == 'skipped' && needs.meta.outputs.run-kind == 'pr')
|
||||
|| (needs.check-codestyle-python.result == 'skipped' && needs.meta.outputs.run-kind == 'pr')
|
||||
|| (needs.check-codestyle-rust.result == 'skipped' && needs.meta.outputs.run-kind == 'pr')
|
||||
|| (needs.check-dependencies-rust.result == 'skipped' && needs.files-changed.outputs.check-rust-dependencies == 'true' && contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|| (needs.build-and-test-locally.result == 'skipped' && contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|| (needs.check-codestyle-python.result == 'skipped' && contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|| (needs.check-codestyle-rust.result == 'skipped' && contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|| needs.files-changed.result == 'skipped'
|
||||
|| (needs.push-compute-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|| (needs.push-neon-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|
||||
1092
Cargo.lock
generated
1092
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
11
Cargo.toml
11
Cargo.toml
@@ -40,6 +40,8 @@ members = [
|
||||
"libs/proxy/postgres-protocol2",
|
||||
"libs/proxy/postgres-types2",
|
||||
"libs/proxy/tokio-postgres2",
|
||||
"lambda/aztraffic",
|
||||
"lambda/pod_info_dumper",
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
@@ -342,3 +344,12 @@ inherits = "release"
|
||||
debug = false # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "z"
|
||||
lto = true
|
||||
|
||||
[profile.release-lambda-function]
|
||||
inherits = "release"
|
||||
lto = true
|
||||
opt-level = "z"
|
||||
codegen-units = 1
|
||||
panic = "abort"
|
||||
debug = false
|
||||
strip = true
|
||||
|
||||
@@ -292,7 +292,7 @@ WORKDIR /home/nonroot
|
||||
|
||||
# Rust
|
||||
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
|
||||
ENV RUSTC_VERSION=1.85.0
|
||||
ENV RUSTC_VERSION=1.86.0
|
||||
ENV RUSTUP_HOME="/home/nonroot/.rustup"
|
||||
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
|
||||
ARG RUSTFILT_VERSION=0.2.1
|
||||
|
||||
@@ -369,7 +369,7 @@ FROM build-deps AS plv8-src
|
||||
ARG PG_VERSION
|
||||
WORKDIR /ext-src
|
||||
|
||||
COPY compute/patches/plv8-3.1.10.patch .
|
||||
COPY compute/patches/plv8* .
|
||||
|
||||
# plv8 3.2.3 supports v17
|
||||
# last release v3.2.3 - Sep 7, 2024
|
||||
@@ -393,7 +393,7 @@ RUN case "${PG_VERSION:?}" in \
|
||||
git clone --recurse-submodules --depth 1 --branch ${PLV8_TAG} https://github.com/plv8/plv8.git plv8-src && \
|
||||
tar -czf plv8.tar.gz --exclude .git plv8-src && \
|
||||
cd plv8-src && \
|
||||
if [[ "${PG_VERSION:?}" < "v17" ]]; then patch -p1 < /ext-src/plv8-3.1.10.patch; fi
|
||||
if [[ "${PG_VERSION:?}" < "v17" ]]; then patch -p1 < /ext-src/plv8_v3.1.10.patch; else patch -p1 < /ext-src/plv8_v3.2.3.patch; fi
|
||||
|
||||
# Step 1: Build the vendored V8 engine. It doesn't depend on PostgreSQL, so use
|
||||
# 'build-deps' as the base. This enables caching and avoids unnecessary rebuilds.
|
||||
@@ -1055,34 +1055,6 @@ RUN if [ -d pg_embedding-src ]; then \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install; \
|
||||
fi
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg_anon-build"
|
||||
# compile anon extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg_anon-src
|
||||
ARG PG_VERSION
|
||||
|
||||
# This is an experimental extension, never got to real production.
|
||||
# !Do not remove! It can be present in shared_preload_libraries and compute will fail to start if library is not found.
|
||||
WORKDIR /ext-src
|
||||
RUN case "${PG_VERSION:?}" in "v17") \
|
||||
echo "postgresql_anonymizer does not yet support PG17" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/neondatabase/postgresql_anonymizer/archive/refs/tags/neon_1.1.1.tar.gz -O pg_anon.tar.gz && \
|
||||
echo "321ea8d5c1648880aafde850a2c576e4a9e7b9933a34ce272efc839328999fa9 pg_anon.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C .
|
||||
|
||||
FROM pg-build AS pg_anon-build
|
||||
COPY --from=pg_anon-src /ext-src/ /ext-src/
|
||||
WORKDIR /ext-src
|
||||
RUN if [ -d pg_anon-src ]; then \
|
||||
cd pg_anon-src && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/anon.control; \
|
||||
fi
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg build with nonroot user and cargo installed"
|
||||
@@ -1366,8 +1338,8 @@ ARG PG_VERSION
|
||||
# Do not update without approve from proxy team
|
||||
# Make sure the version is reflected in proxy/src/serverless/local_conn_pool.rs
|
||||
WORKDIR /ext-src
|
||||
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.2.0.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "5ace028e591f2e000ca10afa5b1ca62203ebff014c2907c0ec3b29c36f28a1bb pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.3.0.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "19be2dc0b3834d643706ed430af998bb4c2cdf24b3c45e7b102bb3a550e8660c pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.9", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
sed -i 's/version = "0.12.6"/version = "0.12.9"/g' pgrx-tests/Cargo.toml && \
|
||||
@@ -1677,7 +1649,6 @@ COPY --from=pg_roaringbitmap-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_semver-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_embedding-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=wal2json-build /usr/local/pgsql /usr/local/pgsql
|
||||
COPY --from=pg_anon-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
@@ -33,6 +33,7 @@
|
||||
import 'sql_exporter/lfc_hits.libsonnet',
|
||||
import 'sql_exporter/lfc_misses.libsonnet',
|
||||
import 'sql_exporter/lfc_used.libsonnet',
|
||||
import 'sql_exporter/lfc_used_pages.libsonnet',
|
||||
import 'sql_exporter/lfc_writes.libsonnet',
|
||||
import 'sql_exporter/logical_slot_restart_lsn.libsonnet',
|
||||
import 'sql_exporter/max_cluster_size.libsonnet',
|
||||
|
||||
10
compute/etc/sql_exporter/lfc_used_pages.libsonnet
Normal file
10
compute/etc/sql_exporter/lfc_used_pages.libsonnet
Normal file
@@ -0,0 +1,10 @@
|
||||
{
|
||||
metric_name: 'lfc_used_pages',
|
||||
type: 'gauge',
|
||||
help: 'LFC pages used',
|
||||
key_labels: null,
|
||||
values: [
|
||||
'lfc_used_pages',
|
||||
],
|
||||
query: importstr 'sql_exporter/lfc_used_pages.sql',
|
||||
}
|
||||
1
compute/etc/sql_exporter/lfc_used_pages.sql
Normal file
1
compute/etc/sql_exporter/lfc_used_pages.sql
Normal file
@@ -0,0 +1 @@
|
||||
SELECT lfc_value AS lfc_used_pages FROM neon.neon_lfc_stats WHERE lfc_key = 'file_cache_used_pages';
|
||||
@@ -2,23 +2,6 @@ diff --git a/expected/ut-A.out b/expected/ut-A.out
|
||||
index da723b8..5328114 100644
|
||||
--- a/expected/ut-A.out
|
||||
+++ b/expected/ut-A.out
|
||||
@@ -9,13 +9,16 @@ SET search_path TO public;
|
||||
----
|
||||
-- No.A-1-1-3
|
||||
CREATE EXTENSION pg_hint_plan;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
-- No.A-1-2-3
|
||||
DROP EXTENSION pg_hint_plan;
|
||||
-- No.A-1-1-4
|
||||
CREATE SCHEMA other_schema;
|
||||
CREATE EXTENSION pg_hint_plan SCHEMA other_schema;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan"
|
||||
CREATE EXTENSION pg_hint_plan;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
DROP SCHEMA other_schema;
|
||||
----
|
||||
---- No. A-5-1 comment pattern
|
||||
@@ -3175,6 +3178,7 @@ SELECT s.query, s.calls
|
||||
FROM public.pg_stat_statements s
|
||||
JOIN pg_catalog.pg_database d
|
||||
@@ -27,18 +10,6 @@ index da723b8..5328114 100644
|
||||
ORDER BY 1;
|
||||
query | calls
|
||||
--------------------------------------+-------
|
||||
diff --git a/expected/ut-fdw.out b/expected/ut-fdw.out
|
||||
index d372459..6282afe 100644
|
||||
--- a/expected/ut-fdw.out
|
||||
+++ b/expected/ut-fdw.out
|
||||
@@ -7,6 +7,7 @@ SET pg_hint_plan.debug_print TO on;
|
||||
SET client_min_messages TO LOG;
|
||||
SET pg_hint_plan.enable_hint TO on;
|
||||
CREATE EXTENSION file_fdw;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/file_fdw
|
||||
CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
|
||||
CREATE USER MAPPING FOR PUBLIC SERVER file_server;
|
||||
CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename');
|
||||
diff --git a/sql/ut-A.sql b/sql/ut-A.sql
|
||||
index 7c7d58a..4fd1a07 100644
|
||||
--- a/sql/ut-A.sql
|
||||
|
||||
@@ -1,24 +1,3 @@
|
||||
diff --git a/expected/ut-A.out b/expected/ut-A.out
|
||||
index e7d68a1..65a056c 100644
|
||||
--- a/expected/ut-A.out
|
||||
+++ b/expected/ut-A.out
|
||||
@@ -9,13 +9,16 @@ SET search_path TO public;
|
||||
----
|
||||
-- No.A-1-1-3
|
||||
CREATE EXTENSION pg_hint_plan;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
-- No.A-1-2-3
|
||||
DROP EXTENSION pg_hint_plan;
|
||||
-- No.A-1-1-4
|
||||
CREATE SCHEMA other_schema;
|
||||
CREATE EXTENSION pg_hint_plan SCHEMA other_schema;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan"
|
||||
CREATE EXTENSION pg_hint_plan;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
|
||||
DROP SCHEMA other_schema;
|
||||
----
|
||||
---- No. A-5-1 comment pattern
|
||||
diff --git a/expected/ut-J.out b/expected/ut-J.out
|
||||
index 2fa3c70..314e929 100644
|
||||
--- a/expected/ut-J.out
|
||||
@@ -160,15 +139,3 @@ index a09bd34..0ad227c 100644
|
||||
error hint:
|
||||
|
||||
explain_filter
|
||||
diff --git a/expected/ut-fdw.out b/expected/ut-fdw.out
|
||||
index 017fa4b..98d989b 100644
|
||||
--- a/expected/ut-fdw.out
|
||||
+++ b/expected/ut-fdw.out
|
||||
@@ -7,6 +7,7 @@ SET pg_hint_plan.debug_print TO on;
|
||||
SET client_min_messages TO LOG;
|
||||
SET pg_hint_plan.enable_hint TO on;
|
||||
CREATE EXTENSION file_fdw;
|
||||
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/file_fdw
|
||||
CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
|
||||
CREATE USER MAPPING FOR PUBLIC SERVER file_server;
|
||||
CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename');
|
||||
|
||||
@@ -1,12 +1,6 @@
|
||||
commit 46b38d3e46f9cd6c70d9b189dd6ff4abaa17cf5e
|
||||
Author: Alexander Bayandin <alexander@neon.tech>
|
||||
Date: Sat Nov 30 18:29:32 2024 +0000
|
||||
|
||||
Fix v8 9.7.37 compilation on Debian 12
|
||||
|
||||
diff --git a/patches/code/84cf3230a9680aac3b73c410c2b758760b6d3066.patch b/patches/code/84cf3230a9680aac3b73c410c2b758760b6d3066.patch
|
||||
new file mode 100644
|
||||
index 0000000..f0a5dc7
|
||||
index 0000000..fae1cb3
|
||||
--- /dev/null
|
||||
+++ b/patches/code/84cf3230a9680aac3b73c410c2b758760b6d3066.patch
|
||||
@@ -0,0 +1,30 @@
|
||||
@@ -35,8 +29,21 @@ index 0000000..f0a5dc7
|
||||
+@@ -5,6 +5,7 @@
|
||||
+ #ifndef V8_HEAP_CPPGC_PREFINALIZER_HANDLER_H_
|
||||
+ #define V8_HEAP_CPPGC_PREFINALIZER_HANDLER_H_
|
||||
+
|
||||
+
|
||||
++#include <utility>
|
||||
+ #include <vector>
|
||||
+
|
||||
+
|
||||
+ #include "include/cppgc/prefinalizer.h"
|
||||
diff --git a/plv8.cc b/plv8.cc
|
||||
index c1ce883..6e47e94 100644
|
||||
--- a/plv8.cc
|
||||
+++ b/plv8.cc
|
||||
@@ -379,7 +379,7 @@ _PG_init(void)
|
||||
NULL,
|
||||
&plv8_v8_flags,
|
||||
NULL,
|
||||
- PGC_USERSET, 0,
|
||||
+ PGC_SUSET, 0,
|
||||
#if PG_VERSION_NUM >= 90100
|
||||
NULL,
|
||||
#endif
|
||||
13
compute/patches/plv8_v3.2.3.patch
Normal file
13
compute/patches/plv8_v3.2.3.patch
Normal file
@@ -0,0 +1,13 @@
|
||||
diff --git a/plv8.cc b/plv8.cc
|
||||
index edfa2aa..623e7f2 100644
|
||||
--- a/plv8.cc
|
||||
+++ b/plv8.cc
|
||||
@@ -385,7 +385,7 @@ _PG_init(void)
|
||||
NULL,
|
||||
&plv8_v8_flags,
|
||||
NULL,
|
||||
- PGC_USERSET, 0,
|
||||
+ PGC_SUSET, 0,
|
||||
#if PG_VERSION_NUM >= 90100
|
||||
NULL,
|
||||
#endif
|
||||
@@ -188,7 +188,7 @@ impl ComputeState {
|
||||
|
||||
COMPUTE_CTL_UP.reset();
|
||||
COMPUTE_CTL_UP
|
||||
.with_label_values(&[&BUILD_TAG, format!("{}", status).as_str()])
|
||||
.with_label_values(&[&BUILD_TAG, status.to_string().as_str()])
|
||||
.set(1);
|
||||
}
|
||||
|
||||
@@ -360,6 +360,14 @@ impl ComputeNode {
|
||||
this.prewarm_postgres()?;
|
||||
}
|
||||
|
||||
// Set the up metric with Empty status before starting the HTTP server.
|
||||
// That way on the first metric scrape, an external observer will see us
|
||||
// as 'up' and 'empty' (unless the compute was started with a spec or
|
||||
// already configured by control plane).
|
||||
COMPUTE_CTL_UP
|
||||
.with_label_values(&[&BUILD_TAG, ComputeStatus::Empty.to_string().as_str()])
|
||||
.set(1);
|
||||
|
||||
// Launch the external HTTP server first, so that we can serve control plane
|
||||
// requests while configuration is still in progress.
|
||||
crate::http::server::Server::External {
|
||||
@@ -369,19 +377,13 @@ impl ComputeNode {
|
||||
}
|
||||
.launch(&this);
|
||||
|
||||
// The internal HTTP server is needed for a further activation by control plane
|
||||
// if compute was started for a pool, so we have to start server before hanging
|
||||
// waiting for a spec.
|
||||
// The internal HTTP server could be launched later, but there isn't much
|
||||
// sense in waiting.
|
||||
crate::http::server::Server::Internal {
|
||||
port: this.params.internal_http_port,
|
||||
}
|
||||
.launch(&this);
|
||||
|
||||
// HTTP server is running, so we can officially declare compute_ctl as 'up'
|
||||
COMPUTE_CTL_UP
|
||||
.with_label_values(&[&BUILD_TAG, ComputeStatus::Empty.to_string().as_str()])
|
||||
.set(1);
|
||||
|
||||
// If we got a spec from the CLI already, use that. Otherwise wait for the
|
||||
// control plane to pass it to us with a /configure HTTP request
|
||||
let pspec = if let Some(cli_spec) = cli_spec {
|
||||
|
||||
@@ -59,9 +59,12 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
|
||||
Box::pin(async move {
|
||||
let request_id = request.extract_parts::<RequestId>().await.unwrap();
|
||||
|
||||
// TODO: Remove this check after a successful rollout
|
||||
if jwks.keys.is_empty() {
|
||||
warn!(%request_id, "Authorization has not been configured");
|
||||
// TODO: Remove this stanza after teaching neon_local and the
|
||||
// regression tests to use a JWT + JWKS.
|
||||
//
|
||||
// https://github.com/neondatabase/neon/issues/11316
|
||||
if cfg!(feature = "testing") {
|
||||
warn!(%request_id, "Skipping compute_ctl authorization check");
|
||||
|
||||
return Ok(request);
|
||||
}
|
||||
@@ -110,8 +113,6 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
|
||||
impl Authorize {
|
||||
/// Verify the token using the JSON Web Key set and return the token data.
|
||||
fn verify(jwks: &JwkSet, token: &str, validation: &Validation) -> Result<TokenData<Claims>> {
|
||||
debug_assert!(!jwks.keys.is_empty());
|
||||
|
||||
for jwk in jwks.keys.iter() {
|
||||
let decoding_key = match DecodingKey::from_jwk(jwk) {
|
||||
Ok(key) => key,
|
||||
|
||||
@@ -419,7 +419,7 @@ impl ComputeNode {
|
||||
.iter()
|
||||
.filter_map(|val| val.parse::<usize>().ok())
|
||||
.map(|val| if val > 1 { val - 1 } else { 1 })
|
||||
.last()
|
||||
.next_back()
|
||||
.unwrap_or(3)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -545,6 +545,11 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_compaction_ratio_percent' as integer")?,
|
||||
sampling_ratio: settings
|
||||
.remove("sampling_ratio")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("Falied to parse 'sampling_ratio'")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
|
||||
@@ -385,8 +385,6 @@ where
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let cli = Cli::parse();
|
||||
|
||||
let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());
|
||||
|
||||
let ssl_ca_certs = match &cli.ssl_ca_file {
|
||||
Some(ssl_ca_file) => {
|
||||
let buf = tokio::fs::read(ssl_ca_file).await?;
|
||||
@@ -401,9 +399,11 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
|
||||
let storcon_client = Client::new(http_client.clone(), cli.api.clone(), cli.jwt.clone());
|
||||
|
||||
let mut trimmed = cli.api.to_string();
|
||||
trimmed.pop();
|
||||
let vps_client = mgmt_api::Client::new(http_client, trimmed, cli.jwt.as_deref());
|
||||
let vps_client = mgmt_api::Client::new(http_client.clone(), trimmed, cli.jwt.as_deref());
|
||||
|
||||
match cli.command {
|
||||
Command::NodeRegister {
|
||||
@@ -1056,7 +1056,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
const DEFAULT_MIGRATE_CONCURRENCY: usize = 8;
|
||||
let mut stream = futures::stream::iter(moves)
|
||||
.map(|mv| {
|
||||
let client = Client::new(cli.api.clone(), cli.jwt.clone());
|
||||
let client = Client::new(http_client.clone(), cli.api.clone(), cli.jwt.clone());
|
||||
async move {
|
||||
client
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
|
||||
@@ -21,6 +21,7 @@ in this repository.
|
||||
- [WAL Redo](./pageserver-walredo.md)
|
||||
- [Page cache](./pageserver-pagecache.md)
|
||||
- [Storage](./pageserver-storage.md)
|
||||
- [Compaction](./pageserver-compaction.md)
|
||||
- [Processing a GetPage request](./pageserver-processing-getpage.md)
|
||||
- [Processing WAL](./pageserver-processing-wal.md)
|
||||
|
||||
|
||||
110
docs/pageserver-compaction.md
Normal file
110
docs/pageserver-compaction.md
Normal file
@@ -0,0 +1,110 @@
|
||||
# Pageserver Compaction
|
||||
|
||||
Lifted from <https://www.notion.so/neondatabase/Rough-Notes-on-Compaction-1baf189e004780859e65ef63b85cfa81?pvs=4>.
|
||||
|
||||
Updated 2025-03-26.
|
||||
|
||||
## Pages and WAL
|
||||
|
||||
Postgres stores data in 8 KB pages, identified by a page number.
|
||||
|
||||
The WAL contains a sequence of page writes: either images (complete page contents) or deltas (patches applied to images). Each write is identified by its byte position in the WAL, aka LSN.
|
||||
|
||||
Each page version is thus identified by page@LSN. Postgres may read pages at past LSNs.
|
||||
|
||||
Pageservers ingest WAL by writing WAL records into a key/value store keyed by page@LSN.
|
||||
|
||||
Pageservers materialize pages for Postgres reads by finding the most recent page image and applying all subsequent page deltas, up to the read LSN.
|
||||
|
||||
## Compaction: Why?
|
||||
|
||||
Pageservers store page@LSN keys in a key/value store using a custom variant of an LSM tree. Each timeline on each tenant shard has its own LSM tree.
|
||||
|
||||
When Pageservers write new page@LSN entries, they are appended unordered to an ephemeral layer file. When the ephemeral layer file exceeds `checkpoint_distance` (default 256 MB), the key/value pairs are sorted by key and written out to a layer file (for efficient lookups).
|
||||
|
||||
As WAL writes continue, more layer files accumulate.
|
||||
|
||||
Reads must search through the layer files to find the page’s image and deltas. The more layer files accumulate, the more layer files reads must search through before they find a page image, aka read amplification.
|
||||
|
||||
Compaction’s job is to:
|
||||
|
||||
- Reduce read amplification by reorganizing and combining layer files.
|
||||
- Remove old garbage from layer files.
|
||||
|
||||
As part of this, it may combine several page deltas into a single page image where possible.
|
||||
|
||||
## Compaction: How?
|
||||
|
||||
Neon uses a non-standard variant of an LSM tree made up of two levels of layer files: L0 and L1.
|
||||
|
||||
Compaction runs in two phases: L0→L1 compaction, and L1 image compaction.
|
||||
|
||||
L0 contains a stack of L0 layers at decreasing LSN ranges. These have been flushed sequentially from ephemeral layers. Each L0 layer covers the entire page space (page 0 to ~infinity) and the LSN range that was ingested into it. L0 layers are therefore particularly bad for read amp, since every read must search all L0 layers below the read LSN. For example:
|
||||
|
||||
```
|
||||
| Page 0-99 @ LSN 0400-04ff |
|
||||
| Page 0-99 @ LSN 0300-03ff |
|
||||
| Page 0-99 @ LSN 0200-02ff |
|
||||
| Page 0-99 @ LSN 0100-01ff |
|
||||
| Page 0-99 @ LSN 0000-00ff |
|
||||
```
|
||||
|
||||
L0→L1 compaction takes the bottom-most chunk of L0 layer files of between `compaction_threshold` (default 10) and `compaction_upper_limit` (default 20) layers. It uses merge-sort to write out sorted L1 delta layers of size `compaction_target_size` (default 128 MB).
|
||||
|
||||
L1 typically consists of a “bed” of image layers with materialized page images at a specific LSN, and then delta layers of various page/LSN ranges above them with page deltas. For example:
|
||||
|
||||
```
|
||||
Delta layers: | 30-84@0310-04ff |
|
||||
Delta layers: | 10-42@0200-02ff | | 65-92@0174-02aa |
|
||||
Image layers: | 0-39@0100 | 40-79@0100 | 80-99@0100 |
|
||||
```
|
||||
|
||||
L1 image compaction scans across the L1 keyspace at some LSN, materializes page images by reading the image and delta layers below the LSN (via vectored reads), and writes out new sorted image layers of roughly size `compaction_target_size` (default 128 MB) at that LSN.
|
||||
|
||||
Layer files below the new image files’ LSN can be garbage collected when they are no longer needed for PITR.
|
||||
|
||||
Even though the old layer files are not immediately garbage collected, the new image layers help with read amp because reads can stop traversing the layer stack as soon as they encounter a page image.
|
||||
|
||||
## Compaction: When?
|
||||
|
||||
Pageservers run a `compaction_loop` background task for each tenant shard. Every `compaction_period` (default 20 seconds) it will wake up and check if any of the shard’s timelines need compaction. Additionally, L0 layer flushes will eagerly wake the compaction loop if the L0 count exceeds `compaction_threshold` (default 10).
|
||||
|
||||
L0 compaction runs if the number of L0 layers exceeds `compaction_threshold` (default 10).
|
||||
|
||||
L1 image compaction runs across sections of the L1 keyspace that have at least `image_creation_threshold` (default 3) delta layers overlapping image layers.
|
||||
|
||||
At most `CONCURRENT_BACKGROUND_TASKS` (default 3/4 × CPUs, i.e. 6 on an 8-CPU host) background tasks can run concurrently on a Pageserver, including compaction. Further compaction tasks must wait.
|
||||
|
||||
Because L0 layers cause the most read amp (they overlap the entire keyspace and only contain page deltas), they are aggressively compacted down:
|
||||
|
||||
- L0 is compacted down across all tenant timelines before L1 compaction is attempted (`compaction_l0_first`).
|
||||
- L0 compaction uses a separate concurrency limit of `CONCURRENT_L0_COMPACTION_TASKS` (default 3/4 × CPUs, i.e. 6 on an 8-CPU host) to avoid waiting for other tasks (`compaction_l0_semaphore`).
|
||||
- If L0 compaction is needed on any tenant timeline, L1 image compaction will yield to start an immediate L0 compaction run (except for compactions triggered via admin APIs).
|
||||
|
||||
## Backpressure
|
||||
|
||||
With sustained heavy write loads, new L0 layers may be flushed faster than they can be compacted down. This can cause an unbounded buildup of read amplification and compaction debt, which can take hours to resolve even after the writes stop.
|
||||
|
||||
To avoid this and allow compaction to keep up, layer flushes will slow writes down to apply backpressure on the workload:
|
||||
|
||||
- At `l0_flush_delay_threshold` (default 30) L0 layers, layer flushes are delayed by the flush duration, such that they take 2x as long.
|
||||
- At `l0_flush_stall_threshold` (default disabled) L0 layers, layer flushes stall entirely until the L0 count falls back below the threshold. This is currently disabled because we don’t trust L0 compaction to be responsive enough.
|
||||
|
||||
This backpressure is propagated to the compute by waiting for layer flushes when WAL ingestion rolls the ephemeral layer. The compute will significantly slow down WAL writes at:
|
||||
|
||||
- `max_replication_write_lag` (default 500 MB), when Pageserver WAL ingestion lags
|
||||
- `max_replication_flush_lag` (default 10 GB), when Pageserver L0 flushes lag
|
||||
|
||||
Combined, this means that the compute will backpressure when there are 30 L0 layers (30 * 256 MB = 7.7 GB) and the Pageserver WAL ingestion lags the compute by 500 MB, for a total of ~8 GB L0+ephemeral compaction debt on a single shard.
|
||||
|
||||
Since we only delay L0 flushes by 2x when backpressuring, and haven’t enabled stalls, it is still possible for read amp to increase unbounded if compaction is too slow (although we haven’t seen this in practice). But this is considered better than stalling flushes and causing unavailability for as long as it takes L0 compaction to react, since we don’t trust it to be fast enough — at the expense of continually increasing read latency and CPU usage for this tenant. We should either enable stalls when we have enough confidence in L0 compaction, or scale the flush delay by the number of L0 layers to apply increasing backpressure.
|
||||
|
||||
## Circuit Breaker
|
||||
|
||||
Compaction can fail, often repeatedly. This can happen e.g. due to data corruption, faulty hardware, S3 outages, etc.
|
||||
|
||||
If compaction fails, the compaction loop will naïvely try and fail again almost immediately. It may only fail after doing a significant amount of wasted work, while holding onto the background task semaphore.
|
||||
|
||||
To avoid repeatedly doing wasted work and starving out other compaction jobs, each tenant has a compaction circuit breaker. After 5 repeated compaction failures, the circuit breaker trips and disables compaction for the next 24 hours, before resetting the breaker and trying again. This disables compaction across all tenant timelines (faulty or not).
|
||||
|
||||
Disabling compaction for a long time is dangerous, since it can lead to unbounded read amp and compaction debt, and continuous workload backpressure. However, continually failing would not help either. Tripped circuit breakers trigger an alert and must be investigated promptly.
|
||||
22
lambda/aztraffic/Cargo.toml
Normal file
22
lambda/aztraffic/Cargo.toml
Normal file
@@ -0,0 +1,22 @@
|
||||
[package]
|
||||
name = "aztraffic"
|
||||
version = "0.0.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
publish = false
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.97"
|
||||
aws-config = "1.6.1"
|
||||
aws-sdk-athena = "1.68.0"
|
||||
aws-sdk-ec2 = "1.121.0"
|
||||
aws-sdk-eks = "1.82.0"
|
||||
aws-sdk-glue = "1.88.0"
|
||||
aws-sdk-lambda = "1.75.0"
|
||||
aws-sdk-scheduler = "1.64.0"
|
||||
aws-sdk-sfn = "1.68.0"
|
||||
aws-sdk-sts = "1.65.0"
|
||||
clap = { version = "4.5.35", features = ["derive", "env"] }
|
||||
tokio = { version = "1.44.1", features = ["full"] }
|
||||
serde = "1.0.219"
|
||||
serde_json = { version = "1.0.140", features = ["preserve_order"] }
|
||||
794
lambda/aztraffic/src/main.rs
Normal file
794
lambda/aztraffic/src/main.rs
Normal file
@@ -0,0 +1,794 @@
|
||||
use std::fs;
|
||||
|
||||
use aws_config::default_provider::credentials::DefaultCredentialsChain;
|
||||
use aws_sdk_ec2::types::{
|
||||
DestinationFileFormat, DestinationOptionsRequest, FlowLogsResourceType, LogDestinationType,
|
||||
TrafficType,
|
||||
};
|
||||
use aws_sdk_glue::primitives::Blob;
|
||||
use aws_sdk_glue::types::{Column, DatabaseInput, SerDeInfo, StorageDescriptor, TableInput};
|
||||
use aws_sdk_lambda::types::{Environment, FunctionCode, Runtime};
|
||||
use aws_sdk_scheduler::types::{
|
||||
DeadLetterConfig, FlexibleTimeWindow, FlexibleTimeWindowMode, RetryPolicy, Target,
|
||||
};
|
||||
use aws_sdk_sfn::types::{CloudWatchLogsLogGroup, LogDestination, LogLevel, LoggingConfiguration};
|
||||
use clap::Parser;
|
||||
use serde_json::json;
|
||||
|
||||
// Command-line arguments of the cross-AZ traffic setup tool. `main` parses them once and
// hands each value to the resource that needs it.
//
// NOTE: plain `//` comments are used on purpose; `///` doc comments on clap-derive fields
// would become part of the generated `--help` output.
#[derive(Parser, Clone, Debug)]
struct Args {
    // Forwarded to the Lambda function as the `NEON_ACCOUNT_ID` environment variable.
    #[arg(long, value_name = "id")]
    account_id: String,
    // Region used for the SDK configuration; also forwarded to the Lambda as `NEON_REGION`.
    #[arg(long, value_name = "region")]
    region: String,
    // Forwarded to the Lambda function as `NEON_CLUSTER`.
    #[arg(long, value_name = "cluster")]
    cluster: String,
    // VPC IDs for which flow logs are created.
    #[arg(long, value_name = "id")]
    vpc_id: Vec<String>,

    // CloudWatch Logs log group used as the state machine's logging destination.
    #[arg(long, value_name = "arn")]
    log_group_arn: String,
    // Forwarded to the Lambda function as `NEON_S3_BUCKET_NAME`.
    #[arg(long, value_name = "name")]
    pod_info_s3_bucket_name: String,
    // Forwarded to the Lambda function as `NEON_S3_BUCKET_KEY`.
    #[arg(
        long,
        value_name = "path",
        default_value = "CrossAZTraffic/pod_info_dumper/pod_info.csv"
    )]
    pod_info_s3_bucket_key: String,
    // Storage location of the Glue pod-info (CSV) table.
    #[arg(long, value_name = "uri")]
    pod_info_s3_bucket_uri: String,
    // Storage location of the Glue VPC-flow-logs (Parquet) table.
    #[arg(long, value_name = "uri")]
    vpc_flow_logs_s3_bucket_uri: String,
    // Storage location of the Glue results (Parquet) table.
    #[arg(long, value_name = "uri")]
    results_s3_bucket_uri: String,

    // Local zip archive that is read from disk and uploaded as the Lambda function code.
    #[arg(
        long,
        value_name = "name",
        default_value = "./target/lambda/pod_info_dumper/bootstrap.zip"
    )]
    lambda_zipfile_path: String,
    // Name of the Lambda function; also referenced by the state machine's `Invoke` state.
    #[arg(
        long,
        value_name = "name",
        default_value = "CrossAZTraffic-podinfo-function"
    )]
    lambda_function_name: String,
    // Execution role passed to the Lambda `CreateFunction` call.
    #[arg(long, value_name = "arn")]
    lambda_role_arn: String,

    // Glue database that holds the three tables below; also the database of the Athena query.
    #[arg(long, value_name = "name")]
    glue_database_name: String,
    #[arg(
        long,
        value_name = "name",
        default_value = "CrossAZTraffic-podinfo-table"
    )]
    glue_pod_info_table_name: String,
    #[arg(
        long,
        value_name = "name",
        default_value = "CrossAZTraffic-vpcflowlogs-table"
    )]
    glue_vpc_flow_logs_table_name: String,
    #[arg(
        long,
        value_name = "name",
        default_value = "CrossAZTraffic-results-table"
    )]
    glue_results_table_name: String,

    #[arg(
        long,
        value_name = "name",
        default_value = "CrossAZTraffic-trigger-schedule"
    )]
    schedule_name: String,
    // Used both as the schedule's rate and as the look-back window of the Athena query.
    #[arg(long, value_name = "minutes", default_value_t = 60)]
    schedule_interval_minutes: usize,
    // Target ARN of the schedule.
    #[arg(long, value_name = "arn")]
    schedule_target_state_machine_arn: String,
    // Role ARN set on the schedule's target.
    #[arg(long, value_name = "arn")]
    schedule_target_role_arn: String,
    // Optional dead-letter queue for the schedule; no dead-letter config is set when absent.
    #[arg(long, value_name = "arn")]
    schedule_dead_letter_queue_arn: Option<String>,

    #[arg(
        long,
        value_name = "name",
        default_value = "CrossAZTraffic-combine-query"
    )]
    athena_query_name: String,

    // Log destination passed to the EC2 `CreateFlowLogs` call.
    #[arg(long, value_name = "uri")]
    vpcflowlogs_destination_s3_bucket_uri: String,

    #[arg(
        long,
        value_name = "name",
        default_value = "CrossAZTraffic-statemachine"
    )]
    statemachine_name: String,
    #[arg(long, value_name = "arn")]
    statemachine_role_arn: String,

    // `OutputLocation` of the Athena query execution started by the state machine.
    #[arg(long, value_name = "uri")]
    athena_results_s3_bucket_uri: String,
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let args = Args::parse();
|
||||
eprintln!("{args:#?}");
|
||||
|
||||
// TODO: athena results bucket + lifecycle config
|
||||
// TODO: iam role split
|
||||
// TODO: iam policy
|
||||
// TODO: clusterrole + binding
|
||||
// TODO: eks mapping
|
||||
// TODO: log group
|
||||
// TODO: dlq
|
||||
|
||||
let sdk_config = create_sdk_config(&args).await?;
|
||||
|
||||
LambdaFunction {
|
||||
local_zipfile_path: args.lambda_zipfile_path,
|
||||
function_name: args.lambda_function_name.clone(),
|
||||
role_arn: args.lambda_role_arn,
|
||||
account_id: args.account_id,
|
||||
region: args.region,
|
||||
cluster: args.cluster,
|
||||
s3_bucket_name: args.pod_info_s3_bucket_name,
|
||||
s3_bucket_key: args.pod_info_s3_bucket_key,
|
||||
}
|
||||
.create(&sdk_config)
|
||||
.await?;
|
||||
|
||||
GlueDatabase {
|
||||
database_name: args.glue_database_name.clone(),
|
||||
pod_info_table_name: args.glue_pod_info_table_name.clone(),
|
||||
pod_info_s3_bucket_uri: args.pod_info_s3_bucket_uri,
|
||||
vpc_flow_logs_table_name: args.glue_vpc_flow_logs_table_name.clone(),
|
||||
vpc_flow_logs_s3_bucket_uri: args.vpc_flow_logs_s3_bucket_uri,
|
||||
results_table_name: args.glue_results_table_name.clone(),
|
||||
results_s3_bucket_uri: args.results_s3_bucket_uri,
|
||||
}
|
||||
.create(&sdk_config)
|
||||
.await?;
|
||||
|
||||
let named_query_id = AthenaQuery {
|
||||
query_name: args.athena_query_name,
|
||||
glue_database: args.glue_database_name.clone(),
|
||||
invocation_frequency: args.schedule_interval_minutes,
|
||||
athena_results_table_name: args.glue_results_table_name,
|
||||
vpc_flow_logs_table_name: args.glue_vpc_flow_logs_table_name,
|
||||
pod_info_table_name: args.glue_pod_info_table_name,
|
||||
}
|
||||
.create(&sdk_config)
|
||||
.await?;
|
||||
|
||||
StateMachine {
|
||||
name: args.statemachine_name,
|
||||
role_arn: args.statemachine_role_arn,
|
||||
named_query_id,
|
||||
glue_database: args.glue_database_name,
|
||||
lambda_function_name: args.lambda_function_name,
|
||||
athena_results_s3_bucket_uri: args.athena_results_s3_bucket_uri,
|
||||
log_group_arn: args.log_group_arn,
|
||||
}
|
||||
.create(&sdk_config)
|
||||
.await?;
|
||||
|
||||
Schedule {
|
||||
name: args.schedule_name,
|
||||
interval_minutes: args.schedule_interval_minutes,
|
||||
dead_letter_queue_arn: args.schedule_dead_letter_queue_arn,
|
||||
target_role_arn: args.schedule_target_role_arn,
|
||||
target_state_machine_arn: args.schedule_target_state_machine_arn,
|
||||
}
|
||||
.create(&sdk_config)
|
||||
.await?;
|
||||
|
||||
let flow_log_ids = VpcFlowLogs {
|
||||
vpc_ids: args.vpc_id,
|
||||
destination_s3_bucket_uri: args.vpcflowlogs_destination_s3_bucket_uri,
|
||||
}
|
||||
.create(&sdk_config)
|
||||
.await?;
|
||||
|
||||
println!("VPC flow log IDs: {:?}", flow_log_ids.as_slice());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_sdk_config(args: &Args) -> anyhow::Result<aws_config::SdkConfig> {
|
||||
let region = aws_config::Region::new(args.region.to_owned());
|
||||
let credentials_provider = DefaultCredentialsChain::builder()
|
||||
.region(region.clone())
|
||||
.build()
|
||||
.await;
|
||||
Ok(aws_config::defaults(aws_config::BehaviorVersion::latest())
|
||||
.region(region)
|
||||
.credentials_provider(credentials_provider)
|
||||
.load()
|
||||
.await)
|
||||
}
|
||||
|
||||
struct LambdaFunction {
|
||||
local_zipfile_path: String,
|
||||
function_name: String,
|
||||
role_arn: String,
|
||||
account_id: String,
|
||||
region: String,
|
||||
cluster: String,
|
||||
s3_bucket_name: String,
|
||||
s3_bucket_key: String,
|
||||
}
|
||||
|
||||
impl LambdaFunction {
|
||||
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> {
|
||||
let code = fs::read(&self.local_zipfile_path)?;
|
||||
|
||||
let client = aws_sdk_lambda::Client::new(sdk_config);
|
||||
client
|
||||
.delete_function()
|
||||
.function_name(&self.function_name)
|
||||
.send()
|
||||
.await
|
||||
.ok();
|
||||
|
||||
client
|
||||
.create_function()
|
||||
.function_name(&self.function_name)
|
||||
.runtime(Runtime::Providedal2023)
|
||||
.handler("bootstrap")
|
||||
.role(&self.role_arn)
|
||||
.code(FunctionCode::builder().zip_file(Blob::new(code)).build())
|
||||
.timeout(60)
|
||||
.environment(
|
||||
Environment::builder()
|
||||
.set_variables(Some(
|
||||
[
|
||||
("NEON_ACCOUNT_ID", self.account_id.as_str()),
|
||||
("NEON_REGION", self.region.as_str()),
|
||||
("NEON_CLUSTER", self.cluster.as_str()),
|
||||
("NEON_S3_BUCKET_NAME", self.s3_bucket_name.as_str()),
|
||||
("NEON_S3_BUCKET_KEY", self.s3_bucket_key.as_str()),
|
||||
("AWS_LAMBDA_LOG_FORMAT", "JSON"),
|
||||
("AWS_LAMBDA_LOG_LEVEL", "INFO"),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k.into(), v.into()))
|
||||
.collect(),
|
||||
))
|
||||
.build(),
|
||||
)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct VpcFlowLogs {
|
||||
vpc_ids: Vec<String>,
|
||||
destination_s3_bucket_uri: String,
|
||||
}
|
||||
|
||||
impl VpcFlowLogs {
|
||||
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<Vec<String>> {
|
||||
let ec2_client = aws_sdk_ec2::Client::new(sdk_config);
|
||||
|
||||
let flow_logs = ec2_client
|
||||
.create_flow_logs()
|
||||
.resource_type(FlowLogsResourceType::Vpc)
|
||||
.set_resource_ids(Some(self.vpc_ids.clone()))
|
||||
.traffic_type(TrafficType::All)
|
||||
.log_destination_type(LogDestinationType::S3)
|
||||
.log_destination(&self.destination_s3_bucket_uri)
|
||||
.destination_options(
|
||||
DestinationOptionsRequest::builder()
|
||||
.file_format(DestinationFileFormat::Parquet)
|
||||
.hive_compatible_partitions(false)
|
||||
.per_hour_partition(true)
|
||||
.build(),
|
||||
)
|
||||
.log_format("${region} ${az-id} ${vpc-id} ${flow-direction} ${pkt-srcaddr} ${pkt-dstaddr} ${srcport} ${dstport} ${start} ${bytes}")
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if let Some(unsuccessful) = flow_logs
|
||||
.unsuccessful
|
||||
.as_ref()
|
||||
.and_then(|v| if v.is_empty() { None } else { Some(v) })
|
||||
{
|
||||
anyhow::bail!("VPC flow log creation unsuccessful: {unsuccessful:?}");
|
||||
}
|
||||
|
||||
Ok(flow_logs.flow_log_ids().iter().cloned().collect())
|
||||
}
|
||||
}
|
||||
|
||||
struct GlueDatabase {
|
||||
database_name: String,
|
||||
pod_info_table_name: String,
|
||||
pod_info_s3_bucket_uri: String,
|
||||
vpc_flow_logs_table_name: String,
|
||||
vpc_flow_logs_s3_bucket_uri: String,
|
||||
results_table_name: String,
|
||||
results_s3_bucket_uri: String,
|
||||
}
|
||||
|
||||
impl GlueDatabase {
|
||||
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> {
|
||||
let glue_client = aws_sdk_glue::Client::new(sdk_config);
|
||||
|
||||
let db = DatabaseInput::builder().name(&self.database_name).build()?;
|
||||
|
||||
glue_client
|
||||
.create_database()
|
||||
.database_input(db.clone())
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
let pod_info_columns = &[
|
||||
Column::builder()
|
||||
.name("namespace")
|
||||
.r#type("string")
|
||||
.build()?,
|
||||
Column::builder().name("name").r#type("string").build()?,
|
||||
Column::builder().name("ip").r#type("string").build()?,
|
||||
Column::builder()
|
||||
.name("creation_time")
|
||||
.r#type("timestamp")
|
||||
.build()?,
|
||||
Column::builder().name("node").r#type("string").build()?,
|
||||
Column::builder().name("az").r#type("string").build()?,
|
||||
];
|
||||
glue_client
|
||||
.create_table()
|
||||
.database_name(db.name())
|
||||
.table_input(
|
||||
TableInput::builder()
|
||||
.name(&self.pod_info_table_name)
|
||||
.storage_descriptor(
|
||||
StorageDescriptor::builder()
|
||||
.location(&self.pod_info_s3_bucket_uri)
|
||||
.compressed(false)
|
||||
.set_columns(Some(pod_info_columns.into_iter().cloned().collect()))
|
||||
.input_format("org.apache.hadoop.mapred.TextInputFormat")
|
||||
.output_format(
|
||||
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
|
||||
)
|
||||
.serde_info(
|
||||
SerDeInfo::builder()
|
||||
.serialization_library(
|
||||
"org.apache.hadoop.hive.serde2.OpenCSVSerde",
|
||||
)
|
||||
.parameters("separatorChar", ",")
|
||||
.parameters("quoteChar", "`")
|
||||
.parameters("escapeChar", r"\")
|
||||
.build(),
|
||||
)
|
||||
.build(),
|
||||
)
|
||||
.table_type("EXTERNAL_TABLE")
|
||||
.parameters("classification", "csv")
|
||||
.parameters("skip.header.line.count", "1")
|
||||
.retention(0)
|
||||
.build()?,
|
||||
)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
let vpc_flow_logs_columns = &[
|
||||
Column::builder().name("region").r#type("string").build()?,
|
||||
Column::builder().name("az_id").r#type("string").build()?,
|
||||
Column::builder().name("vpc_id").r#type("string").build()?,
|
||||
Column::builder()
|
||||
.name("flow_direction")
|
||||
.r#type("string")
|
||||
.build()?,
|
||||
Column::builder()
|
||||
.name("pkt_srcaddr")
|
||||
.r#type("string")
|
||||
.build()?,
|
||||
Column::builder()
|
||||
.name("pkt_dstaddr")
|
||||
.r#type("string")
|
||||
.build()?,
|
||||
Column::builder().name("srcport").r#type("int").build()?,
|
||||
Column::builder().name("dstport").r#type("int").build()?,
|
||||
Column::builder().name("start").r#type("bigint").build()?,
|
||||
Column::builder().name("bytes").r#type("bigint").build()?,
|
||||
];
|
||||
glue_client
|
||||
.create_table()
|
||||
.database_name(db.name())
|
||||
.table_input(
|
||||
TableInput::builder()
|
||||
.name(&self.vpc_flow_logs_table_name)
|
||||
.storage_descriptor(
|
||||
StorageDescriptor::builder()
|
||||
.location(&self.vpc_flow_logs_s3_bucket_uri)
|
||||
.compressed(false)
|
||||
.set_columns(Some(vpc_flow_logs_columns.into_iter().cloned().collect()))
|
||||
.input_format(
|
||||
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
|
||||
)
|
||||
.output_format(
|
||||
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
|
||||
)
|
||||
.serde_info(
|
||||
SerDeInfo::builder()
|
||||
.serialization_library(
|
||||
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
|
||||
)
|
||||
.parameters("serialization.format", "1")
|
||||
.build(),
|
||||
)
|
||||
.build(),
|
||||
)
|
||||
.table_type("EXTERNAL_TABLE")
|
||||
.parameters("classification", "parquet")
|
||||
.retention(0)
|
||||
.build()?,
|
||||
)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
let athena_results_columns = &[
|
||||
Column::builder().name("time").r#type("timestamp").build()?,
|
||||
Column::builder().name("traffic").r#type("string").build()?,
|
||||
Column::builder()
|
||||
.name("total_bytes")
|
||||
.r#type("bigint")
|
||||
.build()?,
|
||||
];
|
||||
glue_client
|
||||
.create_table()
|
||||
.database_name(db.name())
|
||||
.table_input(
|
||||
TableInput::builder()
|
||||
.name(&self.results_table_name)
|
||||
.storage_descriptor(
|
||||
StorageDescriptor::builder()
|
||||
.location(&self.results_s3_bucket_uri)
|
||||
.compressed(false)
|
||||
.set_columns(Some(athena_results_columns.into_iter().cloned().collect()))
|
||||
.input_format(
|
||||
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
|
||||
)
|
||||
.output_format(
|
||||
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
|
||||
)
|
||||
.serde_info(
|
||||
SerDeInfo::builder()
|
||||
.serialization_library(
|
||||
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
|
||||
)
|
||||
.parameters("serialization.format", "1")
|
||||
.build(),
|
||||
)
|
||||
.build(),
|
||||
)
|
||||
.table_type("EXTERNAL_TABLE")
|
||||
.parameters("classification", "parquet")
|
||||
.retention(0)
|
||||
.build()?,
|
||||
)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct AthenaQuery {
|
||||
query_name: String,
|
||||
glue_database: String,
|
||||
invocation_frequency: usize,
|
||||
athena_results_table_name: String,
|
||||
vpc_flow_logs_table_name: String,
|
||||
pod_info_table_name: String,
|
||||
}
|
||||
|
||||
impl AthenaQuery {
|
||||
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<String> {
|
||||
let Self {
|
||||
athena_results_table_name,
|
||||
vpc_flow_logs_table_name,
|
||||
pod_info_table_name,
|
||||
invocation_frequency,
|
||||
..
|
||||
} = self;
|
||||
|
||||
let query_string = format!(
|
||||
r#"
|
||||
INSERT INTO "{athena_results_table_name}"
|
||||
WITH
|
||||
ip_addresses_and_az_mapping AS (
|
||||
SELECT
|
||||
DISTINCT pkt_srcaddr AS ipaddress,
|
||||
az_id
|
||||
FROM "{vpc_flow_logs_table_name}"
|
||||
WHERE flow_direction = 'egress'
|
||||
AND from_unixtime("{vpc_flow_logs_table_name}".start) > (CURRENT_TIMESTAMP - ({invocation_frequency} * interval '1' minute))
|
||||
),
|
||||
egress_flows_of_pods_with_status AS (
|
||||
SELECT
|
||||
"{pod_info_table_name}".name AS srcpodname,
|
||||
pkt_srcaddr AS srcaddr,
|
||||
pkt_dstaddr AS dstaddr,
|
||||
"{vpc_flow_logs_table_name}".az_id AS srcazid,
|
||||
bytes,
|
||||
start
|
||||
FROM "{vpc_flow_logs_table_name}"
|
||||
INNER JOIN "{pod_info_table_name}" ON "{vpc_flow_logs_table_name}".pkt_srcaddr = "{pod_info_table_name}".ip
|
||||
WHERE flow_direction = 'egress'
|
||||
AND from_unixtime("{vpc_flow_logs_table_name}".start) > (CURRENT_TIMESTAMP - ({invocation_frequency} * interval '1' minute))
|
||||
),
|
||||
cross_az_traffic_by_pod AS (
|
||||
SELECT
|
||||
srcaddr,
|
||||
srcpodname,
|
||||
dstaddr,
|
||||
"{pod_info_table_name}".name AS dstpodname,
|
||||
srcazid,
|
||||
ip_addresses_and_az_mapping.az_id AS dstazid,
|
||||
bytes,
|
||||
start
|
||||
FROM egress_flows_of_pods_with_status
|
||||
INNER JOIN "{pod_info_table_name}" ON dstaddr = "{pod_info_table_name}".ip
|
||||
LEFT JOIN ip_addresses_and_az_mapping ON dstaddr = ipaddress
|
||||
WHERE ip_addresses_and_az_mapping.az_id != srcazid
|
||||
)
|
||||
SELECT
|
||||
date_trunc('MINUTE', from_unixtime(start)) AS time,
|
||||
CONCAT(srcpodname, ' -> ', dstpodname) AS traffic,
|
||||
SUM(bytes) AS total_bytes
|
||||
FROM cross_az_traffic_by_pod
|
||||
GROUP BY date_trunc('MINUTE', from_unixtime(start)), CONCAT(srcpodname, ' -> ', dstpodname)
|
||||
ORDER BY time, total_bytes DESC
|
||||
"#
|
||||
);
|
||||
|
||||
let athena_client = aws_sdk_athena::Client::new(sdk_config);
|
||||
let res = athena_client
|
||||
.create_named_query()
|
||||
.name(&self.query_name)
|
||||
.database(&self.glue_database)
|
||||
.query_string(query_string)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
Ok(res.named_query_id.unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
struct StateMachine {
|
||||
name: String,
|
||||
role_arn: String,
|
||||
named_query_id: String,
|
||||
glue_database: String,
|
||||
lambda_function_name: String,
|
||||
athena_results_s3_bucket_uri: String,
|
||||
log_group_arn: String,
|
||||
}
|
||||
|
||||
impl StateMachine {
|
||||
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> {
|
||||
let sfn_client = aws_sdk_sfn::Client::new(sdk_config);
|
||||
sfn_client
|
||||
.create_state_machine()
|
||||
.name(&self.name)
|
||||
.role_arn(&self.role_arn)
|
||||
.logging_configuration(
|
||||
LoggingConfiguration::builder()
|
||||
.level(LogLevel::All)
|
||||
.destinations(
|
||||
LogDestination::builder()
|
||||
.cloud_watch_logs_log_group(
|
||||
CloudWatchLogsLogGroup::builder()
|
||||
.log_group_arn(&self.log_group_arn)
|
||||
.build(),
|
||||
)
|
||||
.build(),
|
||||
)
|
||||
.build(),
|
||||
)
|
||||
.definition(
|
||||
json!(
|
||||
{
|
||||
"StartAt": "Invoke",
|
||||
"States": {
|
||||
"Invoke": {
|
||||
"Type": "Task",
|
||||
"Resource": "arn:aws:states:::lambda:invoke",
|
||||
"Output": "{% $states.result.Payload %}",
|
||||
"Arguments": {
|
||||
"FunctionName": self.lambda_function_name,
|
||||
"Payload": json!({
|
||||
"detail-type": "Scheduled Event",
|
||||
"source": "aws.events",
|
||||
"detail": {}
|
||||
}).to_string()
|
||||
},
|
||||
"Retry": [
|
||||
{
|
||||
"ErrorEquals": [
|
||||
"Lambda.ServiceException",
|
||||
"Lambda.AWSLambdaException",
|
||||
"Lambda.SdkClientException",
|
||||
"Lambda.TooManyRequestsException"
|
||||
],
|
||||
"IntervalSeconds": 1,
|
||||
"MaxAttempts": 3,
|
||||
"BackoffRate": 2,
|
||||
"JitterStrategy": "FULL"
|
||||
}
|
||||
],
|
||||
"Next": "Check"
|
||||
},
|
||||
"Check": {
|
||||
"Type": "Choice",
|
||||
"Choices": [
|
||||
{
|
||||
"Next": "GetNamedQuery",
|
||||
"Condition": "{% $states.input.statusCode = 200 %}"
|
||||
}
|
||||
],
|
||||
"Default": "Fail"
|
||||
},
|
||||
"GetNamedQuery": {
|
||||
"Type": "Task",
|
||||
"Arguments": {
|
||||
"NamedQueryId": self.named_query_id
|
||||
},
|
||||
"Resource": "arn:aws:states:::aws-sdk:athena:getNamedQuery",
|
||||
"Output": {
|
||||
"QueryString": "{% $states.result.NamedQuery.QueryString %}"
|
||||
},
|
||||
"Next": "StartQueryExecution"
|
||||
},
|
||||
"StartQueryExecution": {
|
||||
"Type": "Task",
|
||||
"Resource": "arn:aws:states:::athena:startQueryExecution.sync",
|
||||
"Arguments": {
|
||||
"QueryString": "{% $states.input.QueryString %}",
|
||||
"QueryExecutionContext": {
|
||||
"Database": self.glue_database
|
||||
},
|
||||
"ResultConfiguration": {
|
||||
"OutputLocation": self.athena_results_s3_bucket_uri
|
||||
},
|
||||
"WorkGroup": "primary"
|
||||
},
|
||||
"End": true
|
||||
},
|
||||
"Fail": {
|
||||
"Type": "Fail"
|
||||
}
|
||||
},
|
||||
"QueryLanguage": "JSONata"
|
||||
}
|
||||
)
|
||||
.to_string(),
|
||||
)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct Schedule {
|
||||
name: String,
|
||||
interval_minutes: usize,
|
||||
target_state_machine_arn: String,
|
||||
target_role_arn: String,
|
||||
dead_letter_queue_arn: Option<String>,
|
||||
}
|
||||
|
||||
impl Schedule {
|
||||
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> {
|
||||
let sched_client = aws_sdk_scheduler::Client::new(sdk_config);
|
||||
|
||||
sched_client
|
||||
.create_schedule()
|
||||
.name(&self.name)
|
||||
.schedule_expression(format!("rate({} minute)", self.interval_minutes))
|
||||
.flexible_time_window(
|
||||
FlexibleTimeWindow::builder()
|
||||
.mode(FlexibleTimeWindowMode::Off)
|
||||
.build()?,
|
||||
)
|
||||
.target(
|
||||
Target::builder()
|
||||
.arn(&self.target_state_machine_arn)
|
||||
.role_arn(&self.target_role_arn)
|
||||
.input(
|
||||
json!({
|
||||
"detail-type": "Scheduled Event",
|
||||
"source": "aws.events",
|
||||
"detail": {}
|
||||
})
|
||||
.to_string(),
|
||||
)
|
||||
.retry_policy(
|
||||
RetryPolicy::builder()
|
||||
.maximum_retry_attempts(0)
|
||||
.maximum_event_age_in_seconds(60)
|
||||
.build(),
|
||||
)
|
||||
.set_dead_letter_config(
|
||||
self.dead_letter_queue_arn
|
||||
.as_ref()
|
||||
.map(|arn| DeadLetterConfig::builder().arn(arn).build()),
|
||||
)
|
||||
.build()?,
|
||||
)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct KubernetesRoles {
|
||||
region: String,
|
||||
cluster: String,
|
||||
k8s_role_prefix: String,
|
||||
lambda_role_arn: String,
|
||||
}
|
||||
|
||||
impl KubernetesRoles {
|
||||
fn print(&self) -> anyhow::Result<()> {
|
||||
let Self {
|
||||
region,
|
||||
cluster,
|
||||
k8s_role_prefix,
|
||||
lambda_role_arn,
|
||||
} = self;
|
||||
|
||||
let yaml = format!(
|
||||
r#"
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: ClusterRole
|
||||
metadata:
|
||||
name: {k8s_role_prefix}-clusterrole
|
||||
rules:
|
||||
- apiGroups:
|
||||
- ""
|
||||
resources: ["nodes", "namespaces", "pods"]
|
||||
verbs: ["get", "list"]
|
||||
---
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: ClusterRoleBinding
|
||||
metadata:
|
||||
name: {k8s_role_prefix}-binding
|
||||
subjects:
|
||||
- kind: Group
|
||||
name: {k8s_role_prefix}-group
|
||||
apiGroup: rbac.authorization.k8s.io
|
||||
roleRef:
|
||||
kind: ClusterRole
|
||||
name: {k8s_role_prefix}-clusterrole
|
||||
apiGroup: rbac.authorization.k8s.io
|
||||
"#
|
||||
);
|
||||
|
||||
let eksctl = format!(
|
||||
r#"eksctl create iamidentitymapping \
|
||||
--region "{region}"
|
||||
--cluster "{cluster}" \
|
||||
--arn "{lambda_role_arn}" \
|
||||
--username "{k8s_role_prefix}-binding" \
|
||||
--group "{k8s_role_prefix}-group"
|
||||
"#
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
27
lambda/pod_info_dumper/Cargo.toml
Normal file
27
lambda/pod_info_dumper/Cargo.toml
Normal file
@@ -0,0 +1,27 @@
|
||||
[package]
|
||||
name = "pod_info_dumper"
|
||||
version = "0.0.0"
|
||||
edition = "2024"
|
||||
publish = false
|
||||
|
||||
[dependencies]
|
||||
aws_lambda_events = { version = "0.16.0", default-features = false, features = ["eventbridge"] }
|
||||
aws-config = { workspace = true }
|
||||
aws-sdk-eks = "1.75.0"
|
||||
aws-sdk-s3 = { workspace = true }
|
||||
aws-sdk-sts = "1.65.0"
|
||||
aws-sigv4 = "1.3.0"
|
||||
base64 = { version = "0.22.1" }
|
||||
csv = { version = "1.3.1", default-features = false }
|
||||
http = { workspace = true }
|
||||
k8s-openapi = { version = "0.24.0", default-features = false, features = ["v1_31"] }
|
||||
kube = { version = "0.99.0", default-features = false, features = ["client", "rustls-tls"] }
|
||||
lambda_runtime = { version = "0.13.0", default-features = false, features = ["tracing"] }
|
||||
rustls = { version = "0.23.25" }
|
||||
rustls-pemfile = { workspace = true }
|
||||
secrecy = "0.10.3"
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
sha2 = { workspace = true, features = ["asm"] }
|
||||
tokio = { workspace = true, features = ["macros"] }
|
||||
tracing = { workspace = true, features = ["max_level_debug", "release_max_level_info"] }
|
||||
8
lambda/pod_info_dumper/README.md
Normal file
8
lambda/pod_info_dumper/README.md
Normal file
@@ -0,0 +1,8 @@
|
||||
# pod_info_dumper
|
||||
|
||||
An event-triggered AWS Lambda function that writes the list of all pods with
|
||||
node information to a CSV file in S3.
|
||||
|
||||
```shell
|
||||
cargo lambda build -p pod_info_dumper --output-format Zip --x86-64 --profile release-lambda-function
|
||||
```
|
||||
420
lambda/pod_info_dumper/src/lib.rs
Normal file
420
lambda/pod_info_dumper/src/lib.rs
Normal file
@@ -0,0 +1,420 @@
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use std::{env, io};
|
||||
|
||||
use aws_config::default_provider::credentials::DefaultCredentialsChain;
|
||||
use aws_config::retry::RetryConfig;
|
||||
use aws_lambda_events::event::eventbridge::EventBridgeEvent;
|
||||
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
|
||||
use aws_sdk_s3::types::ChecksumAlgorithm;
|
||||
use aws_sdk_sts::config::ProvideCredentials;
|
||||
use aws_sigv4::http_request::{
|
||||
SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
|
||||
};
|
||||
use aws_sigv4::sign::v4;
|
||||
use base64::Engine as _;
|
||||
use base64::engine::general_purpose::STANDARD;
|
||||
use base64::prelude::*;
|
||||
use k8s_openapi::api::core::v1::{Node, Pod};
|
||||
use k8s_openapi::chrono::SecondsFormat;
|
||||
use kube::api::{Api, ListParams, ResourceExt};
|
||||
use lambda_runtime::{Error, LambdaEvent, run, service_fn, tracing};
|
||||
use secrecy::SecretString;
|
||||
use serde::ser::SerializeMap;
|
||||
use sha2::{Digest as _, Sha256};
|
||||
|
||||
const AZ_LABEL: &str = "topology.kubernetes.io/zone";
|
||||
|
||||
/// Runtime configuration of the lambda, populated in `start` from `NEON_*`
/// environment variables.
#[derive(Debug)]
|
||||
struct Config {
|
||||
/// AWS account ID; passed as the expected bucket owner when the CSV is uploaded.
aws_account_id: String,
|
||||
/// Where the generated CSV is stored.
s3_bucket: S3BucketConfig,
|
||||
/// The EKS cluster whose nodes and pods are listed.
eks_cluster: EksClusterConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct S3BucketConfig {
|
||||
region: String,
|
||||
name: String,
|
||||
key: String,
|
||||
}
|
||||
|
||||
impl S3BucketConfig {
|
||||
#[tracing::instrument(skip_all, err)]
|
||||
async fn create_sdk_config(&self) -> Result<aws_config::SdkConfig, Error> {
|
||||
let region = aws_config::Region::new(self.region.clone());
|
||||
|
||||
let credentials_provider = DefaultCredentialsChain::builder()
|
||||
.region(region.clone())
|
||||
.build()
|
||||
.await;
|
||||
|
||||
Ok(aws_config::defaults(aws_config::BehaviorVersion::latest())
|
||||
.region(region)
|
||||
.credentials_provider(credentials_provider)
|
||||
.load()
|
||||
.await)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct EksClusterConfig {
|
||||
region: String,
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl EksClusterConfig {
|
||||
#[tracing::instrument(skip_all, err)]
|
||||
async fn create_sdk_config(&self) -> Result<aws_config::SdkConfig, Error> {
|
||||
let region = aws_config::Region::new(self.region.clone());
|
||||
|
||||
let credentials_provider = DefaultCredentialsChain::builder()
|
||||
.region(region.clone())
|
||||
.build()
|
||||
.await;
|
||||
|
||||
Ok(aws_config::defaults(aws_config::BehaviorVersion::latest())
|
||||
.region(region)
|
||||
.credentials_provider(credentials_provider)
|
||||
.load()
|
||||
.await)
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
pub async fn start() -> Result<(), Error> {
|
||||
tracing::init_default_subscriber();
|
||||
rustls::crypto::aws_lc_rs::default_provider()
|
||||
.install_default()
|
||||
.unwrap();
|
||||
|
||||
tracing::info!("function handler started");
|
||||
|
||||
let config = Config {
|
||||
aws_account_id: env::var("NEON_ACCOUNT_ID")?,
|
||||
s3_bucket: S3BucketConfig {
|
||||
region: env::var("NEON_REGION")?,
|
||||
name: env::var("NEON_S3_BUCKET_NAME")?,
|
||||
key: env::var("NEON_S3_BUCKET_KEY")?,
|
||||
},
|
||||
eks_cluster: EksClusterConfig {
|
||||
region: env::var("NEON_REGION")?,
|
||||
name: env::var("NEON_CLUSTER")?,
|
||||
},
|
||||
};
|
||||
|
||||
run(service_fn(async |event: LambdaEvent<EventBridgeEvent<serde_json::Value>>| -> Result<StatusResponse, Error> {
|
||||
function_handler(event, &config).await
|
||||
}))
|
||||
.await
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
struct StatusResponse {
|
||||
status_code: http::StatusCode,
|
||||
body: Cow<'static, str>,
|
||||
}
|
||||
|
||||
impl StatusResponse {
|
||||
fn ok() -> Self {
|
||||
StatusResponse {
|
||||
status_code: http::StatusCode::OK,
|
||||
body: "OK".into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl serde::Serialize for StatusResponse {
|
||||
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
serializer.serialize_entry("statusCode", &self.status_code.as_u16())?;
|
||||
serializer.serialize_entry("body", &self.body)?;
|
||||
serializer.end()
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(?event), err)]
|
||||
async fn function_handler(
|
||||
event: LambdaEvent<EventBridgeEvent<serde_json::Value>>,
|
||||
config: &Config,
|
||||
) -> Result<StatusResponse, Error> {
|
||||
tracing::info!("function handler called");
|
||||
|
||||
let kube_client = connect_to_cluster(config).await?;
|
||||
let s3_client = connect_to_s3(config).await?;
|
||||
|
||||
let nodes_azs = get_nodes_azs(kube_client.clone()).await?;
|
||||
|
||||
let mut pods_info = get_current_pods(kube_client.clone(), &nodes_azs).await?;
|
||||
pods_info.sort_unstable();
|
||||
|
||||
let mut csv = Vec::with_capacity(64 * 1024);
|
||||
write_csv(&pods_info, &mut csv)?;
|
||||
|
||||
tracing::info!(
|
||||
"csv is {} bytes, containing {} pods",
|
||||
csv.len(),
|
||||
pods_info.len()
|
||||
);
|
||||
|
||||
upload_csv(config, &s3_client, &csv).await?;
|
||||
|
||||
tracing::info!("pod info successfully stored");
|
||||
Ok(StatusResponse::ok())
|
||||
}
|
||||
|
||||
/// One record of the generated CSV, describing a single pod.
///
/// The field order matters: the derived `Ord` compares fields in declaration
/// order, and the derived `Serialize` emits them in this order.
#[derive(Debug, serde::Serialize, PartialEq, Eq, PartialOrd, Ord)]
|
||||
struct PodInfo<'a> {
|
||||
namespace: String,
|
||||
name: String,
|
||||
// Taken from the pod status' `pod_ip`.
ip: String,
|
||||
// Filled from the last transition time of the pod's `Ready` condition
// (RFC 3339, second precision), not from the pod's metadata.
creation_time: String,
|
||||
// Name of the node the pod is assigned to.
node: String,
|
||||
// Borrowed from the node -> AZ map; `None` when the node is not in the map.
az: Option<&'a str>,
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, err)]
|
||||
async fn connect_to_cluster(config: &Config) -> Result<kube::Client, Error> {
|
||||
let sdk_config = config.eks_cluster.create_sdk_config().await?;
|
||||
let eks_client = aws_sdk_eks::Client::new(&sdk_config);
|
||||
|
||||
let resp = eks_client
|
||||
.describe_cluster()
|
||||
.name(&config.eks_cluster.name)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
let cluster = resp
|
||||
.cluster()
|
||||
.ok_or_else(|| format!("cluster not found: {}", config.eks_cluster.name))?;
|
||||
let endpoint = cluster.endpoint().ok_or("cluster endpoint not found")?;
|
||||
let ca_data = cluster
|
||||
.certificate_authority()
|
||||
.and_then(|ca| ca.data())
|
||||
.ok_or("cluster certificate data not found")?;
|
||||
|
||||
let mut k8s_config = kube::Config::new(endpoint.parse()?);
|
||||
let cert_bytes = STANDARD.decode(ca_data)?;
|
||||
let certs = rustls_pemfile::certs(&mut cert_bytes.as_slice())
|
||||
.map(|c| c.map(|c| c.to_vec()))
|
||||
.collect::<Result<_, _>>()?;
|
||||
k8s_config.root_cert = Some(certs);
|
||||
k8s_config.auth_info.token = Some(
|
||||
create_kube_auth_token(
|
||||
&sdk_config,
|
||||
&config.eks_cluster.name,
|
||||
Duration::from_secs(10 * 60),
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
|
||||
tracing::info!("cluster description completed");
|
||||
|
||||
Ok(kube::Client::try_from(k8s_config)?)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, err)]
|
||||
async fn create_kube_auth_token(
|
||||
sdk_config: &aws_config::SdkConfig,
|
||||
cluster_name: &str,
|
||||
expires_in: Duration,
|
||||
) -> Result<SecretString, Error> {
|
||||
let identity = sdk_config
|
||||
.credentials_provider()
|
||||
.unwrap()
|
||||
.provide_credentials()
|
||||
.await?
|
||||
.into();
|
||||
|
||||
let region = sdk_config.region().expect("region").as_ref();
|
||||
let host = format!("sts.{region}.amazonaws.com");
|
||||
let get_caller_id_url = format!("https://{host}/?Action=GetCallerIdentity&Version=2011-06-15");
|
||||
|
||||
let mut signing_settings = SigningSettings::default();
|
||||
signing_settings.signature_location = SignatureLocation::QueryParams;
|
||||
signing_settings.expires_in = Some(expires_in);
|
||||
let signing_params = v4::SigningParams::builder()
|
||||
.identity(&identity)
|
||||
.region(region)
|
||||
.name("sts")
|
||||
.time(SystemTime::now())
|
||||
.settings(signing_settings)
|
||||
.build()?
|
||||
.into();
|
||||
let signable_request = SignableRequest::new(
|
||||
"GET",
|
||||
&get_caller_id_url,
|
||||
[("host", host.as_str()), ("x-k8s-aws-id", cluster_name)].into_iter(),
|
||||
SignableBody::Bytes(&[]),
|
||||
)?;
|
||||
let (signing_instructions, _signature) = sign(signable_request, &signing_params)?.into_parts();
|
||||
|
||||
let mut token_request = http::Request::get(get_caller_id_url).body(()).unwrap();
|
||||
signing_instructions.apply_to_request_http1x(&mut token_request);
|
||||
|
||||
let token = format!(
|
||||
"k8s-aws-v1.{}",
|
||||
BASE64_STANDARD_NO_PAD.encode(token_request.uri().to_string())
|
||||
)
|
||||
.into();
|
||||
|
||||
Ok(token)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, err)]
|
||||
async fn connect_to_s3(config: &Config) -> Result<aws_sdk_s3::Client, Error> {
|
||||
let sdk_config = config.s3_bucket.create_sdk_config().await?;
|
||||
|
||||
let s3_client = aws_sdk_s3::Client::from_conf(
|
||||
aws_sdk_s3::config::Builder::from(&sdk_config)
|
||||
.retry_config(RetryConfig::standard())
|
||||
.build(),
|
||||
);
|
||||
|
||||
Ok(s3_client)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, err)]
|
||||
async fn get_nodes_azs(client: kube::Client) -> Result<HashMap<String, String>, Error> {
|
||||
let nodes = Api::<Node>::all(client);
|
||||
|
||||
let list_params = ListParams::default().timeout(10);
|
||||
|
||||
let mut nodes_azs = HashMap::default();
|
||||
for node in nodes.list(&list_params).await? {
|
||||
let Some(name) = node.metadata.name else {
|
||||
tracing::warn!("pod without name");
|
||||
continue;
|
||||
};
|
||||
let Some(mut labels) = node.metadata.labels else {
|
||||
tracing::warn!(name, "pod without labels");
|
||||
continue;
|
||||
};
|
||||
let Some(az) = labels.remove(AZ_LABEL) else {
|
||||
tracing::warn!(name, "pod without AZ label");
|
||||
continue;
|
||||
};
|
||||
|
||||
tracing::debug!(name, az, "adding node");
|
||||
nodes_azs.insert(name, az);
|
||||
}
|
||||
|
||||
Ok(nodes_azs)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, err)]
|
||||
async fn get_current_pods(
|
||||
client: kube::Client,
|
||||
node_az: &HashMap<String, String>,
|
||||
) -> Result<Vec<PodInfo<'_>>, Error> {
|
||||
let pods = Api::<Pod>::all(client);
|
||||
|
||||
let mut pods_info = vec![];
|
||||
let mut continuation_token = Some(String::new());
|
||||
|
||||
while let Some(token) = continuation_token {
|
||||
let list_params = ListParams::default()
|
||||
.timeout(10)
|
||||
.limit(500)
|
||||
.continue_token(&token);
|
||||
|
||||
let list = pods.list(&list_params).await?;
|
||||
continuation_token = list.metadata.continue_;
|
||||
|
||||
tracing::info!("received list of {} pods", list.items.len());
|
||||
|
||||
for pod in list.items {
|
||||
let name = pod.name_any();
|
||||
let Some(namespace) = pod.namespace() else {
|
||||
tracing::warn!(name, "pod without namespace");
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(status) = pod.status else {
|
||||
tracing::warn!(namespace, name, "pod without status");
|
||||
continue;
|
||||
};
|
||||
let Some(conditions) = status.conditions else {
|
||||
tracing::warn!(namespace, name, "pod without conditions");
|
||||
continue;
|
||||
};
|
||||
let Some(ready_condition) = conditions.iter().find(|cond| cond.type_ == "Ready") else {
|
||||
tracing::debug!(namespace, name, "pod not ready");
|
||||
continue;
|
||||
};
|
||||
let Some(ref ready_time) = ready_condition.last_transition_time else {
|
||||
tracing::warn!(
|
||||
namespace,
|
||||
name,
|
||||
"pod ready condition without transition time"
|
||||
);
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(spec) = pod.spec else {
|
||||
tracing::warn!(namespace, name, "pod without spec");
|
||||
continue;
|
||||
};
|
||||
let Some(node) = spec.node_name else {
|
||||
tracing::warn!(namespace, name, "pod without node");
|
||||
continue;
|
||||
};
|
||||
let Some(ip) = status.pod_ip else {
|
||||
tracing::warn!(namespace, name, "pod without IP");
|
||||
continue;
|
||||
};
|
||||
let az = node_az.get(&node).map(String::as_str);
|
||||
let creation_time = ready_time.0.to_rfc3339_opts(SecondsFormat::Secs, true);
|
||||
|
||||
let pod_info = PodInfo {
|
||||
namespace,
|
||||
name,
|
||||
ip,
|
||||
creation_time,
|
||||
node,
|
||||
az,
|
||||
};
|
||||
tracing::debug!(?pod_info, "adding pod");
|
||||
|
||||
pods_info.push(pod_info);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(pods_info)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, err)]
|
||||
fn write_csv<W: io::Write>(pods_info: &Vec<PodInfo>, writer: W) -> Result<(), Error> {
|
||||
let mut w = csv::Writer::from_writer(writer);
|
||||
for pod in pods_info {
|
||||
w.serialize(pod)?;
|
||||
}
|
||||
w.flush()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, err)]
|
||||
async fn upload_csv(
|
||||
config: &Config,
|
||||
s3_client: &aws_sdk_s3::Client,
|
||||
csv: &[u8],
|
||||
) -> Result<aws_sdk_s3::operation::put_object::PutObjectOutput, Error> {
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(csv);
|
||||
let csum = hasher.finalize();
|
||||
|
||||
let resp = s3_client
|
||||
.put_object()
|
||||
.bucket(&config.s3_bucket.name)
|
||||
.key(&config.s3_bucket.key)
|
||||
.content_type("text/csv")
|
||||
.checksum_algorithm(ChecksumAlgorithm::Sha256)
|
||||
.checksum_sha256(STANDARD.encode(csum))
|
||||
.body(ByteStream::from(SdkBody::from(csv)))
|
||||
.expected_bucket_owner(&config.aws_account_id)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
Ok(resp)
|
||||
}
|
||||
3
lambda/pod_info_dumper/src/main.rs
Normal file
3
lambda/pod_info_dumper/src/main.rs
Normal file
@@ -0,0 +1,3 @@
|
||||
// Binary entry point: delegates to the library's `start` and returns its result.
fn main() -> Result<(), lambda_runtime::Error> {
|
||||
pod_info_dumper::start()
|
||||
}
|
||||
@@ -91,14 +91,14 @@ impl Server {
|
||||
Ok(tls_stream) => tls_stream,
|
||||
Err(err) => {
|
||||
if !suppress_io_error(&err) {
|
||||
info!("Failed to accept TLS connection: {err:#}");
|
||||
info!(%remote_addr, "Failed to accept TLS connection: {err:#}");
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Err(err) = Self::serve_connection(tls_stream, service, cancel).await {
|
||||
if !suppress_hyper_error(&err) {
|
||||
info!("Failed to serve HTTPS connection: {err:#}");
|
||||
info!(%remote_addr, "Failed to serve HTTPS connection: {err:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -106,7 +106,7 @@ impl Server {
|
||||
// Handle HTTP connection.
|
||||
if let Err(err) = Self::serve_connection(tcp_stream, service, cancel).await {
|
||||
if !suppress_hyper_error(&err) {
|
||||
info!("Failed to serve HTTP connection: {err:#}");
|
||||
info!(%remote_addr, "Failed to serve HTTP connection: {err:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ postgres_backend.workspace = true
|
||||
nix = {workspace = true, optional = true}
|
||||
reqwest.workspace = true
|
||||
rand.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
bincode.workspace = true
|
||||
|
||||
@@ -134,6 +134,7 @@ pub struct ConfigToml {
|
||||
pub load_previous_heatmap: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub generate_unarchival_heatmap: Option<bool>,
|
||||
pub tracing: Option<Tracing>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -191,6 +192,54 @@ pub enum GetVectoredConcurrentIo {
|
||||
SidecarTask,
|
||||
}
|
||||
|
||||
/// A ratio expressed as `numerator / denominator`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Ratio {
|
||||
pub numerator: usize,
|
||||
// NOTE(review): nothing in this type prevents a zero denominator; users of
// the ratio are presumably expected to validate it - confirm.
pub denominator: usize,
|
||||
}
|
||||
|
||||
/// Exporter settings for tracing; convertible into `tracing_utils::ExportConfig`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct OtelExporterConfig {
|
||||
pub endpoint: String,
|
||||
pub protocol: OtelExporterProtocol,
|
||||
// (De)serialized through `humantime_serde`.
#[serde(with = "humantime_serde")]
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
/// Protocol used by the exporter; mirrors `tracing_utils::Protocol`.
///
/// Variant names are (de)serialized in kebab-case.
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum OtelExporterProtocol {
|
||||
Grpc,
|
||||
HttpBinary,
|
||||
HttpJson,
|
||||
}
|
||||
|
||||
/// Tracing configuration: a sampling ratio plus the exporter settings.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Tracing {
|
||||
pub sampling_ratio: Ratio,
|
||||
pub export_config: OtelExporterConfig,
|
||||
}
|
||||
|
||||
impl From<&OtelExporterConfig> for tracing_utils::ExportConfig {
|
||||
fn from(val: &OtelExporterConfig) -> Self {
|
||||
tracing_utils::ExportConfig {
|
||||
endpoint: Some(val.endpoint.clone()),
|
||||
protocol: val.protocol.into(),
|
||||
timeout: val.timeout,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<OtelExporterProtocol> for tracing_utils::Protocol {
|
||||
fn from(val: OtelExporterProtocol) -> Self {
|
||||
match val {
|
||||
OtelExporterProtocol::Grpc => tracing_utils::Protocol::Grpc,
|
||||
OtelExporterProtocol::HttpJson => tracing_utils::Protocol::HttpJson,
|
||||
OtelExporterProtocol::HttpBinary => tracing_utils::Protocol::HttpBinary,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub mod statvfs {
|
||||
pub mod mock {
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -367,6 +416,9 @@ pub struct TenantConfigToml {
|
||||
/// The ratio that triggers the auto gc-compaction. If (the total size of layers between L2 LSN and gc-horizon) / (size below the L2 LSN)
|
||||
/// is above this ratio, gc-compaction will be triggered.
|
||||
pub gc_compaction_ratio_percent: u64,
|
||||
/// Tenant level performance sampling ratio override. Controls the ratio of get page requests
|
||||
/// that will get perf sampling for the tenant.
|
||||
pub sampling_ratio: Option<Ratio>,
|
||||
}
|
||||
|
||||
pub mod defaults {
|
||||
@@ -537,6 +589,7 @@ impl Default for ConfigToml {
|
||||
validate_wal_contiguity: None,
|
||||
load_previous_heatmap: None,
|
||||
generate_unarchival_heatmap: None,
|
||||
tracing: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -652,6 +705,7 @@ impl Default for TenantConfigToml {
|
||||
gc_compaction_enabled: DEFAULT_GC_COMPACTION_ENABLED,
|
||||
gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB,
|
||||
gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT,
|
||||
sampling_ratio: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ use utils::lsn::Lsn;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
use utils::{completion, serde_system_time};
|
||||
|
||||
use crate::config::Ratio;
|
||||
use crate::key::{CompactKey, Key};
|
||||
use crate::reltag::RelTag;
|
||||
use crate::shard::{ShardCount, ShardStripeSize, TenantShardId};
|
||||
@@ -568,6 +569,8 @@ pub struct TenantConfigPatch {
|
||||
pub gc_compaction_initial_threshold_kb: FieldPatch<u64>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub gc_compaction_ratio_percent: FieldPatch<u64>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub sampling_ratio: FieldPatch<Option<Ratio>>,
|
||||
}
|
||||
|
||||
/// Like [`crate::config::TenantConfigToml`], but preserves the information
|
||||
@@ -688,6 +691,9 @@ pub struct TenantConfig {
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub gc_compaction_ratio_percent: Option<u64>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub sampling_ratio: Option<Option<Ratio>>,
|
||||
}
|
||||
|
||||
impl TenantConfig {
|
||||
@@ -730,6 +736,7 @@ impl TenantConfig {
|
||||
mut gc_compaction_enabled,
|
||||
mut gc_compaction_initial_threshold_kb,
|
||||
mut gc_compaction_ratio_percent,
|
||||
mut sampling_ratio,
|
||||
} = self;
|
||||
|
||||
patch.checkpoint_distance.apply(&mut checkpoint_distance);
|
||||
@@ -824,6 +831,7 @@ impl TenantConfig {
|
||||
patch
|
||||
.gc_compaction_ratio_percent
|
||||
.apply(&mut gc_compaction_ratio_percent);
|
||||
patch.sampling_ratio.apply(&mut sampling_ratio);
|
||||
|
||||
Ok(Self {
|
||||
checkpoint_distance,
|
||||
@@ -860,6 +868,7 @@ impl TenantConfig {
|
||||
gc_compaction_enabled,
|
||||
gc_compaction_initial_threshold_kb,
|
||||
gc_compaction_ratio_percent,
|
||||
sampling_ratio,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -961,6 +970,7 @@ impl TenantConfig {
|
||||
gc_compaction_ratio_percent: self
|
||||
.gc_compaction_ratio_percent
|
||||
.unwrap_or(global_conf.gc_compaction_ratio_percent),
|
||||
sampling_ratio: self.sampling_ratio.unwrap_or(global_conf.sampling_ratio),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1418,11 +1428,6 @@ pub struct TimelineInfo {
|
||||
pub last_record_lsn: Lsn,
|
||||
pub prev_record_lsn: Option<Lsn>,
|
||||
|
||||
/// Legacy field, retained for one version to enable old storage controller to
|
||||
/// decode (it was a mandatory field).
|
||||
#[serde(default, rename = "latest_gc_cutoff_lsn")]
|
||||
pub _unused: Lsn,
|
||||
|
||||
/// The LSN up to which GC has advanced: older data may still exist but it is not available for clients.
|
||||
/// This LSN is not suitable for deciding where to create branches etc: use [`TimelineInfo::min_readable_lsn`] instead,
|
||||
/// as it is easier to reason about.
|
||||
|
||||
@@ -558,7 +558,7 @@ async fn upload_large_enough_file(
|
||||
) -> usize {
|
||||
let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
|
||||
let body = bytes::Bytes::from(vec![0u8; 1024]);
|
||||
let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128));
|
||||
let contents = std::iter::once(header).chain(std::iter::repeat_n(body, 128));
|
||||
|
||||
let len = contents.clone().fold(0, |acc, next| acc + next.len());
|
||||
|
||||
|
||||
@@ -71,6 +71,7 @@ pub struct PeerInfo {
|
||||
pub ts: Instant,
|
||||
pub pg_connstr: String,
|
||||
pub http_connstr: String,
|
||||
pub https_connstr: Option<String>,
|
||||
}
|
||||
|
||||
pub type FullTransactionId = u64;
|
||||
@@ -227,6 +228,8 @@ pub struct TimelineDeleteResult {
|
||||
pub dir_existed: bool,
|
||||
}
|
||||
|
||||
pub type TenantDeleteResult = std::collections::HashMap<String, TimelineDeleteResult>;
|
||||
|
||||
/// Returns [`Lsn::INVALID`]; referenced by name as a serde default
/// (`#[serde(default = "lsn_invalid")]`).
fn lsn_invalid() -> Lsn {
|
||||
Lsn::INVALID
|
||||
}
|
||||
@@ -259,6 +262,8 @@ pub struct SkTimelineInfo {
|
||||
pub safekeeper_connstr: Option<String>,
|
||||
#[serde(default)]
|
||||
pub http_connstr: Option<String>,
|
||||
#[serde(default)]
|
||||
pub https_connstr: Option<String>,
|
||||
// Minimum of all active RO replicas flush LSN
|
||||
#[serde(default = "lsn_invalid")]
|
||||
pub standby_horizon: Lsn,
|
||||
|
||||
@@ -14,6 +14,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||
tracing.workspace = true
|
||||
tracing-opentelemetry.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tracing-subscriber.workspace = true # For examples in docs
|
||||
|
||||
@@ -31,10 +31,10 @@
|
||||
//! .init();
|
||||
//! }
|
||||
//! ```
|
||||
#![deny(unsafe_code)]
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
|
||||
pub mod http;
|
||||
pub mod perf_span;
|
||||
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::trace::TracerProvider;
|
||||
|
||||
144
libs/tracing-utils/src/perf_span.rs
Normal file
144
libs/tracing-utils/src/perf_span.rs
Normal file
@@ -0,0 +1,144 @@
|
||||
//! Crutch module to work around tracing infrastructure deficiencies
|
||||
//!
|
||||
//! We wish to collect granular request spans without impacting performance
|
||||
//! by much. Ideally, we should have zero overhead for a sampling rate of 0.
|
||||
//!
|
||||
//! The approach taken by the pageserver crate is to use a completely different
|
||||
//! span hierarchy for the performance spans. Spans are explicitly stored in
|
||||
//! the request context and use a different [`tracing::Subscriber`] in order
|
||||
//! to avoid expensive filtering.
|
||||
//!
|
||||
//! [`tracing::Span`] instances record their [`tracing::Dispatch`] and, implicitly,
|
||||
//! their [`tracing::Subscriber`] at creation time. However, upon exiting the span,
|
||||
//! the global default [`tracing::Dispatch`] is used. This is problematic if one
|
||||
//! wishes to juggle different subscribers.
|
||||
//!
|
||||
//! In order to work around this, this module provides a [`PerfSpan`] type which
|
||||
//! wraps a [`Span`] and sets the default subscriber when exiting the span. This
|
||||
//! achieves the correct routing.
|
||||
//!
|
||||
//! There's also a modified version of [`tracing::Instrument`] which works with
|
||||
//! [`PerfSpan`].
|
||||
|
||||
use core::{
|
||||
future::Future,
|
||||
marker::Sized,
|
||||
mem::ManuallyDrop,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use pin_project_lite::pin_project;
|
||||
use tracing::{Dispatch, span::Span};
|
||||
|
||||
/// A [`Span`] paired with the [`Dispatch`] that its enter, exit and drop are
/// routed to.
#[derive(Debug, Clone)]
|
||||
pub struct PerfSpan {
|
||||
// Wrapped in `ManuallyDrop` so that the `Drop` impl can bring `dispatch`
// into scope before the span itself is dropped.
inner: ManuallyDrop<Span>,
|
||||
dispatch: Dispatch,
|
||||
}
|
||||
|
||||
/// Guard returned by `PerfSpan::enter`; dropping it exits the span on the
/// span's dispatch.
#[must_use = "once a span has been entered, it should be exited"]
|
||||
pub struct PerfSpanEntered<'a> {
|
||||
span: &'a PerfSpan,
|
||||
}
|
||||
|
||||
impl PerfSpan {
|
||||
pub fn new(span: Span, dispatch: Dispatch) -> Self {
|
||||
Self {
|
||||
inner: ManuallyDrop::new(span),
|
||||
dispatch,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn enter(&self) -> PerfSpanEntered {
|
||||
if let Some(ref id) = self.inner.id() {
|
||||
self.dispatch.enter(id);
|
||||
}
|
||||
|
||||
PerfSpanEntered { span: self }
|
||||
}
|
||||
|
||||
pub fn inner(&self) -> &Span {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PerfSpan {
|
||||
fn drop(&mut self) {
|
||||
// Bring the desired dispatch into scope before explicitly calling
|
||||
// the span destructor. This routes the span exit to the correct
|
||||
// [`tracing::Subscriber`].
|
||||
// The guard must stay alive until after the span has been dropped below.
let _dispatch_guard = tracing::dispatcher::set_default(&self.dispatch);
|
||||
// SAFETY: `inner` is dropped only here, in `Drop::drop`, and is not
// accessed afterwards.
|
||||
unsafe { ManuallyDrop::drop(&mut self.inner) }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PerfSpanEntered<'_> {
|
||||
fn drop(&mut self) {
|
||||
assert!(self.span.inner.id().is_some());
|
||||
|
||||
let _dispatch_guard = tracing::dispatcher::set_default(&self.span.dispatch);
|
||||
self.span.dispatch.exit(&self.span.inner.id().unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
pub trait PerfInstrument: Sized {
|
||||
fn instrument(self, span: PerfSpan) -> PerfInstrumented<Self> {
|
||||
PerfInstrumented {
|
||||
inner: ManuallyDrop::new(self),
|
||||
span,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
#[project = PerfInstrumentedProj]
|
||||
#[derive(Debug, Clone)]
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub struct PerfInstrumented<T> {
|
||||
// `ManuallyDrop` is used here so that `Drop` can be instrumented: the
|
||||
// span is entered first and the value is then dropped via `ManuallyDrop::drop`.
|
||||
#[pin]
|
||||
inner: ManuallyDrop<T>,
|
||||
span: PerfSpan,
|
||||
}
|
||||
|
||||
impl<T> PinnedDrop for PerfInstrumented<T> {
|
||||
fn drop(this: Pin<&mut Self>) {
|
||||
let this = this.project();
|
||||
// Keep the span entered while the wrapped value is dropped.
let _enter = this.span.enter();
|
||||
// SAFETY: 1. `Pin::get_unchecked_mut()` is safe, because this isn't
|
||||
// different from wrapping `T` in `Option` and calling
|
||||
// `Pin::set(&mut this.inner, None)`, except avoiding
|
||||
// additional memory overhead.
|
||||
// 2. `ManuallyDrop::drop()` is safe, because
|
||||
// `PinnedDrop::drop()` is guaranteed to be called only
|
||||
// once.
|
||||
unsafe { ManuallyDrop::drop(this.inner.get_unchecked_mut()) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> PerfInstrumentedProj<'a, T> {
|
||||
/// Get a mutable reference to the [`PerfSpan`] and a pinned mutable reference to
|
||||
/// the wrapped type.
|
||||
fn span_and_inner_pin_mut(self) -> (&'a mut PerfSpan, Pin<&'a mut T>) {
|
||||
// SAFETY: As long as `ManuallyDrop<T>` does not move, `T` won't move
|
||||
// and `inner` is valid, because `ManuallyDrop::drop` is called
|
||||
// only inside `Drop` of the `PerfInstrumented`.
|
||||
let inner = unsafe { self.inner.map_unchecked_mut(|v| &mut **v) };
|
||||
(self.span, inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Future> Future for PerfInstrumented<T> {
|
||||
type Output = T::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let (span, inner) = self.project().span_and_inner_pin_mut();
|
||||
let _enter = span.enter();
|
||||
inner.poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
// Blanket impl: every sized type gets `PerfInstrument::instrument`.
impl<T: Sized> PerfInstrument for T {}
|
||||
@@ -86,17 +86,17 @@ impl Client {
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
/// Get an arbitrary path and returning a streaming Response. This function is suitable
|
||||
/// for pass-through/proxy use cases where we don't care what the response content looks
|
||||
/// like.
|
||||
/// Send an HTTP request to an arbitrary path with a desired HTTP method and return a streaming
|
||||
/// Response. This function is suitable for pass-through/proxy use cases where we don't care
|
||||
/// what the response content looks like.
|
||||
///
|
||||
/// Use/add one of the properly typed methods below if you aren't proxying, and
|
||||
/// know what kind of response you expect.
|
||||
pub async fn get_raw(&self, path: String) -> Result<reqwest::Response> {
|
||||
pub async fn op_raw(&self, method: Method, path: String) -> Result<reqwest::Response> {
|
||||
debug_assert!(path.starts_with('/'));
|
||||
let uri = format!("{}{}", self.mgmt_api_endpoint, path);
|
||||
|
||||
let mut req = self.client.request(Method::GET, uri);
|
||||
let mut req = self.client.request(method, uri);
|
||||
if let Some(value) = &self.authorization_header {
|
||||
req = req.header(reqwest::header::AUTHORIZATION, value);
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@ use tokio::signal::unix::SignalKind;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use tracing_utils::OtelGuard;
|
||||
use utils::auth::{JwtAuth, SwappableJwtAuth};
|
||||
use utils::crashsafe::syncfs;
|
||||
use utils::logging::TracingErrorLayerEnablement;
|
||||
@@ -118,6 +119,21 @@ fn main() -> anyhow::Result<()> {
|
||||
logging::Output::Stdout,
|
||||
)?;
|
||||
|
||||
let otel_enablement = match &conf.tracing {
|
||||
Some(cfg) => tracing_utils::OtelEnablement::Enabled {
|
||||
service_name: "pageserver".to_string(),
|
||||
export_config: (&cfg.export_config).into(),
|
||||
runtime: *COMPUTE_REQUEST_RUNTIME,
|
||||
},
|
||||
None => tracing_utils::OtelEnablement::Disabled,
|
||||
};
|
||||
|
||||
let otel_guard = tracing_utils::init_performance_tracing(otel_enablement);
|
||||
|
||||
if otel_guard.is_some() {
|
||||
info!(?conf.tracing, "starting with OTEL tracing enabled");
|
||||
}
|
||||
|
||||
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
|
||||
// disarming this hook on pageserver, because we never tear down tracing.
|
||||
logging::replace_panic_hook_with_tracing_panic_hook().forget();
|
||||
@@ -191,7 +207,7 @@ fn main() -> anyhow::Result<()> {
|
||||
tracing::info!("Initializing page_cache...");
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
|
||||
start_pageserver(launch_ts, conf, otel_guard).context("Failed to start pageserver")?;
|
||||
|
||||
scenario.teardown();
|
||||
Ok(())
|
||||
@@ -290,6 +306,7 @@ fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) {
|
||||
fn start_pageserver(
|
||||
launch_ts: &'static LaunchTimestamp,
|
||||
conf: &'static PageServerConf,
|
||||
otel_guard: Option<OtelGuard>,
|
||||
) -> anyhow::Result<()> {
|
||||
// Monotonic time for later calculating startup duration
|
||||
let started_startup_at = Instant::now();
|
||||
@@ -675,13 +692,21 @@ fn start_pageserver(
|
||||
|
||||
// Spawn a task to listen for libpq connections. It will spawn further tasks
|
||||
// for each connection. We created the listener earlier already.
|
||||
let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, {
|
||||
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
|
||||
pageserver_listener
|
||||
.set_nonblocking(true)
|
||||
.context("set listener to nonblocking")?;
|
||||
tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")?
|
||||
});
|
||||
let perf_trace_dispatch = otel_guard.as_ref().map(|g| g.dispatch.clone());
|
||||
let page_service = page_service::spawn(
|
||||
conf,
|
||||
tenant_manager.clone(),
|
||||
pg_auth,
|
||||
perf_trace_dispatch,
|
||||
{
|
||||
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
|
||||
pageserver_listener
|
||||
.set_nonblocking(true)
|
||||
.context("set listener to nonblocking")?;
|
||||
tokio::net::TcpListener::from_std(pageserver_listener)
|
||||
.context("create tokio listener")?
|
||||
},
|
||||
);
|
||||
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
BACKGROUND_RUNTIME.block_on(async move {
|
||||
|
||||
@@ -215,6 +215,8 @@ pub struct PageServerConf {
|
||||
|
||||
/// When set, include visible layers in the next uploaded heatmaps of an unarchived timeline.
|
||||
pub generate_unarchival_heatmap: bool,
|
||||
|
||||
pub tracing: Option<pageserver_api::config::Tracing>,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -386,6 +388,7 @@ impl PageServerConf {
|
||||
validate_wal_contiguity,
|
||||
load_previous_heatmap,
|
||||
generate_unarchival_heatmap,
|
||||
tracing,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -435,6 +438,7 @@ impl PageServerConf {
|
||||
wal_receiver_protocol,
|
||||
page_service_pipelining,
|
||||
get_vectored_concurrent_io,
|
||||
tracing,
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// fields that require additional validation or custom handling
|
||||
@@ -506,6 +510,17 @@ impl PageServerConf {
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(tracing_config) = conf.tracing.as_ref() {
|
||||
let ratio = &tracing_config.sampling_ratio;
|
||||
ensure!(
|
||||
ratio.denominator != 0 && ratio.denominator >= ratio.numerator,
|
||||
format!(
|
||||
"Invalid sampling ratio: {}/{}",
|
||||
ratio.numerator, ratio.denominator
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
IndexEntry::validate_checkpoint_distance(conf.default_tenant_conf.checkpoint_distance)
|
||||
.map_err(anyhow::Error::msg)
|
||||
.with_context(|| {
|
||||
|
||||
@@ -100,6 +100,12 @@ use crate::{
|
||||
task_mgr::TaskKind,
|
||||
tenant::Timeline,
|
||||
};
|
||||
use futures::FutureExt;
|
||||
use futures::future::BoxFuture;
|
||||
use std::future::Future;
|
||||
use tracing_utils::perf_span::{PerfInstrument, PerfSpan};
|
||||
|
||||
use tracing::{Dispatch, Span};
|
||||
|
||||
// The main structure of this module, see module-level comment.
|
||||
pub struct RequestContext {
|
||||
@@ -109,6 +115,8 @@ pub struct RequestContext {
|
||||
page_content_kind: PageContentKind,
|
||||
read_path_debug: bool,
|
||||
scope: Scope,
|
||||
perf_span: Option<PerfSpan>,
|
||||
perf_span_dispatch: Option<Dispatch>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -263,22 +271,15 @@ impl RequestContextBuilder {
|
||||
page_content_kind: PageContentKind::Unknown,
|
||||
read_path_debug: false,
|
||||
scope: Scope::new_global(),
|
||||
perf_span: None,
|
||||
perf_span_dispatch: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn extend(original: &RequestContext) -> Self {
|
||||
pub fn from(original: &RequestContext) -> Self {
|
||||
Self {
|
||||
// This is like a Copy, but avoid implementing Copy because ordinary users of
|
||||
// RequestContext should always move or ref it.
|
||||
inner: RequestContext {
|
||||
task_kind: original.task_kind,
|
||||
download_behavior: original.download_behavior,
|
||||
access_stats_behavior: original.access_stats_behavior,
|
||||
page_content_kind: original.page_content_kind,
|
||||
read_path_debug: original.read_path_debug,
|
||||
scope: original.scope.clone(),
|
||||
},
|
||||
inner: original.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -316,12 +317,74 @@ impl RequestContextBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> RequestContext {
|
||||
pub(crate) fn perf_span_dispatch(mut self, dispatch: Option<Dispatch>) -> Self {
|
||||
self.inner.perf_span_dispatch = dispatch;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn root_perf_span<Fn>(mut self, make_span: Fn) -> Self
|
||||
where
|
||||
Fn: FnOnce() -> Span,
|
||||
{
|
||||
assert!(self.inner.perf_span.is_none());
|
||||
assert!(self.inner.perf_span_dispatch.is_some());
|
||||
|
||||
let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
|
||||
let new_span = tracing::dispatcher::with_default(dispatcher, make_span);
|
||||
|
||||
self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub fn perf_span<Fn>(mut self, make_span: Fn) -> Self
|
||||
where
|
||||
Fn: FnOnce(&Span) -> Span,
|
||||
{
|
||||
if let Some(ref perf_span) = self.inner.perf_span {
|
||||
assert!(self.inner.perf_span_dispatch.is_some());
|
||||
let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
|
||||
|
||||
let new_span =
|
||||
tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
|
||||
|
||||
self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
|
||||
}
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub fn root(self) -> RequestContext {
|
||||
self.inner
|
||||
}
|
||||
|
||||
pub fn attached_child(self) -> RequestContext {
|
||||
self.inner
|
||||
}
|
||||
|
||||
pub fn detached_child(self) -> RequestContext {
|
||||
self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl RequestContext {
|
||||
/// Private clone implementation
|
||||
///
|
||||
/// Callers should use the [`RequestContextBuilder`] or child spawning APIs of
|
||||
/// [`RequestContext`].
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
task_kind: self.task_kind,
|
||||
download_behavior: self.download_behavior,
|
||||
access_stats_behavior: self.access_stats_behavior,
|
||||
page_content_kind: self.page_content_kind,
|
||||
read_path_debug: self.read_path_debug,
|
||||
scope: self.scope.clone(),
|
||||
perf_span: self.perf_span.clone(),
|
||||
perf_span_dispatch: self.perf_span_dispatch.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new RequestContext that has no parent.
|
||||
///
|
||||
/// The function is called `new` because, once we add children
|
||||
@@ -337,7 +400,7 @@ impl RequestContext {
|
||||
pub fn new(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
|
||||
RequestContextBuilder::new(task_kind)
|
||||
.download_behavior(download_behavior)
|
||||
.build()
|
||||
.root()
|
||||
}
|
||||
|
||||
/// Create a detached child context for a task that may outlive `self`.
|
||||
@@ -358,7 +421,10 @@ impl RequestContext {
|
||||
///
|
||||
/// We could make new calls to this function fail if `self` is already canceled.
|
||||
pub fn detached_child(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
|
||||
self.child_impl(task_kind, download_behavior)
|
||||
RequestContextBuilder::from(self)
|
||||
.task_kind(task_kind)
|
||||
.download_behavior(download_behavior)
|
||||
.detached_child()
|
||||
}
|
||||
|
||||
/// Create a child of context `self` for a task that shall not outlive `self`.
|
||||
@@ -382,7 +448,7 @@ impl RequestContext {
|
||||
/// The method to wait for child tasks would return an error, indicating
|
||||
/// that the child task was not started because the context was canceled.
|
||||
pub fn attached_child(&self) -> Self {
|
||||
self.child_impl(self.task_kind(), self.download_behavior())
|
||||
RequestContextBuilder::from(self).attached_child()
|
||||
}
|
||||
|
||||
/// Use this function when you should be creating a child context using
|
||||
@@ -397,17 +463,10 @@ impl RequestContext {
|
||||
Self::new(task_kind, download_behavior)
|
||||
}
|
||||
|
||||
fn child_impl(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
|
||||
RequestContextBuilder::extend(self)
|
||||
.task_kind(task_kind)
|
||||
.download_behavior(download_behavior)
|
||||
.build()
|
||||
}
|
||||
|
||||
pub fn with_scope_timeline(&self, timeline: &Arc<Timeline>) -> Self {
|
||||
RequestContextBuilder::extend(self)
|
||||
RequestContextBuilder::from(self)
|
||||
.scope(Scope::new_timeline(timeline))
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
pub(crate) fn with_scope_page_service_pagestream(
|
||||
@@ -416,9 +475,9 @@ impl RequestContext {
|
||||
crate::page_service::TenantManagerTypes,
|
||||
>,
|
||||
) -> Self {
|
||||
RequestContextBuilder::extend(self)
|
||||
RequestContextBuilder::from(self)
|
||||
.scope(Scope::new_page_service_pagestream(timeline_handle))
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
pub fn with_scope_secondary_timeline(
|
||||
@@ -426,28 +485,30 @@ impl RequestContext {
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
) -> Self {
|
||||
RequestContextBuilder::extend(self)
|
||||
RequestContextBuilder::from(self)
|
||||
.scope(Scope::new_secondary_timeline(tenant_shard_id, timeline_id))
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
pub fn with_scope_secondary_tenant(&self, tenant_shard_id: &TenantShardId) -> Self {
|
||||
RequestContextBuilder::extend(self)
|
||||
RequestContextBuilder::from(self)
|
||||
.scope(Scope::new_secondary_tenant(tenant_shard_id))
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn with_scope_unit_test(&self) -> Self {
|
||||
RequestContextBuilder::new(TaskKind::UnitTest)
|
||||
RequestContextBuilder::from(self)
|
||||
.task_kind(TaskKind::UnitTest)
|
||||
.scope(Scope::new_unit_test())
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
pub fn with_scope_debug_tools(&self) -> Self {
|
||||
RequestContextBuilder::new(TaskKind::DebugTool)
|
||||
RequestContextBuilder::from(self)
|
||||
.task_kind(TaskKind::DebugTool)
|
||||
.scope(Scope::new_debug_tools())
|
||||
.build()
|
||||
.attached_child()
|
||||
}
|
||||
|
||||
pub fn task_kind(&self) -> TaskKind {
|
||||
@@ -504,4 +565,48 @@ impl RequestContext {
|
||||
Scope::DebugTools { io_size_metrics } => io_size_metrics,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn perf_follows_from(&self, from: &RequestContext) {
|
||||
if let (Some(span), Some(from_span)) = (&self.perf_span, &from.perf_span) {
|
||||
span.inner().follows_from(from_span.inner());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn has_perf_span(&self) -> bool {
|
||||
self.perf_span.is_some()
|
||||
}
|
||||
}
|
||||
|
||||
/// [`Future`] extension trait that allows for creating performance
|
||||
/// spans on sampled requests
|
||||
pub(crate) trait PerfInstrumentFutureExt<'a>: Future + Send {
|
||||
/// Instrument this future with a new performance span when the
|
||||
/// provided request context indicates the originator request
|
||||
/// was sampled. Otherwise, just box the future and return it as is.
|
||||
fn maybe_perf_instrument<Fn>(
|
||||
self,
|
||||
ctx: &RequestContext,
|
||||
make_span: Fn,
|
||||
) -> BoxFuture<'a, Self::Output>
|
||||
where
|
||||
Self: Sized + 'a,
|
||||
Fn: FnOnce(&Span) -> Span,
|
||||
{
|
||||
match &ctx.perf_span {
|
||||
Some(perf_span) => {
|
||||
assert!(ctx.perf_span_dispatch.is_some());
|
||||
let dispatcher = ctx.perf_span_dispatch.as_ref().unwrap();
|
||||
|
||||
let new_span =
|
||||
tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
|
||||
|
||||
let new_perf_span = PerfSpan::new(new_span, dispatcher.clone());
|
||||
self.instrument(new_perf_span).boxed()
|
||||
}
|
||||
None => self.boxed(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Implement the trait for all types that satisfy the trait bounds
|
||||
impl<'a, T: Future + Send + 'a> PerfInstrumentFutureExt<'a> for T {}
|
||||
|
||||
@@ -74,8 +74,8 @@ use crate::tenant::size::ModelInputs;
|
||||
use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerName};
|
||||
use crate::tenant::timeline::offload::{OffloadError, offload_timeline};
|
||||
use crate::tenant::timeline::{
|
||||
CompactFlags, CompactOptions, CompactRequest, CompactionError, Timeline, WaitLsnTimeout,
|
||||
WaitLsnWaiter, import_pgdata,
|
||||
CompactFlags, CompactOptions, CompactRequest, CompactionError, MarkInvisibleRequest, Timeline,
|
||||
WaitLsnTimeout, WaitLsnWaiter, import_pgdata,
|
||||
};
|
||||
use crate::tenant::{
|
||||
GetTimelineError, LogicalSizeCalculationCause, OffloadedTimeline, PageReconstructError,
|
||||
@@ -445,6 +445,9 @@ async fn build_timeline_info_common(
|
||||
|
||||
let (pitr_history_size, within_ancestor_pitr) = timeline.get_pitr_history_stats();
|
||||
|
||||
// Externally, expose the lowest LSN that can be used to create a branch.
|
||||
// Internally we distinguish between the planned GC cutoff (PITR point) and the "applied" GC cutoff (where we
|
||||
// actually trimmed data to), which can pass each other when PITR is changed.
|
||||
let min_readable_lsn = std::cmp::max(
|
||||
timeline.get_gc_cutoff_lsn(),
|
||||
*timeline.get_applied_gc_cutoff_lsn(),
|
||||
@@ -461,7 +464,6 @@ async fn build_timeline_info_common(
|
||||
initdb_lsn,
|
||||
last_record_lsn,
|
||||
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
|
||||
_unused: Default::default(), // Unused, for legacy decode only
|
||||
min_readable_lsn,
|
||||
applied_gc_cutoff_lsn: *timeline.get_applied_gc_cutoff_lsn(),
|
||||
current_logical_size: current_logical_size.size_dont_care_about_accuracy(),
|
||||
@@ -2335,21 +2337,31 @@ async fn timeline_compact_handler(
|
||||
}
|
||||
|
||||
async fn timeline_mark_invisible_handler(
|
||||
request: Request<Body>,
|
||||
mut request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let compact_request = json_request_maybe::<Option<MarkInvisibleRequest>>(&mut request).await?;
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
let visibility = match compact_request {
|
||||
Some(req) => match req.is_visible {
|
||||
Some(true) => TimelineVisibilityState::Visible,
|
||||
Some(false) | None => TimelineVisibilityState::Invisible,
|
||||
},
|
||||
None => TimelineVisibilityState::Invisible,
|
||||
};
|
||||
|
||||
async {
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
timeline.remote_client.schedule_index_upload_for_timeline_invisible_state(TimelineVisibilityState::Invisible).map_err(ApiError::InternalServerError)?;
|
||||
timeline.remote_client.schedule_index_upload_for_timeline_invisible_state(visibility).map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
.instrument(info_span!("manual_timeline_mark_invisible", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
|
||||
@@ -2685,11 +2697,12 @@ async fn getpage_at_lsn_handler_inner(
|
||||
let lsn: Option<Lsn> = parse_query_param(&request, "lsn")?;
|
||||
|
||||
async {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
// Enable read path debugging
|
||||
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
|
||||
let ctx = RequestContextBuilder::extend(&ctx).read_path_debug(true)
|
||||
.scope(context::Scope::new_timeline(&timeline)).build();
|
||||
let ctx = RequestContextBuilder::new(TaskKind::MgmtRequest)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.scope(context::Scope::new_timeline(&timeline))
|
||||
.read_path_debug(true)
|
||||
.root();
|
||||
|
||||
// Use last_record_lsn if no lsn is provided
|
||||
let lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
|
||||
@@ -3176,7 +3189,8 @@ async fn list_aux_files(
|
||||
timeline.gate.enter().map_err(|_| ApiError::Cancelled)?,
|
||||
);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download)
|
||||
.with_scope_timeline(&timeline);
|
||||
let files = timeline
|
||||
.list_aux_files(body.lsn, &ctx, io_concurrency)
|
||||
.await?;
|
||||
@@ -3420,14 +3434,15 @@ async fn put_tenant_timeline_import_wal(
|
||||
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
|
||||
let span = info_span!("import_wal", tenant_id=%tenant_id, timeline_id=%timeline_id, start_lsn=%start_lsn, end_lsn=%end_lsn);
|
||||
async move {
|
||||
let state = get_state(&request);
|
||||
|
||||
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, TenantShardId::unsharded(tenant_id), timeline_id).await?;
|
||||
let ctx = RequestContextBuilder::extend(&ctx).scope(context::Scope::new_timeline(&timeline)).build();
|
||||
let ctx = RequestContextBuilder::new(TaskKind::MgmtRequest)
|
||||
.download_behavior(DownloadBehavior::Warn)
|
||||
.scope(context::Scope::new_timeline(&timeline))
|
||||
.root();
|
||||
|
||||
let mut body = StreamReader::new(request.into_body().map(|res| {
|
||||
res.map_err(|error| {
|
||||
|
||||
@@ -55,6 +55,9 @@ pub const DEFAULT_PG_VERSION: u32 = 16;
|
||||
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
|
||||
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
|
||||
|
||||
// Target used for performance traces.
|
||||
pub const PERF_TRACE_TARGET: &str = "P";
|
||||
|
||||
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
|
||||
|
||||
pub use crate::metrics::preinitialize_metrics;
|
||||
|
||||
@@ -1248,13 +1248,13 @@ pub(crate) static STORAGE_IO_TIME_METRIC: Lazy<StorageIoTime> = Lazy::new(Storag
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
#[repr(usize)]
|
||||
enum StorageIoSizeOperation {
|
||||
pub(crate) enum StorageIoSizeOperation {
|
||||
Read,
|
||||
Write,
|
||||
}
|
||||
|
||||
impl StorageIoSizeOperation {
|
||||
const VARIANTS: &'static [&'static str] = &["read", "write"];
|
||||
pub(crate) const VARIANTS: &'static [&'static str] = &["read", "write"];
|
||||
|
||||
fn as_str(&self) -> &'static str {
|
||||
Self::VARIANTS[*self as usize]
|
||||
@@ -1262,7 +1262,7 @@ impl StorageIoSizeOperation {
|
||||
}
|
||||
|
||||
// Needed for the https://neonprod.grafana.net/d/5uK9tHL4k/picking-tenant-for-relocation?orgId=1
|
||||
static STORAGE_IO_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
pub(crate) static STORAGE_IO_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_io_operations_bytes_total",
|
||||
"Total amount of bytes read/written in IO operations",
|
||||
|
||||
@@ -9,6 +9,7 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::{io, str};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, bail};
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Buf;
|
||||
@@ -53,7 +54,9 @@ use utils::sync::spsc_fold;
|
||||
use crate::auth::check_permission;
|
||||
use crate::basebackup::BasebackupError;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::metrics::{
|
||||
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer,
|
||||
TimelineMetrics,
|
||||
@@ -100,6 +103,7 @@ pub fn spawn(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
pg_auth: Option<Arc<SwappableJwtAuth>>,
|
||||
perf_trace_dispatch: Option<Dispatch>,
|
||||
tcp_listener: tokio::net::TcpListener,
|
||||
) -> Listener {
|
||||
let cancel = CancellationToken::new();
|
||||
@@ -117,6 +121,7 @@ pub fn spawn(
|
||||
conf,
|
||||
tenant_manager,
|
||||
pg_auth,
|
||||
perf_trace_dispatch,
|
||||
tcp_listener,
|
||||
conf.pg_auth_type,
|
||||
conf.page_service_pipelining.clone(),
|
||||
@@ -173,6 +178,7 @@ pub async fn libpq_listener_main(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
perf_trace_dispatch: Option<Dispatch>,
|
||||
listener: tokio::net::TcpListener,
|
||||
auth_type: AuthType,
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
@@ -205,8 +211,12 @@ pub async fn libpq_listener_main(
|
||||
// Connection established. Spawn a new task to handle it.
|
||||
debug!("accepted connection from {}", peer_addr);
|
||||
let local_auth = auth.clone();
|
||||
let connection_ctx = listener_ctx
|
||||
.detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
|
||||
let connection_ctx = RequestContextBuilder::from(&listener_ctx)
|
||||
.task_kind(TaskKind::PageRequestHandler)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.perf_span_dispatch(perf_trace_dispatch.clone())
|
||||
.detached_child();
|
||||
|
||||
connection_handler_tasks.spawn(page_service_conn_main(
|
||||
conf,
|
||||
tenant_manager.clone(),
|
||||
@@ -607,6 +617,7 @@ impl std::fmt::Display for BatchedPageStreamError {
|
||||
struct BatchedGetPageRequest {
|
||||
req: PagestreamGetPageRequest,
|
||||
timer: SmgrOpTimer,
|
||||
ctx: RequestContext,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -902,10 +913,12 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
let key = rel_block_to_key(req.rel, req.blkno);
|
||||
let shard = match timeline_handles
|
||||
|
||||
let res = timeline_handles
|
||||
.get(tenant_id, timeline_id, ShardSelector::Page(key))
|
||||
.await
|
||||
{
|
||||
.await;
|
||||
|
||||
let shard = match res {
|
||||
Ok(tl) => tl,
|
||||
Err(e) => {
|
||||
let span = mkspan!(before shard routing);
|
||||
@@ -932,6 +945,38 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let ctx = if shard.is_get_page_request_sampled() {
|
||||
RequestContextBuilder::from(ctx)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"GET_PAGE",
|
||||
tenant_id = %tenant_id,
|
||||
shard_id = %shard.get_shard_identity().shard_slug(),
|
||||
timeline_id = %timeline_id,
|
||||
lsn = %req.hdr.request_lsn,
|
||||
request_id = %req.hdr.reqid,
|
||||
key = %key,
|
||||
)
|
||||
})
|
||||
.attached_child()
|
||||
} else {
|
||||
ctx.attached_child()
|
||||
};
|
||||
|
||||
// This ctx travels as part of the BatchedFeMessage through
|
||||
// batching into the request handler.
|
||||
// The request handler needs to do some per-request work
|
||||
// (relsize check) before dispatching the batch as a single
|
||||
// get_vectored call to the Timeline.
|
||||
// This ctx will be used for the relsize check, whereas the
|
||||
// get_vectored call will be a different ctx with separate
|
||||
// perf span.
|
||||
let ctx = ctx.with_scope_page_service_pagestream(&shard);
|
||||
|
||||
// Similar game for this `span`: we funnel it through so that
|
||||
// request handler log messages contain the request-specific fields.
|
||||
let span = mkspan!(shard.tenant_shard_id.shard_slug());
|
||||
|
||||
let timer = record_op_start_and_throttle(
|
||||
@@ -939,19 +984,34 @@ impl PageServerHandler {
|
||||
metrics::SmgrQueryType::GetPageAtLsn,
|
||||
received_at,
|
||||
)
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: current_perf_span,
|
||||
"THROTTLE",
|
||||
)
|
||||
})
|
||||
.await?;
|
||||
|
||||
// We're holding the Handle
|
||||
let effective_request_lsn = match Self::wait_or_get_last_lsn(
|
||||
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
|
||||
let res = Self::wait_or_get_last_lsn(
|
||||
&shard,
|
||||
req.hdr.request_lsn,
|
||||
req.hdr.not_modified_since,
|
||||
&shard.get_applied_gc_cutoff_lsn(),
|
||||
ctx,
|
||||
&ctx,
|
||||
)
|
||||
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
|
||||
.await
|
||||
{
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: current_perf_span,
|
||||
"WAIT_LSN",
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
let effective_request_lsn = match res {
|
||||
Ok(lsn) => lsn,
|
||||
Err(e) => {
|
||||
return respond_error!(span, e);
|
||||
@@ -961,7 +1021,7 @@ impl PageServerHandler {
|
||||
span,
|
||||
shard: shard.downgrade(),
|
||||
effective_request_lsn,
|
||||
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
|
||||
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer, ctx }],
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -1514,6 +1574,7 @@ impl PageServerHandler {
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
|
||||
{
|
||||
let cancel = self.cancel.clone();
|
||||
|
||||
let err = loop {
|
||||
let msg = Self::pagestream_read_message(
|
||||
&mut pgb_reader,
|
||||
@@ -2004,7 +2065,9 @@ impl PageServerHandler {
|
||||
|
||||
let results = timeline
|
||||
.get_rel_page_at_lsn_batched(
|
||||
requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
|
||||
requests
|
||||
.iter()
|
||||
.map(|p| (&p.req.rel, &p.req.blkno, p.ctx.attached_child())),
|
||||
effective_lsn,
|
||||
io_concurrency,
|
||||
ctx,
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
|
||||
use std::ops::{ControlFlow, Range};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, ensure};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
@@ -31,7 +32,7 @@ use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, trace, warn};
|
||||
use tracing::{debug, info, info_span, trace, warn};
|
||||
use utils::bin_ser::{BeSer, DeserializeError};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pausable_failpoint;
|
||||
@@ -39,7 +40,7 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
|
||||
use super::tenant::{PageReconstructError, Timeline};
|
||||
use crate::aux_file;
|
||||
use crate::context::RequestContext;
|
||||
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::metrics::{
|
||||
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
|
||||
@@ -209,7 +210,9 @@ impl Timeline {
|
||||
let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
|
||||
let res = self
|
||||
.get_rel_page_at_lsn_batched(
|
||||
pages.iter().map(|(tag, blknum)| (tag, blknum)),
|
||||
pages
|
||||
.iter()
|
||||
.map(|(tag, blknum)| (tag, blknum, ctx.attached_child())),
|
||||
effective_lsn,
|
||||
io_concurrency.clone(),
|
||||
ctx,
|
||||
@@ -248,7 +251,7 @@ impl Timeline {
|
||||
/// The ordering of the returned vec corresponds to the ordering of `pages`.
|
||||
pub(crate) async fn get_rel_page_at_lsn_batched(
|
||||
&self,
|
||||
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
|
||||
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, RequestContext)>,
|
||||
effective_lsn: Lsn,
|
||||
io_concurrency: IoConcurrency,
|
||||
ctx: &RequestContext,
|
||||
@@ -262,8 +265,11 @@ impl Timeline {
|
||||
let mut result = Vec::with_capacity(pages.len());
|
||||
let result_slots = result.spare_capacity_mut();
|
||||
|
||||
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
|
||||
for (response_slot_idx, (tag, blknum)) in pages.enumerate() {
|
||||
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
|
||||
BTreeMap::default();
|
||||
|
||||
let mut perf_instrument = false;
|
||||
for (response_slot_idx, (tag, blknum, ctx)) in pages.enumerate() {
|
||||
if tag.relnode == 0 {
|
||||
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
|
||||
RelationError::InvalidRelnode.into(),
|
||||
@@ -274,7 +280,16 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let nblocks = match self
|
||||
.get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
|
||||
.get_rel_size(*tag, Version::Lsn(effective_lsn), &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_REL_SIZE",
|
||||
reltag=%tag,
|
||||
lsn=%effective_lsn,
|
||||
)
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(nblocks) => nblocks,
|
||||
@@ -297,8 +312,12 @@ impl Timeline {
|
||||
|
||||
let key = rel_block_to_key(*tag, *blknum);
|
||||
|
||||
if ctx.has_perf_span() {
|
||||
perf_instrument = true;
|
||||
}
|
||||
|
||||
let key_slots = keys_slots.entry(key).or_default();
|
||||
key_slots.push(response_slot_idx);
|
||||
key_slots.push((response_slot_idx, ctx));
|
||||
}
|
||||
|
||||
let keyspace = {
|
||||
@@ -314,16 +333,34 @@ impl Timeline {
|
||||
acc.to_keyspace()
|
||||
};
|
||||
|
||||
match self
|
||||
.get_vectored(keyspace, effective_lsn, io_concurrency, ctx)
|
||||
.await
|
||||
{
|
||||
let ctx = match perf_instrument {
|
||||
true => RequestContextBuilder::from(ctx)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"GET_VECTORED",
|
||||
tenant_id = %self.tenant_shard_id.tenant_id,
|
||||
timeline_id = %self.timeline_id,
|
||||
lsn = %effective_lsn,
|
||||
shard = %self.tenant_shard_id.shard_slug(),
|
||||
)
|
||||
})
|
||||
.attached_child(),
|
||||
false => ctx.attached_child(),
|
||||
};
|
||||
|
||||
let res = self
|
||||
.get_vectored(keyspace, effective_lsn, io_concurrency, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(results) => {
|
||||
for (key, res) in results {
|
||||
let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
|
||||
let first_slot = key_slots.next().unwrap();
|
||||
let (first_slot, first_req_ctx) = key_slots.next().unwrap();
|
||||
|
||||
for slot in key_slots {
|
||||
for (slot, req_ctx) in key_slots {
|
||||
let clone = match &res {
|
||||
Ok(buf) => Ok(buf.clone()),
|
||||
Err(err) => Err(match err {
|
||||
@@ -341,17 +378,22 @@ impl Timeline {
|
||||
};
|
||||
|
||||
result_slots[slot].write(clone);
|
||||
// There is no standardized way to express that the batched span followed from N request spans.
|
||||
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
|
||||
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
|
||||
req_ctx.perf_follows_from(&ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
|
||||
result_slots[first_slot].write(res);
|
||||
first_req_ctx.perf_follows_from(&ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
|
||||
// (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
|
||||
for slot in keys_slots.values().flatten() {
|
||||
for (slot, req_ctx) in keys_slots.values().flatten() {
|
||||
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
|
||||
// but without taking ownership of the GetVectoredError
|
||||
let err = match &err {
|
||||
@@ -383,6 +425,7 @@ impl Timeline {
|
||||
}
|
||||
};
|
||||
|
||||
req_ctx.perf_follows_from(&ctx);
|
||||
result_slots[*slot].write(err);
|
||||
}
|
||||
|
||||
|
||||
@@ -219,8 +219,7 @@ pageserver_runtime!(MGMT_REQUEST_RUNTIME, "mgmt request worker");
|
||||
pageserver_runtime!(WALRECEIVER_RUNTIME, "walreceiver worker");
|
||||
pageserver_runtime!(BACKGROUND_RUNTIME, "background op worker");
|
||||
// Bump this number when adding a new pageserver_runtime!
|
||||
// SAFETY: it's obviously correct
|
||||
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(4) };
|
||||
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = NonZeroUsize::new(4).unwrap();
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct PageserverTaskId(u64);
|
||||
|
||||
@@ -3248,17 +3248,23 @@ impl Tenant {
|
||||
async fn housekeeping(&self) {
|
||||
// Call through to all timelines to freeze ephemeral layers as needed. This usually happens
|
||||
// during ingest, but we don't want idle timelines to hold open layers for too long.
|
||||
let timelines = self
|
||||
.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.filter(|tli| tli.is_active())
|
||||
.cloned()
|
||||
.collect_vec();
|
||||
//
|
||||
// We don't do this if the tenant can't upload layers (i.e. it's in stale attachment mode).
|
||||
// We don't run compaction in this case either, and don't want to keep flushing tiny L0
|
||||
// layers that won't be compacted down.
|
||||
if self.tenant_conf.load().location.may_upload_layers_hint() {
|
||||
let timelines = self
|
||||
.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.filter(|tli| tli.is_active())
|
||||
.cloned()
|
||||
.collect_vec();
|
||||
|
||||
for timeline in timelines {
|
||||
timeline.maybe_freeze_ephemeral_layer().await;
|
||||
for timeline in timelines {
|
||||
timeline.maybe_freeze_ephemeral_layer().await;
|
||||
}
|
||||
}
|
||||
|
||||
// Shut down walredo if idle.
|
||||
@@ -3683,7 +3689,7 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
}
|
||||
TenantState::Active { .. } => {
|
||||
TenantState::Active => {
|
||||
return Ok(());
|
||||
}
|
||||
TenantState::Broken { reason, .. } => {
|
||||
@@ -4199,9 +4205,9 @@ impl Tenant {
|
||||
self.cancel.child_token(),
|
||||
);
|
||||
|
||||
let timeline_ctx = RequestContextBuilder::extend(ctx)
|
||||
let timeline_ctx = RequestContextBuilder::from(ctx)
|
||||
.scope(context::Scope::new_timeline(&timeline))
|
||||
.build();
|
||||
.detached_child();
|
||||
|
||||
Ok((timeline, timeline_ctx))
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ impl<Value: Clone> LayerCoverage<Value> {
|
||||
///
|
||||
/// Complexity: O(log N)
|
||||
fn add_node(&mut self, key: i128) {
|
||||
let value = match self.nodes.range(..=key).last() {
|
||||
let value = match self.nodes.range(..=key).next_back() {
|
||||
Some((_, Some(v))) => Some(v.clone()),
|
||||
Some((_, None)) => None,
|
||||
None => None,
|
||||
|
||||
@@ -58,7 +58,7 @@ use crate::{InitializationOrder, TEMP_FILE_SUFFIX};
|
||||
|
||||
/// For a tenant that appears in TenantsMap, it may either be
|
||||
/// - `Attached`: has a full Tenant object, is eligible to service
|
||||
/// reads and ingest WAL.
|
||||
/// reads and ingest WAL.
|
||||
/// - `Secondary`: is only keeping a local cache warm.
|
||||
///
|
||||
/// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
|
||||
|
||||
@@ -130,7 +130,7 @@ impl IndexPart {
|
||||
/// Version history
|
||||
/// - 2: added `deleted_at`
|
||||
/// - 3: no longer deserialize `timeline_layers` (serialized format is the same, but timeline_layers
|
||||
/// is always generated from the keys of `layer_metadata`)
|
||||
/// is always generated from the keys of `layer_metadata`)
|
||||
/// - 4: timeline_layers is fully removed.
|
||||
/// - 5: lineage was added
|
||||
/// - 6: last_aux_file_policy is added.
|
||||
|
||||
@@ -167,10 +167,17 @@ impl SecondaryTenant {
|
||||
|
||||
self.validate_metrics();
|
||||
|
||||
// Metrics are subtracted from and/or removed eagerly.
|
||||
// Deletions are done in the background via [`BackgroundPurges::spawn`].
|
||||
let tenant_id = self.tenant_shard_id.tenant_id.to_string();
|
||||
let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
|
||||
let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
|
||||
let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
|
||||
|
||||
self.detail
|
||||
.lock()
|
||||
.unwrap()
|
||||
.drain_timelines(&self.tenant_shard_id, &self.resident_size_metric);
|
||||
}
|
||||
|
||||
pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::metrics::{STORAGE_IO_SIZE, StorageIoSizeOperation};
|
||||
use camino::Utf8PathBuf;
|
||||
use chrono::format::{DelayedFormat, StrftimeItems};
|
||||
use futures::Future;
|
||||
@@ -124,15 +125,53 @@ impl OnDiskState {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(super) struct SecondaryDetailTimeline {
|
||||
on_disk_layers: HashMap<LayerName, OnDiskState>,
|
||||
|
||||
/// We remember when layers were evicted, to prevent re-downloading them.
|
||||
pub(super) evicted_at: HashMap<LayerName, SystemTime>,
|
||||
|
||||
ctx: RequestContext,
|
||||
}
|
||||
|
||||
impl Clone for SecondaryDetailTimeline {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
on_disk_layers: self.on_disk_layers.clone(),
|
||||
evicted_at: self.evicted_at.clone(),
|
||||
// This is a bit awkward. The downloader code operates on a snapshot
|
||||
// of the secondary list to avoid locking it for extended periods of time.
|
||||
// No particularly strong reason to chose [`RequestContext::detached_child`],
|
||||
// but makes more sense than [`RequestContext::attached_child`].
|
||||
ctx: self
|
||||
.ctx
|
||||
.detached_child(self.ctx.task_kind(), self.ctx.download_behavior()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for SecondaryDetailTimeline {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("SecondaryDetailTimeline")
|
||||
.field("on_disk_layers", &self.on_disk_layers)
|
||||
.field("evicted_at", &self.evicted_at)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl SecondaryDetailTimeline {
|
||||
pub(super) fn empty(ctx: RequestContext) -> Self {
|
||||
SecondaryDetailTimeline {
|
||||
on_disk_layers: Default::default(),
|
||||
evicted_at: Default::default(),
|
||||
ctx,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a shared reference to the request context stored in this
/// timeline state.
pub(super) fn context(&self) -> &RequestContext {
|
||||
&self.ctx
|
||||
}
|
||||
|
||||
pub(super) fn remove_layer(
|
||||
&mut self,
|
||||
name: &LayerName,
|
||||
@@ -258,18 +297,50 @@ impl SecondaryDetail {
|
||||
|
||||
pub(super) fn remove_timeline(
|
||||
&mut self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
resident_metric: &UIntGauge,
|
||||
) {
|
||||
let removed = self.timelines.remove(timeline_id);
|
||||
if let Some(removed) = removed {
|
||||
resident_metric.sub(
|
||||
removed
|
||||
.on_disk_layers
|
||||
.values()
|
||||
.map(|l| l.metadata.file_size)
|
||||
.sum(),
|
||||
);
|
||||
Self::clear_timeline_metrics(tenant_shard_id, timeline_id, removed, resident_metric);
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn drain_timelines(
|
||||
&mut self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
resident_metric: &UIntGauge,
|
||||
) {
|
||||
for (timeline_id, removed) in self.timelines.drain() {
|
||||
Self::clear_timeline_metrics(tenant_shard_id, &timeline_id, removed, resident_metric);
|
||||
}
|
||||
}
|
||||
|
||||
fn clear_timeline_metrics(
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
detail: SecondaryDetailTimeline,
|
||||
resident_metric: &UIntGauge,
|
||||
) {
|
||||
resident_metric.sub(
|
||||
detail
|
||||
.on_disk_layers
|
||||
.values()
|
||||
.map(|l| l.metadata.file_size)
|
||||
.sum(),
|
||||
);
|
||||
|
||||
let shard_id = format!("{}", tenant_shard_id.shard_slug());
|
||||
let tenant_id = tenant_shard_id.tenant_id.to_string();
|
||||
let timeline_id = timeline_id.to_string();
|
||||
for op in StorageIoSizeOperation::VARIANTS {
|
||||
let _ = STORAGE_IO_SIZE.remove_label_values(&[
|
||||
op,
|
||||
tenant_id.as_str(),
|
||||
shard_id.as_str(),
|
||||
timeline_id.as_str(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -727,6 +798,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
last_heatmap,
|
||||
timeline,
|
||||
&self.secondary_state.resident_size_metric,
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -774,7 +846,6 @@ impl<'a> TenantDownloader<'a> {
|
||||
|
||||
// Download the layers in the heatmap
|
||||
for timeline in heatmap.timelines {
|
||||
let ctx = &ctx.with_scope_secondary_timeline(tenant_shard_id, &timeline.timeline_id);
|
||||
let timeline_state = timeline_states
|
||||
.remove(&timeline.timeline_id)
|
||||
.expect("Just populated above");
|
||||
@@ -917,7 +988,11 @@ impl<'a> TenantDownloader<'a> {
|
||||
for delete_timeline in &delete_timelines {
|
||||
// We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
|
||||
// from disk fails that will be a fatal error.
|
||||
detail.remove_timeline(delete_timeline, &self.secondary_state.resident_size_metric);
|
||||
detail.remove_timeline(
|
||||
self.secondary_state.get_tenant_shard_id(),
|
||||
delete_timeline,
|
||||
&self.secondary_state.resident_size_metric,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1013,7 +1088,6 @@ impl<'a> TenantDownloader<'a> {
|
||||
timeline: HeatMapTimeline,
|
||||
timeline_state: SecondaryDetailTimeline,
|
||||
deadline: Instant,
|
||||
ctx: &RequestContext,
|
||||
) -> (Result<(), UpdateError>, Vec<HeatMapLayer>) {
|
||||
// Accumulate updates to the state
|
||||
let mut touched = Vec::new();
|
||||
@@ -1044,7 +1118,12 @@ impl<'a> TenantDownloader<'a> {
|
||||
}
|
||||
|
||||
match self
|
||||
.download_layer(tenant_shard_id, &timeline_id, layer, ctx)
|
||||
.download_layer(
|
||||
tenant_shard_id,
|
||||
&timeline_id,
|
||||
layer,
|
||||
timeline_state.context(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(layer)) => touched.push(layer),
|
||||
@@ -1155,13 +1234,16 @@ impl<'a> TenantDownloader<'a> {
|
||||
tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.hot_layers().count());
|
||||
|
||||
let (result, touched) = self
|
||||
.download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx)
|
||||
.download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline)
|
||||
.await;
|
||||
|
||||
// Write updates to state to record layers we just downloaded or touched, irrespective of whether the overall result was successful
|
||||
{
|
||||
let mut detail = self.secondary_state.detail.lock().unwrap();
|
||||
let timeline_detail = detail.timelines.entry(timeline_id).or_default();
|
||||
let timeline_detail = detail.timelines.entry(timeline_id).or_insert_with(|| {
|
||||
let ctx = ctx.with_scope_secondary_timeline(tenant_shard_id, &timeline_id);
|
||||
SecondaryDetailTimeline::empty(ctx)
|
||||
});
|
||||
|
||||
tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
|
||||
touched.into_iter().for_each(|t| {
|
||||
@@ -1295,10 +1377,12 @@ async fn init_timeline_state(
|
||||
last_heatmap: Option<&HeatMapTimeline>,
|
||||
heatmap: &HeatMapTimeline,
|
||||
resident_metric: &UIntGauge,
|
||||
ctx: &RequestContext,
|
||||
) -> SecondaryDetailTimeline {
|
||||
let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
|
||||
let mut detail = SecondaryDetailTimeline::default();
|
||||
let ctx = ctx.with_scope_secondary_timeline(tenant_shard_id, &heatmap.timeline_id);
|
||||
let mut detail = SecondaryDetailTimeline::empty(ctx);
|
||||
|
||||
let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
|
||||
let mut dir = match tokio::fs::read_dir(&timeline_path).await {
|
||||
Ok(d) => d,
|
||||
Err(e) => {
|
||||
|
||||
@@ -13,13 +13,13 @@ pub mod merge_iterator;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BinaryHeap, HashMap};
|
||||
use std::future::Future;
|
||||
use std::ops::Range;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
pub use batch_split_writer::{BatchLayerWriter, SplitDeltaLayerWriter, SplitImageLayerWriter};
|
||||
use bytes::Bytes;
|
||||
pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
|
||||
@@ -34,7 +34,7 @@ use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::value::Value;
|
||||
use tracing::{Instrument, trace};
|
||||
use tracing::{Instrument, info_span, trace};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::gate::GateGuard;
|
||||
|
||||
@@ -43,7 +43,9 @@ use super::PageReconstructError;
|
||||
use super::layer_map::InMemoryLayerDesc;
|
||||
use super::timeline::{GetVectoredError, ReadPath};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{AccessStatsBehavior, RequestContext};
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
|
||||
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
|
||||
where
|
||||
@@ -874,13 +876,37 @@ impl ReadableLayer {
|
||||
) -> Result<(), GetVectoredError> {
|
||||
match self {
|
||||
ReadableLayer::PersistentLayer(layer) => {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_LAYER",
|
||||
layer = %layer
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
}
|
||||
ReadableLayer::InMemoryLayer(layer) => {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_LAYER",
|
||||
layer = %layer
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -896,9 +896,9 @@ impl DeltaLayerInner {
|
||||
where
|
||||
Reader: BlockReader + Clone,
|
||||
{
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
@@ -1105,9 +1105,9 @@ impl DeltaLayerInner {
|
||||
all_keys.push(entry);
|
||||
true
|
||||
},
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
&RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build(),
|
||||
.attached_child(),
|
||||
)
|
||||
.await?;
|
||||
if let Some(last) = all_keys.last_mut() {
|
||||
|
||||
@@ -481,9 +481,9 @@ impl ImageLayerInner {
|
||||
let tree_reader =
|
||||
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
|
||||
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
|
||||
@@ -421,9 +421,9 @@ impl InMemoryLayer {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
let inner = self.inner.read().await;
|
||||
|
||||
|
||||
@@ -3,12 +3,13 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::Context;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::HistoricLayerInfo;
|
||||
use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
|
||||
use tracing::Instrument;
|
||||
use tracing::{Instrument, info_span};
|
||||
use utils::generation::Generation;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -18,7 +19,7 @@ use super::delta_layer::{self};
|
||||
use super::image_layer::{self};
|
||||
use super::{
|
||||
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
|
||||
LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState,
|
||||
LayerVisibilityHint, PerfInstrumentFutureExt, PersistentLayerDesc, ValuesReconstructState,
|
||||
};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
|
||||
@@ -324,16 +325,29 @@ impl Layer {
|
||||
reconstruct_data: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let downloaded =
|
||||
let downloaded = {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_LAYER",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
self.0
|
||||
.get_or_maybe_download(true, ctx)
|
||||
.get_or_maybe_download(true, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_context| crnt_perf_context.clone())
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
|
||||
GetVectoredError::Cancelled
|
||||
}
|
||||
other => GetVectoredError::Other(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
})?
|
||||
};
|
||||
|
||||
let this = ResidentLayer {
|
||||
downloaded: downloaded.clone(),
|
||||
owner: self.clone(),
|
||||
@@ -341,9 +355,20 @@ impl Layer {
|
||||
|
||||
self.record_access(ctx);
|
||||
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"VISIT_LAYER",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
downloaded
|
||||
.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, ctx)
|
||||
.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, &ctx)
|
||||
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
GetVectoredError::Other(err) => GetVectoredError::Other(
|
||||
@@ -1045,15 +1070,34 @@ impl LayerInner {
|
||||
return Err(DownloadError::DownloadRequired);
|
||||
}
|
||||
|
||||
let download_ctx = ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download);
|
||||
let ctx = if ctx.has_perf_span() {
|
||||
let dl_ctx = RequestContextBuilder::from(ctx)
|
||||
.task_kind(TaskKind::LayerDownload)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"DOWNLOAD_LAYER",
|
||||
layer = %self,
|
||||
reason = %reason
|
||||
)
|
||||
})
|
||||
.detached_child();
|
||||
ctx.perf_follows_from(&dl_ctx);
|
||||
dl_ctx
|
||||
} else {
|
||||
ctx.attached_child()
|
||||
};
|
||||
|
||||
async move {
|
||||
tracing::info!(%reason, "downloading on-demand");
|
||||
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
let res = self
|
||||
.download_init_and_wait(timeline, permit, download_ctx)
|
||||
.download_init_and_wait(timeline, permit, ctx.attached_child())
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await?;
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
Ok(res)
|
||||
}
|
||||
@@ -1720,9 +1764,9 @@ impl DownloadedLayer {
|
||||
);
|
||||
|
||||
let res = if owner.desc.is_delta {
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(crate::context::PageContentKind::DeltaLayerSummary)
|
||||
.build();
|
||||
.attached_child();
|
||||
let summary = Some(delta_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
owner.desc.timeline_id,
|
||||
@@ -1738,9 +1782,9 @@ impl DownloadedLayer {
|
||||
.await
|
||||
.map(LayerKind::Delta)
|
||||
} else {
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(crate::context::PageContentKind::ImageLayerSummary)
|
||||
.build();
|
||||
.attached_child();
|
||||
let lsn = owner.desc.image_layer_lsn();
|
||||
let summary = Some(image_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
|
||||
@@ -119,6 +119,10 @@ async fn smoke_test() {
|
||||
let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
|
||||
assert!(matches!(e, EvictionError::NotFound));
|
||||
|
||||
let dl_ctx = RequestContextBuilder::from(ctx)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.attached_child();
|
||||
|
||||
// on accesses when the layer is evicted, it will automatically be downloaded.
|
||||
let img_after = {
|
||||
let mut data = ValuesReconstructState::new(io_concurrency.clone());
|
||||
@@ -127,7 +131,7 @@ async fn smoke_test() {
|
||||
controlfile_keyspace.clone(),
|
||||
Lsn(0x10)..Lsn(0x11),
|
||||
&mut data,
|
||||
ctx,
|
||||
&dl_ctx,
|
||||
)
|
||||
.instrument(download_span.clone())
|
||||
.await
|
||||
@@ -177,7 +181,7 @@ async fn smoke_test() {
|
||||
|
||||
// plain downloading is rarely needed
|
||||
layer
|
||||
.download_and_keep_resident(ctx)
|
||||
.download_and_keep_resident(&dl_ctx)
|
||||
.instrument(download_span)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -645,9 +649,10 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
|
||||
let ctx = ctx.with_scope_timeline(&timeline);
|
||||
|
||||
// This test does downloads
|
||||
let ctx = RequestContextBuilder::extend(&ctx)
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
let layer = {
|
||||
let mut layers = {
|
||||
let layers = timeline.layers.read().await;
|
||||
@@ -730,9 +735,9 @@ async fn evict_and_wait_does_not_wait_for_download() {
|
||||
let ctx = ctx.with_scope_timeline(&timeline);
|
||||
|
||||
// This test does downloads
|
||||
let ctx = RequestContextBuilder::extend(&ctx)
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
let layer = {
|
||||
let mut layers = {
|
||||
|
||||
@@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
|
||||
use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, Result, anyhow, bail, ensure};
|
||||
use arc_swap::{ArcSwap, ArcSwapOption};
|
||||
use bytes::Bytes;
|
||||
@@ -96,7 +97,9 @@ use super::{
|
||||
};
|
||||
use crate::aux_file::AuxFileSizeEstimator;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::l0_flush::{self, L0FlushGlobalState};
|
||||
@@ -895,6 +898,12 @@ pub(crate) struct CompactRequest {
|
||||
pub sub_compaction_max_job_size_mb: Option<u64>,
|
||||
}
|
||||
|
||||
/// Deserializable request payload carrying an optional visibility flag.
// NOTE(review): presumably the body of a "mark timeline invisible" API call;
// confirm against the handler that consumes it.
#[derive(Debug, Clone, serde::Deserialize)]
|
||||
pub(crate) struct MarkInvisibleRequest {
|
||||
/// Optional flag; `#[serde(default)]` makes it `None` when the field is
/// absent from the input.
#[serde(default)]
|
||||
pub is_visible: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(crate) struct CompactOptions {
|
||||
pub flags: EnumSet<CompactFlags>,
|
||||
@@ -1283,9 +1292,22 @@ impl Timeline {
|
||||
};
|
||||
reconstruct_state.read_path = read_path;
|
||||
|
||||
let traversal_res: Result<(), _> = self
|
||||
.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, ctx)
|
||||
.await;
|
||||
let traversal_res: Result<(), _> = {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_IO",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
self.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
};
|
||||
|
||||
if let Err(err) = traversal_res {
|
||||
// Wait for all the spawned IOs to complete.
|
||||
// See comments on `spawn_io` inside `storage_layer` for more details.
|
||||
@@ -1299,14 +1321,46 @@ impl Timeline {
|
||||
|
||||
let layers_visited = reconstruct_state.get_layers_visited();
|
||||
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"RECONSTRUCT",
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
let futs = FuturesUnordered::new();
|
||||
for (key, state) in std::mem::take(&mut reconstruct_state.keys) {
|
||||
futs.push({
|
||||
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"RECONSTRUCT_KEY",
|
||||
key = %key,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
async move {
|
||||
assert_eq!(state.situation, ValueReconstructSituation::Complete);
|
||||
|
||||
let converted = match state.collect_pending_ios().await {
|
||||
let res = state
|
||||
.collect_pending_ios()
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"WAIT_FOR_IO_COMPLETIONS",
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
let converted = match res {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => {
|
||||
return (key, Err(err));
|
||||
@@ -1323,16 +1377,27 @@ impl Timeline {
|
||||
"{converted:?}"
|
||||
);
|
||||
|
||||
(
|
||||
key,
|
||||
walredo_self.reconstruct_value(key, lsn, converted).await,
|
||||
)
|
||||
let walredo_deltas = converted.num_deltas();
|
||||
let walredo_res = walredo_self
|
||||
.reconstruct_value(key, lsn, converted)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"WALREDO",
|
||||
deltas = %walredo_deltas,
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
(key, walredo_res)
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let results = futs
|
||||
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await;
|
||||
|
||||
// For aux file keys (v1 or v2) the vectored read path does not return an error
|
||||
@@ -2241,7 +2306,7 @@ impl Timeline {
|
||||
.await
|
||||
.expect("holding a reference to self");
|
||||
}
|
||||
TimelineState::Active { .. } => {
|
||||
TimelineState::Active => {
|
||||
return Ok(());
|
||||
}
|
||||
TimelineState::Broken { .. } | TimelineState::Stopping => {
|
||||
@@ -2411,6 +2476,31 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
|
||||
}
|
||||
|
||||
/// Checks if a get page request should get perf tracing
|
||||
///
|
||||
/// The configuration priority is: tenant config override, default tenant config,
|
||||
/// pageserver config.
|
||||
pub(crate) fn is_get_page_request_sampled(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
let ratio = tenant_conf
|
||||
.tenant_conf
|
||||
.sampling_ratio
|
||||
.flatten()
|
||||
.or(self.conf.default_tenant_conf.sampling_ratio)
|
||||
.or(self.conf.tracing.as_ref().map(|t| t.sampling_ratio));
|
||||
|
||||
match ratio {
|
||||
Some(r) => {
|
||||
if r.numerator == 0 {
|
||||
false
|
||||
} else {
|
||||
rand::thread_rng().gen_range(0..r.denominator) < r.numerator
|
||||
}
|
||||
}
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn get_checkpoint_distance(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
@@ -3869,15 +3959,30 @@ impl Timeline {
|
||||
let TimelineVisitOutcome {
|
||||
completed_keyspace: completed,
|
||||
image_covered_keyspace,
|
||||
} = Self::get_vectored_reconstruct_data_timeline(
|
||||
timeline,
|
||||
keyspace.clone(),
|
||||
cont_lsn,
|
||||
reconstruct_state,
|
||||
&self.cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
} = {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"PLAN_IO_TIMELINE",
|
||||
timeline = %timeline.timeline_id,
|
||||
lsn = %cont_lsn,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
Self::get_vectored_reconstruct_data_timeline(
|
||||
timeline,
|
||||
keyspace.clone(),
|
||||
cont_lsn,
|
||||
reconstruct_state,
|
||||
&self.cancel,
|
||||
&ctx,
|
||||
)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await?
|
||||
};
|
||||
|
||||
keyspace.remove_overlapping_with(&completed);
|
||||
|
||||
@@ -3921,8 +4026,24 @@ impl Timeline {
|
||||
|
||||
// Take the min to avoid reconstructing a page with data newer than request Lsn.
|
||||
cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1));
|
||||
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_ANCESTOR",
|
||||
timeline = %timeline.timeline_id,
|
||||
lsn = %cont_lsn,
|
||||
ancestor = %ancestor_timeline.timeline_id,
|
||||
ancestor_lsn = %timeline.ancestor_lsn
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
timeline_owned = timeline
|
||||
.get_ready_ancestor_timeline(ancestor_timeline, ctx)
|
||||
.get_ready_ancestor_timeline(ancestor_timeline, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await?;
|
||||
timeline = &*timeline_owned;
|
||||
};
|
||||
@@ -7253,9 +7374,9 @@ mod tests {
|
||||
|
||||
eprintln!("Downloading {layer} and re-generating heatmap");
|
||||
|
||||
let ctx = &RequestContextBuilder::extend(ctx)
|
||||
let ctx = &RequestContextBuilder::from(ctx)
|
||||
.download_behavior(crate::context::DownloadBehavior::Download)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
let _resident = layer
|
||||
.download_and_keep_resident(ctx)
|
||||
|
||||
@@ -26,7 +26,7 @@ use once_cell::sync::Lazy;
|
||||
use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
|
||||
use pageserver_api::key::{KEY_SIZE, Key};
|
||||
use pageserver_api::keyspace::{KeySpace, ShardedRange};
|
||||
use pageserver_api::models::CompactInfoResponse;
|
||||
use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
|
||||
use pageserver_api::value::Value;
|
||||
@@ -61,7 +61,7 @@ use crate::tenant::timeline::{
|
||||
DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
|
||||
ResidentLayer, drop_rlock,
|
||||
};
|
||||
use crate::tenant::{DeltaLayer, MaybeOffloaded, gc_block};
|
||||
use crate::tenant::{DeltaLayer, MaybeOffloaded};
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
|
||||
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
|
||||
@@ -123,7 +123,6 @@ impl GcCompactionQueueItem {
|
||||
#[derive(Default)]
|
||||
struct GcCompactionGuardItems {
|
||||
notify: Option<tokio::sync::oneshot::Sender<()>>,
|
||||
gc_guard: Option<gc_block::Guard>,
|
||||
permit: Option<OwnedSemaphorePermit>,
|
||||
}
|
||||
|
||||
@@ -279,7 +278,7 @@ impl GcCompactionQueue {
|
||||
gc_compaction_ratio_percent: u64,
|
||||
) -> bool {
|
||||
const AUTO_TRIGGER_LIMIT: u64 = 150 * 1024 * 1024 * 1024; // 150GB
|
||||
if l1_size >= AUTO_TRIGGER_LIMIT || l2_size >= AUTO_TRIGGER_LIMIT {
|
||||
if l1_size + l2_size >= AUTO_TRIGGER_LIMIT {
|
||||
// Do not auto-trigger when physical size >= 150GB
|
||||
return false;
|
||||
}
|
||||
@@ -319,7 +318,12 @@ impl GcCompactionQueue {
|
||||
flags
|
||||
},
|
||||
sub_compaction: true,
|
||||
compact_key_range: None,
|
||||
// Only auto-trigger gc-compaction over the data keyspace due to concerns in
|
||||
// https://github.com/neondatabase/neon/issues/11318.
|
||||
compact_key_range: Some(CompactKeyRange {
|
||||
start: Key::MIN,
|
||||
end: Key::metadata_key_range().start,
|
||||
}),
|
||||
compact_lsn_range: None,
|
||||
sub_compaction_max_job_size_mb: None,
|
||||
},
|
||||
@@ -343,44 +347,45 @@ impl GcCompactionQueue {
|
||||
info!("compaction job id={} finished", id);
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
if let Some(items) = guard.guards.remove(&id) {
|
||||
drop(items.gc_guard);
|
||||
if let Some(tx) = items.notify {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn clear_running_job(&self) {
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.running = None;
|
||||
}
|
||||
|
||||
async fn handle_sub_compaction(
|
||||
&self,
|
||||
id: GcCompactionJobId,
|
||||
options: CompactOptions,
|
||||
timeline: &Arc<Timeline>,
|
||||
gc_block: &GcBlock,
|
||||
auto: bool,
|
||||
) -> Result<(), CompactionError> {
|
||||
info!(
|
||||
"running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
|
||||
);
|
||||
let jobs = timeline
|
||||
let res = timeline
|
||||
.gc_compaction_split_jobs(
|
||||
GcCompactJob::from_compact_options(options.clone()),
|
||||
options.sub_compaction_max_job_size_mb,
|
||||
)
|
||||
.await?;
|
||||
.await;
|
||||
let jobs = match res {
|
||||
Ok(jobs) => jobs,
|
||||
Err(err) => {
|
||||
warn!("cannot split gc-compaction jobs: {}, unblocked gc", err);
|
||||
self.notify_and_unblock(id);
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
if jobs.is_empty() {
|
||||
info!("no jobs to run, skipping scheduled compaction task");
|
||||
self.notify_and_unblock(id);
|
||||
} else {
|
||||
let gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"cannot run gc-compaction because gc is blocked: {}",
|
||||
e
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let jobs_len = jobs.len();
|
||||
let mut pending_tasks = Vec::new();
|
||||
// gc-compaction might pick more layers or fewer layers to compact. The L2 LSN does not need to be accurate.
|
||||
@@ -415,7 +420,6 @@ impl GcCompactionQueue {
|
||||
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
|
||||
let mut tasks = Vec::new();
|
||||
for task in pending_tasks {
|
||||
let id = guard.next_id();
|
||||
@@ -446,7 +450,18 @@ impl GcCompactionQueue {
|
||||
if let Err(err) = &res {
|
||||
log_compaction_error(err, None, cancel.is_cancelled());
|
||||
}
|
||||
res
|
||||
match res {
|
||||
Ok(res) => Ok(res),
|
||||
Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
|
||||
Err(_) => {
|
||||
// There are some cases where traditional gc might collect some layer
|
||||
// files causing gc-compaction cannot read the full history of the key.
|
||||
// This needs to be resolved in the long-term by improving the compaction
|
||||
// process. For now, let's simply avoid such errors triggering the
|
||||
// circuit breaker.
|
||||
Ok(CompactionOutcome::Skipped)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn iteration_inner(
|
||||
@@ -494,27 +509,32 @@ impl GcCompactionQueue {
|
||||
info!(
|
||||
"running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
|
||||
);
|
||||
self.handle_sub_compaction(id, options, timeline, gc_block, auto)
|
||||
self.handle_sub_compaction(id, options, timeline, auto)
|
||||
.await?;
|
||||
} else {
|
||||
// Auto compaction always enables sub-compaction so we don't need to handle update_l2_lsn
|
||||
// in this branch.
|
||||
let gc_guard = match gc_block.start().await {
|
||||
let _gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
self.notify_and_unblock(id);
|
||||
self.clear_running_job();
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"cannot run gc-compaction because gc is blocked: {}",
|
||||
e
|
||||
)));
|
||||
}
|
||||
};
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
|
||||
}
|
||||
let compaction_result =
|
||||
timeline.compact_with_options(cancel, options, ctx).await?;
|
||||
self.notify_and_unblock(id);
|
||||
let res = timeline.compact_with_options(cancel, options, ctx).await;
|
||||
let compaction_result = match res {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
warn!(%err, "failed to run gc-compaction");
|
||||
self.notify_and_unblock(id);
|
||||
self.clear_running_job();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
if compaction_result == CompactionOutcome::YieldForL0 {
|
||||
yield_for_l0 = true;
|
||||
}
|
||||
@@ -522,7 +542,25 @@ impl GcCompactionQueue {
|
||||
}
|
||||
GcCompactionQueueItem::SubCompactionJob(options) => {
|
||||
// TODO: error handling, clear the queue if any task fails?
|
||||
let compaction_result = timeline.compact_with_options(cancel, options, ctx).await?;
|
||||
let _gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
self.clear_running_job();
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"cannot run gc-compaction because gc is blocked: {}",
|
||||
e
|
||||
)));
|
||||
}
|
||||
};
|
||||
let res = timeline.compact_with_options(cancel, options, ctx).await;
|
||||
let compaction_result = match res {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
warn!(%err, "failed to run gc-compaction subcompaction job");
|
||||
self.clear_running_job();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
if compaction_result == CompactionOutcome::YieldForL0 {
|
||||
// We will permenantly give up a task if we yield for L0 compaction: the preempted subcompaction job won't be running
|
||||
// again. This ensures that we don't keep doing duplicated work within gc-compaction. Not directly returning here because
|
||||
@@ -553,10 +591,7 @@ impl GcCompactionQueue {
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.running = None;
|
||||
}
|
||||
self.clear_running_job();
|
||||
Ok(if yield_for_l0 {
|
||||
tracing::info!("give up gc-compaction: yield for L0 compaction");
|
||||
CompactionOutcome::YieldForL0
|
||||
@@ -1001,9 +1036,9 @@ impl Timeline {
|
||||
{
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
|
||||
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
|
||||
let image_ctx = RequestContextBuilder::extend(ctx)
|
||||
let image_ctx = RequestContextBuilder::from(ctx)
|
||||
.access_stats_behavior(AccessStatsBehavior::Skip)
|
||||
.build();
|
||||
.attached_child();
|
||||
|
||||
let mut partitioning = dense_partitioning;
|
||||
partitioning
|
||||
|
||||
@@ -2,10 +2,14 @@ use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use http_utils::error::ApiError;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::DetachBehavior;
|
||||
use pageserver_api::models::detach_ancestor::AncestorDetached;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use pageserver_compaction::helpers::overlaps_with;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
@@ -22,7 +26,10 @@ use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::Tenant;
|
||||
use crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor;
|
||||
use crate::tenant::storage_layer::layer::local_layer_path;
|
||||
use crate::tenant::storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer};
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc as _, DeltaLayerWriter, ImageLayerWriter, IoConcurrency, Layer, ResidentLayer,
|
||||
ValuesReconstructState,
|
||||
};
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -170,6 +177,92 @@ impl Attempt {
|
||||
}
|
||||
}
|
||||
|
||||
async fn generate_tombstone_image_layer(
|
||||
detached: &Arc<Timeline>,
|
||||
ancestor: &Arc<Timeline>,
|
||||
ancestor_lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Option<ResidentLayer>, Error> {
|
||||
tracing::info!(
|
||||
"removing non-inherited keys by writing an image layer with tombstones at the detach LSN"
|
||||
);
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
detached.conf,
|
||||
detached.gate.enter().map_err(|_| Error::ShuttingDown)?,
|
||||
);
|
||||
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
|
||||
// Directly use `get_vectored_impl` to skip the max_vectored_read_key limit check. Note that the keyspace should
|
||||
// not contain too many keys, otherwise this takes a lot of memory. Currently we limit it to 10k keys in the compute.
|
||||
let key_range = Key::sparse_non_inherited_keyspace();
|
||||
// avoid generating a "future layer" which will then be removed
|
||||
let image_lsn = ancestor_lsn;
|
||||
|
||||
{
|
||||
let layers = detached.layers.read().await;
|
||||
for layer in layers.all_persistent_layers() {
|
||||
if !layer.is_delta
|
||||
&& layer.lsn_range.start == image_lsn
|
||||
&& overlaps_with(&key_range, &layer.key_range)
|
||||
{
|
||||
tracing::warn!(
|
||||
layer=%layer, "image layer at the detach LSN already exists, skipping removing aux files"
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let data = ancestor
|
||||
.get_vectored_impl(
|
||||
KeySpace::single(key_range.clone()),
|
||||
image_lsn,
|
||||
&mut reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.context("failed to retrieve aux keys")
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
if !data.is_empty() {
|
||||
// TODO: is it possible that we can have an image at `image_lsn`? Unlikely because image layers are only generated
|
||||
// upon compaction but theoretically possible.
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
detached.conf,
|
||||
detached.timeline_id,
|
||||
detached.tenant_shard_id,
|
||||
&key_range,
|
||||
image_lsn,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.context("failed to create image layer writer")
|
||||
.map_err(Error::Prepare)?;
|
||||
for key in data.keys() {
|
||||
image_layer_writer
|
||||
.put_image(*key, Bytes::new(), ctx)
|
||||
.await
|
||||
.context("failed to write key")
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
}
|
||||
let (desc, path) = image_layer_writer
|
||||
.finish(ctx)
|
||||
.await
|
||||
.context("failed to finish image layer writer for removing the metadata keys")
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
let generated = Layer::finish_creating(detached.conf, detached, desc, &path)
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
detached
|
||||
.remote_client
|
||||
.upload_layer_file(&generated, &detached.cancel)
|
||||
.await
|
||||
.map_err(|e| Error::launder(e, Error::Prepare))?;
|
||||
tracing::info!(layer=%generated, "wrote image layer");
|
||||
Ok(Some(generated))
|
||||
} else {
|
||||
tracing::info!("no aux keys found in ancestor");
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`Timeline::prepare_to_detach_from_ancestor`]
|
||||
pub(super) async fn prepare(
|
||||
detached: &Arc<Timeline>,
|
||||
@@ -352,10 +445,16 @@ pub(super) async fn prepare(
|
||||
|
||||
// TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
|
||||
let mut new_layers: Vec<Layer> =
|
||||
Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());
|
||||
Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len() + 1);
|
||||
|
||||
if let Some(tombstone_layer) =
|
||||
generate_tombstone_image_layer(detached, &ancestor, ancestor_lsn, ctx).await?
|
||||
{
|
||||
new_layers.push(tombstone_layer.into());
|
||||
}
|
||||
|
||||
{
|
||||
tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
|
||||
tracing::info!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
|
||||
|
||||
let mut tasks = tokio::task::JoinSet::new();
|
||||
|
||||
|
||||
@@ -32,9 +32,15 @@ impl Client {
|
||||
let Some(ref base_url) = conf.import_pgdata_upcall_api else {
|
||||
anyhow::bail!("import_pgdata_upcall_api is not configured")
|
||||
};
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
http_client = http_client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
|
||||
Ok(Self {
|
||||
base_url: base_url.to_string(),
|
||||
client: reqwest::Client::new(),
|
||||
client: http_client,
|
||||
cancel,
|
||||
authorization_header: conf
|
||||
.import_pgdata_upcall_api_token
|
||||
|
||||
@@ -25,8 +25,8 @@ impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
|
||||
/// * `align` must be a power of two,
|
||||
///
|
||||
/// * `capacity`, when rounded up to the nearest multiple of `align`,
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
AlignedBufferMut {
|
||||
raw: RawAlignedBuffer::with_capacity(capacity),
|
||||
|
||||
@@ -37,8 +37,8 @@ impl<const A: usize> RawAlignedBuffer<ConstAlign<A>> {
|
||||
/// * `align` must be a power of two,
|
||||
///
|
||||
/// * `capacity`, when rounded up to the nearest multiple of `align`,
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
/// must not overflow isize (i.e., the rounded value must be
|
||||
/// less than or equal to `isize::MAX`).
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
let align = ConstAlign::<A>;
|
||||
let layout = Layout::from_size_align(capacity, align.align()).expect("Invalid layout");
|
||||
|
||||
18
poetry.lock
generated
18
poetry.lock
generated
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "aiohappyeyeballs"
|
||||
@@ -1286,24 +1286,20 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "4.1.0"
|
||||
version = "4.2.0"
|
||||
description = "Pure-Python HTTP/2 protocol implementation"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
files = []
|
||||
develop = false
|
||||
files = [
|
||||
{file = "h2-4.2.0-py3-none-any.whl", hash = "sha256:479a53ad425bb29af087f3458a61d30780bc818e4ebcf01f0b536ba916462ed0"},
|
||||
{file = "h2-4.2.0.tar.gz", hash = "sha256:c8a52129695e88b1a0578d8d2cc6842bbd79128ac685463b887ee278126ad01f"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
hpack = ">=4.1,<5"
|
||||
hyperframe = ">=6.1,<7"
|
||||
|
||||
[package.source]
|
||||
type = "git"
|
||||
url = "https://github.com/python-hyper/h2"
|
||||
reference = "HEAD"
|
||||
resolved_reference = "0b98b244b5fd1fe96100ac14905417a3b70a4286"
|
||||
|
||||
[[package]]
|
||||
name = "hpack"
|
||||
version = "4.1.0"
|
||||
@@ -3844,4 +3840,4 @@ cffi = ["cffi (>=1.11)"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = "^3.11"
|
||||
content-hash = "fb50cb6b291169dce3188560cdb31a14af95647318f8f0f0d718131dbaf1817a"
|
||||
content-hash = "7ab1e7b975af34b3271b7c6018fa22a261d3f73c7c0a0403b6b2bb86b5fbd36e"
|
||||
|
||||
@@ -41,7 +41,7 @@ use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
|
||||
use crate::metrics::Metrics;
|
||||
|
||||
pub(crate) const EXT_NAME: &str = "pg_session_jwt";
|
||||
pub(crate) const EXT_VERSION: &str = "0.2.0";
|
||||
pub(crate) const EXT_VERSION: &str = "0.3.0";
|
||||
pub(crate) const EXT_SCHEMA: &str = "auth";
|
||||
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -43,7 +43,7 @@ websockets = "^12.0"
|
||||
clickhouse-connect = "^0.7.16"
|
||||
kafka-python = "^2.0.2"
|
||||
jwcrypto = "^1.5.6"
|
||||
h2 = {git = "https://github.com/python-hyper/h2"}
|
||||
h2 = "^4.2.0"
|
||||
types-jwcrypto = "^1.5.0.20240925"
|
||||
pyyaml = "^6.0.2"
|
||||
types-pyyaml = "^6.0.12.20240917"
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[toolchain]
|
||||
channel = "1.85.0"
|
||||
channel = "1.86.0"
|
||||
profile = "default"
|
||||
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
|
||||
# https://rust-lang.github.io/rustup/concepts/profiles.html
|
||||
|
||||
@@ -115,13 +115,17 @@ impl Client {
|
||||
"{}/v1/tenant/{}/timeline/{}",
|
||||
self.mgmt_api_endpoint, tenant_id, timeline_id
|
||||
);
|
||||
let resp = self.request(Method::DELETE, &uri, ()).await?;
|
||||
let resp = self
|
||||
.request_maybe_body(Method::DELETE, &uri, None::<()>)
|
||||
.await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TimelineDeleteResult> {
|
||||
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TenantDeleteResult> {
|
||||
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
|
||||
let resp = self.request(Method::DELETE, &uri, ()).await?;
|
||||
let resp = self
|
||||
.request_maybe_body(Method::DELETE, &uri, None::<()>)
|
||||
.await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
@@ -197,6 +201,16 @@ impl Client {
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: B,
|
||||
) -> Result<reqwest::Response> {
|
||||
self.request_maybe_body(method, uri, Some(body)).await
|
||||
}
|
||||
|
||||
/// Send the request and check that the status code is good, with an optional body.
|
||||
async fn request_maybe_body<B: serde::Serialize, U: reqwest::IntoUrl>(
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: Option<B>,
|
||||
) -> Result<reqwest::Response> {
|
||||
let res = self.request_noerror(method, uri, body).await?;
|
||||
let response = res.error_from_body().await?;
|
||||
@@ -208,12 +222,15 @@ impl Client {
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: B,
|
||||
body: Option<B>,
|
||||
) -> Result<reqwest::Response> {
|
||||
let mut req = self.client.request(method, uri);
|
||||
if let Some(value) = &self.authorization_header {
|
||||
req = req.header(reqwest::header::AUTHORIZATION, value.get_contents())
|
||||
}
|
||||
req.json(&body).send().await.map_err(Error::ReceiveBody)
|
||||
if let Some(body) = body {
|
||||
req = req.json(&body);
|
||||
}
|
||||
req.send().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -219,7 +219,10 @@ struct Args {
|
||||
pub ssl_cert_reload_period: Duration,
|
||||
/// Trusted root CA certificates to use in https APIs.
|
||||
#[arg(long)]
|
||||
ssl_ca_file: Option<Utf8PathBuf>,
|
||||
pub ssl_ca_file: Option<Utf8PathBuf>,
|
||||
/// Flag to use https for requests to peer's safekeeper API.
|
||||
#[arg(long)]
|
||||
pub use_https_safekeeper_api: bool,
|
||||
}
|
||||
|
||||
// Like PathBufValueParser, but allows empty string.
|
||||
@@ -399,6 +402,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
ssl_cert_file: args.ssl_cert_file,
|
||||
ssl_cert_reload_period: args.ssl_cert_reload_period,
|
||||
ssl_ca_certs,
|
||||
use_https_safekeeper_api: args.use_https_safekeeper_api,
|
||||
});
|
||||
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
|
||||
@@ -16,9 +16,9 @@ use http_utils::{RequestExt, RouterBuilder};
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::models::{
|
||||
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TermSwitchApiEntry,
|
||||
TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus,
|
||||
TimelineTermBumpRequest,
|
||||
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
|
||||
TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
|
||||
TimelineStatus, TimelineTermBumpRequest,
|
||||
};
|
||||
use safekeeper_api::{ServerInfo, membership, models};
|
||||
use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
|
||||
@@ -83,13 +83,11 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
.delete_all_for_tenant(&tenant_id, action)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
delete_info
|
||||
.iter()
|
||||
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
|
||||
.collect::<HashMap<String, TimelineDeleteResult>>(),
|
||||
)
|
||||
let response_body: TenantDeleteResult = delete_info
|
||||
.iter()
|
||||
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
|
||||
.collect::<HashMap<String, TimelineDeleteResult>>();
|
||||
json_response(StatusCode::OK, response_body)
|
||||
}
|
||||
|
||||
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
@@ -538,6 +536,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
|
||||
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
https_connstr: sk_info.https_connstr,
|
||||
backup_lsn: sk_info.backup_lsn.0,
|
||||
local_start_lsn: sk_info.local_start_lsn.0,
|
||||
availability_zone: None,
|
||||
|
||||
@@ -121,6 +121,7 @@ pub struct SafeKeeperConf {
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
pub ssl_cert_reload_period: Duration,
|
||||
pub ssl_ca_certs: Vec<Certificate>,
|
||||
pub use_https_safekeeper_api: bool,
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
@@ -170,6 +171,7 @@ impl SafeKeeperConf {
|
||||
ssl_cert_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_CERT_FILE),
|
||||
ssl_cert_reload_period: Duration::from_secs(60),
|
||||
ssl_ca_certs: Vec::new(),
|
||||
use_https_safekeeper_api: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,10 +94,10 @@ impl WalReceivers {
|
||||
|
||||
/// Get reference to locked slot contents. Slot must exist (registered
|
||||
/// earlier).
|
||||
fn get_slot<'a>(
|
||||
self: &'a Arc<WalReceivers>,
|
||||
fn get_slot(
|
||||
self: &Arc<WalReceivers>,
|
||||
id: WalReceiverId,
|
||||
) -> MappedMutexGuard<'a, WalReceiverState> {
|
||||
) -> MappedMutexGuard<'_, WalReceiverState> {
|
||||
MutexGuard::map(self.mutex.lock(), |locked| {
|
||||
locked.slots[id]
|
||||
.as_mut()
|
||||
|
||||
@@ -176,6 +176,7 @@ pub struct Donor {
|
||||
pub flush_lsn: Lsn,
|
||||
pub pg_connstr: String,
|
||||
pub http_connstr: String,
|
||||
pub https_connstr: Option<String>,
|
||||
}
|
||||
|
||||
impl From<&PeerInfo> for Donor {
|
||||
@@ -186,6 +187,7 @@ impl From<&PeerInfo> for Donor {
|
||||
flush_lsn: p.flush_lsn,
|
||||
pg_connstr: p.pg_connstr.clone(),
|
||||
http_connstr: p.http_connstr.clone(),
|
||||
https_connstr: p.https_connstr.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -236,11 +238,33 @@ async fn recover(
|
||||
conf: &SafeKeeperConf,
|
||||
) -> anyhow::Result<String> {
|
||||
// Learn donor term switch history to figure out starting point.
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let mut client = reqwest::Client::builder();
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
client = client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let client = client
|
||||
.build()
|
||||
.context("Failed to build http client for recover")?;
|
||||
|
||||
let url = if conf.use_https_safekeeper_api {
|
||||
if let Some(https_connstr) = donor.https_connstr.as_ref() {
|
||||
format!("https://{https_connstr}")
|
||||
} else {
|
||||
anyhow::bail!(
|
||||
"cannot recover from donor {}: \
|
||||
https is enabled, but https_connstr is not specified",
|
||||
donor.sk_id
|
||||
);
|
||||
}
|
||||
} else {
|
||||
format!("http://{}", donor.http_connstr)
|
||||
};
|
||||
|
||||
let timeline_info: TimelineStatus = client
|
||||
.get(format!(
|
||||
"http://{}/v1/tenant/{}/timeline/{}",
|
||||
donor.http_connstr, tli.ttid.tenant_id, tli.ttid.timeline_id
|
||||
"{}/v1/tenant/{}/timeline/{}",
|
||||
url, tli.ttid.tenant_id, tli.ttid.timeline_id
|
||||
))
|
||||
.send()
|
||||
.await?
|
||||
|
||||
@@ -50,6 +50,7 @@ fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> Peer
|
||||
local_start_lsn: Lsn(sk_info.local_start_lsn),
|
||||
pg_connstr: sk_info.safekeeper_connstr.clone(),
|
||||
http_connstr: sk_info.http_connstr.clone(),
|
||||
https_connstr: sk_info.https_connstr.clone(),
|
||||
ts,
|
||||
}
|
||||
}
|
||||
@@ -363,6 +364,7 @@ impl SharedState {
|
||||
.to_owned()
|
||||
.unwrap_or(conf.listen_pg_addr.clone()),
|
||||
http_connstr: conf.listen_http_addr.to_owned(),
|
||||
https_connstr: conf.listen_https_addr.to_owned(),
|
||||
backup_lsn: self.sk.state().inmem.backup_lsn.0,
|
||||
local_start_lsn: self.sk.state().local_start_lsn.0,
|
||||
availability_zone: conf.availability_zone.clone(),
|
||||
@@ -699,7 +701,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Take a writing mutual exclusive lock on timeline shared_state.
|
||||
pub async fn write_shared_state<'a>(self: &'a Arc<Self>) -> WriteGuardSharedState<'a> {
|
||||
pub async fn write_shared_state(self: &Arc<Self>) -> WriteGuardSharedState<'_> {
|
||||
WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
|
||||
}
|
||||
|
||||
|
||||
@@ -116,7 +116,7 @@ fn test_many_tx() -> anyhow::Result<()> {
|
||||
}
|
||||
None
|
||||
})
|
||||
.last()
|
||||
.next_back()
|
||||
.unwrap();
|
||||
|
||||
let initdb_lsn = 21623024;
|
||||
|
||||
@@ -184,6 +184,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
ssl_cert_file: Utf8PathBuf::from(""),
|
||||
ssl_cert_reload_period: Duration::ZERO,
|
||||
ssl_ca_certs: Vec::new(),
|
||||
use_https_safekeeper_api: false,
|
||||
};
|
||||
|
||||
let mut global = GlobalMap::new(disk, conf.clone())?;
|
||||
|
||||
@@ -141,6 +141,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
|
||||
http_connstr: "zenith-1-sk-1.local:7677".to_owned(),
|
||||
https_connstr: Some("zenith-1-sk-1.local:7678".to_owned()),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
standby_horizon: 0,
|
||||
|
||||
@@ -45,8 +45,10 @@ message SafekeeperTimelineInfo {
|
||||
uint64 standby_horizon = 14;
|
||||
// A connection string to use for WAL receiving.
|
||||
string safekeeper_connstr = 10;
|
||||
// HTTP endpoint connection string
|
||||
// HTTP endpoint connection string.
|
||||
string http_connstr = 13;
|
||||
// HTTPS endpoint connection string.
|
||||
optional string https_connstr = 15;
|
||||
// Availability zone of a safekeeper.
|
||||
optional string availability_zone = 11;
|
||||
}
|
||||
|
||||
@@ -764,6 +764,7 @@ mod tests {
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
|
||||
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
|
||||
https_connstr: Some("neon-1-sk-1.local:7678".to_owned()),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
standby_horizon: 0,
|
||||
|
||||
@@ -10,13 +10,11 @@ pub struct Client {
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(base_url: Url, jwt_token: Option<String>) -> Self {
|
||||
pub fn new(http_client: reqwest::Client, base_url: Url, jwt_token: Option<String>) -> Self {
|
||||
Self {
|
||||
base_url,
|
||||
jwt_token,
|
||||
client: reqwest::ClientBuilder::new()
|
||||
.build()
|
||||
.expect("Failed to construct http client"),
|
||||
client: http_client,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::error::Error as _;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use futures::StreamExt;
|
||||
@@ -364,25 +365,28 @@ pub(crate) struct ShardUpdate<'a> {
|
||||
}
|
||||
|
||||
impl ComputeHook {
|
||||
pub(super) fn new(config: Config) -> Self {
|
||||
pub(super) fn new(config: Config) -> anyhow::Result<Self> {
|
||||
let authorization_header = config
|
||||
.control_plane_jwt_token
|
||||
.clone()
|
||||
.map(|jwt| format!("Bearer {}", jwt));
|
||||
|
||||
let client = reqwest::ClientBuilder::new()
|
||||
.timeout(NOTIFY_REQUEST_TIMEOUT)
|
||||
let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT);
|
||||
for cert in &config.ssl_ca_certs {
|
||||
client = client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let client = client
|
||||
.build()
|
||||
.expect("Failed to construct HTTP client");
|
||||
.context("Failed to build http client for compute hook")?;
|
||||
|
||||
Self {
|
||||
Ok(Self {
|
||||
state: Default::default(),
|
||||
config,
|
||||
authorization_header,
|
||||
neon_local_lock: Default::default(),
|
||||
api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
|
||||
client,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// For test environments: use neon_local's LocalEnv to update compute
|
||||
|
||||
@@ -12,6 +12,7 @@ use safekeeper_api::models::SafekeeperUtilization;
|
||||
use safekeeper_client::mgmt_api;
|
||||
use thiserror::Error;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
use utils::id::NodeId;
|
||||
use utils::logging::SecretString;
|
||||
|
||||
@@ -227,6 +228,7 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
|
||||
|
||||
Some((*node_id, status))
|
||||
}
|
||||
.instrument(tracing::info_span!("heartbeat_ps", %node_id))
|
||||
});
|
||||
}
|
||||
|
||||
@@ -253,7 +255,7 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
|
||||
PageserverState::WarmingUp { .. } => {
|
||||
warming_up += 1;
|
||||
}
|
||||
PageserverState::Offline { .. } => offline += 1,
|
||||
PageserverState::Offline => offline += 1,
|
||||
PageserverState::Available { .. } => {}
|
||||
}
|
||||
}
|
||||
@@ -369,6 +371,7 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
|
||||
|
||||
Some((*node_id, status))
|
||||
}
|
||||
.instrument(tracing::info_span!("heartbeat_sk", %node_id))
|
||||
});
|
||||
}
|
||||
|
||||
@@ -391,7 +394,7 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
|
||||
let mut offline = 0;
|
||||
for state in new_state.values() {
|
||||
match state {
|
||||
SafekeeperState::Offline { .. } => offline += 1,
|
||||
SafekeeperState::Offline => offline += 1,
|
||||
SafekeeperState::Available { .. } => {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -639,6 +639,15 @@ async fn handle_tenant_timeline_passthrough(
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
|
||||
};
|
||||
|
||||
let method = match *req.method() {
|
||||
hyper::Method::GET => reqwest::Method::GET,
|
||||
hyper::Method::POST => reqwest::Method::POST,
|
||||
hyper::Method::PUT => reqwest::Method::PUT,
|
||||
hyper::Method::DELETE => reqwest::Method::DELETE,
|
||||
hyper::Method::PATCH => reqwest::Method::PATCH,
|
||||
_ => return Err(ApiError::BadRequest(anyhow::anyhow!("Unsupported method"))),
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
"Proxying request for tenant {} ({})",
|
||||
tenant_or_shard_id.tenant_id,
|
||||
@@ -686,7 +695,7 @@ async fn handle_tenant_timeline_passthrough(
|
||||
node.base_url(),
|
||||
service.get_config().pageserver_jwt_token.as_deref(),
|
||||
);
|
||||
let resp = client.get_raw(path).await.map_err(|e|
|
||||
let resp = client.op_raw(method, path).await.map_err(|e|
|
||||
// We return 503 here because if we can't successfully send a request to the pageserver,
|
||||
// either we aren't available or the pageserver is unavailable.
|
||||
ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?;
|
||||
@@ -1407,6 +1416,12 @@ async fn handle_upsert_safekeeper(mut req: Request<Body>) -> Result<Response<Bod
|
||||
)));
|
||||
}
|
||||
|
||||
if id <= 0 {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"id not allowed to be zero or negative: {id}"
|
||||
)));
|
||||
}
|
||||
|
||||
let req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
return res;
|
||||
@@ -1718,9 +1733,9 @@ async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
|
||||
};
|
||||
|
||||
if *self_addr == leader_addr {
|
||||
return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Leader is stepped down instance"
|
||||
))));
|
||||
return ForwardOutcome::Forwarded(Err(ApiError::ResourceUnavailable(
|
||||
"Leader is stepped down instance".into(),
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1729,19 +1744,17 @@ async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
|
||||
// Use [`RECONCILE_TIMEOUT`] as the max amount of time a request should block for and
|
||||
// include some leeway to get the timeout for proxied requests.
|
||||
const PROXIED_REQUEST_TIMEOUT: Duration = Duration::from_secs(RECONCILE_TIMEOUT.as_secs() + 10);
|
||||
let client = reqwest::ClientBuilder::new()
|
||||
.timeout(PROXIED_REQUEST_TIMEOUT)
|
||||
.build();
|
||||
let client = match client {
|
||||
Ok(client) => client,
|
||||
Err(err) => {
|
||||
return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Failed to build leader client for forwarding while in stepped down state: {err}"
|
||||
))));
|
||||
}
|
||||
};
|
||||
|
||||
let request: reqwest::Request = match convert_request(req, &client, leader.address).await {
|
||||
let client = state.service.get_http_client().clone();
|
||||
|
||||
let request: reqwest::Request = match convert_request(
|
||||
req,
|
||||
&client,
|
||||
leader.address,
|
||||
PROXIED_REQUEST_TIMEOUT,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(r) => r,
|
||||
Err(err) => {
|
||||
return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
@@ -1799,6 +1812,7 @@ async fn convert_request(
|
||||
req: hyper::Request<Body>,
|
||||
client: &reqwest::Client,
|
||||
to_address: String,
|
||||
timeout: Duration,
|
||||
) -> Result<reqwest::Request, ApiError> {
|
||||
use std::str::FromStr;
|
||||
|
||||
@@ -1853,6 +1867,7 @@ async fn convert_request(
|
||||
.request(method, uri)
|
||||
.headers(headers)
|
||||
.body(body)
|
||||
.timeout(timeout)
|
||||
.build()
|
||||
.map_err(|err| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
|
||||
@@ -2247,6 +2262,17 @@ pub fn make_router(
|
||||
RequestName("v1_tenant_passthrough"),
|
||||
)
|
||||
})
|
||||
// Tenant timeline mark_invisible passthrough to shard zero
|
||||
.put(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/mark_invisible",
|
||||
|r| {
|
||||
tenant_service_handler(
|
||||
r,
|
||||
handle_tenant_timeline_passthrough,
|
||||
RequestName("v1_tenant_timeline_mark_invisible_passthrough"),
|
||||
)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -110,7 +110,20 @@ impl Leadership {
|
||||
) -> Option<GlobalObservedState> {
|
||||
tracing::info!("Sending step down request to {leader:?}");
|
||||
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
for cert in &self.config.ssl_ca_certs {
|
||||
http_client = http_client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let http_client = match http_client.build() {
|
||||
Ok(http_client) => http_client,
|
||||
Err(err) => {
|
||||
tracing::error!("Failed to build client for leader step-down request: {err}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let client = PeerClient::new(
|
||||
http_client,
|
||||
Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
|
||||
self.config.peer_jwt_token.clone(),
|
||||
);
|
||||
|
||||
@@ -115,19 +115,17 @@ struct Cli {
|
||||
#[arg(long)]
|
||||
split_threshold: Option<u64>,
|
||||
|
||||
/// Maximum number of shards during autosplits. 0 disables autosplits.
|
||||
// TODO: defaults to 8 for backwards compatibility, should default to 255.
|
||||
#[arg(long, default_value = "8")]
|
||||
/// Maximum number of shards during autosplits. 0 disables autosplits. Defaults
|
||||
/// to 16 as a safety to avoid too many shards by accident.
|
||||
#[arg(long, default_value = "16")]
|
||||
max_split_shards: u8,
|
||||
|
||||
/// Size threshold for initial shard splits of unsharded tenants. 0 disables initial splits.
|
||||
// TODO: defaults to 64 GB for backwards compatibility. Should default to None.
|
||||
#[arg(long, default_value = "68719476736")]
|
||||
initial_split_threshold: u64,
|
||||
#[arg(long)]
|
||||
initial_split_threshold: Option<u64>,
|
||||
|
||||
/// Number of target shards for initial splits. 0 or 1 disables initial splits.
|
||||
// TODO: defaults to 8 for backwards compatibility. Should default to 2.
|
||||
#[arg(long, default_value = "8")]
|
||||
/// Number of target shards for initial splits. 0 or 1 disables initial splits. Defaults to 2.
|
||||
#[arg(long, default_value = "2")]
|
||||
initial_split_shards: u8,
|
||||
|
||||
/// Maximum number of normal-priority reconcilers that may run in parallel
|
||||
@@ -285,10 +283,8 @@ impl Secrets {
|
||||
fn load_secret(cli: &Option<String>, env_name: &str) -> Option<String> {
|
||||
if let Some(v) = cli {
|
||||
Some(v.clone())
|
||||
} else if let Ok(v) = std::env::var(env_name) {
|
||||
Some(v)
|
||||
} else {
|
||||
None
|
||||
std::env::var(env_name).ok()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -417,7 +413,7 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
tenant_rate_limit: args.tenant_rate_limit,
|
||||
split_threshold: args.split_threshold,
|
||||
max_split_shards: args.max_split_shards,
|
||||
initial_split_threshold: Some(args.initial_split_threshold),
|
||||
initial_split_threshold: args.initial_split_threshold,
|
||||
initial_split_shards: args.initial_split_shards,
|
||||
neon_local_repo_dir: args.neon_local_repo_dir,
|
||||
max_secondary_lag_bytes: args.max_secondary_lag_bytes,
|
||||
|
||||
@@ -59,11 +59,11 @@ impl ResponseErrorMessageExt for reqwest::Response {
|
||||
pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
|
||||
|
||||
impl PeerClient {
|
||||
pub(crate) fn new(uri: Uri, jwt: Option<String>) -> Self {
|
||||
pub(crate) fn new(http_client: reqwest::Client, uri: Uri, jwt: Option<String>) -> Self {
|
||||
Self {
|
||||
uri,
|
||||
jwt,
|
||||
client: reqwest::Client::new(),
|
||||
client: http_client,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1524,25 +1524,14 @@ impl Persistence {
|
||||
/// Load pending operations from db.
|
||||
pub(crate) async fn list_pending_ops(
|
||||
&self,
|
||||
filter_for_sk: Option<NodeId>,
|
||||
) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
|
||||
use crate::schema::safekeeper_timeline_pending_ops::dsl;
|
||||
|
||||
const FILTER_VAL_1: i64 = 1;
|
||||
const FILTER_VAL_2: i64 = 2;
|
||||
let filter_opt = filter_for_sk.map(|id| id.0 as i64);
|
||||
let timeline_from_db = self
|
||||
.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
|
||||
Box::pin(async move {
|
||||
let from_db: Vec<TimelinePendingOpPersistence> =
|
||||
dsl::safekeeper_timeline_pending_ops
|
||||
.filter(
|
||||
dsl::sk_id
|
||||
.eq(filter_opt.unwrap_or(FILTER_VAL_1))
|
||||
.and(dsl::sk_id.eq(filter_opt.unwrap_or(FILTER_VAL_2))),
|
||||
)
|
||||
.load(conn)
|
||||
.await?;
|
||||
dsl::safekeeper_timeline_pending_ops.load(conn).await?;
|
||||
Ok(from_db)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -686,6 +686,8 @@ impl Reconciler {
|
||||
.await?,
|
||||
);
|
||||
|
||||
pausable_failpoint!("reconciler-live-migrate-post-generation-inc");
|
||||
|
||||
let dest_conf = build_location_config(
|
||||
&self.shard,
|
||||
&self.config,
|
||||
@@ -760,7 +762,9 @@ impl Reconciler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn maybe_refresh_observed(&mut self) -> Result<(), ReconcileError> {
|
||||
/// Returns true if the observed state of the attached location was refreshed
|
||||
/// and false otherwise.
|
||||
async fn maybe_refresh_observed(&mut self) -> Result<bool, ReconcileError> {
|
||||
// If the attached node has uncertain state, read it from the pageserver before proceeding: this
|
||||
// is important to avoid spurious generation increments.
|
||||
//
|
||||
@@ -770,7 +774,7 @@ impl Reconciler {
|
||||
|
||||
let Some(attached_node) = self.intent.attached.as_ref() else {
|
||||
// Nothing to do
|
||||
return Ok(());
|
||||
return Ok(false);
|
||||
};
|
||||
|
||||
if matches!(
|
||||
@@ -815,7 +819,7 @@ impl Reconciler {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Reconciling a tenant makes API calls to pageservers until the observed state
|
||||
@@ -831,7 +835,7 @@ impl Reconciler {
|
||||
/// state where it still requires later reconciliation.
|
||||
pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
|
||||
// Prepare: if we have uncertain `observed` state for our would-be attachement location, then refresh it
|
||||
self.maybe_refresh_observed().await?;
|
||||
let refreshed = self.maybe_refresh_observed().await?;
|
||||
|
||||
// Special case: live migration
|
||||
self.maybe_live_migrate().await?;
|
||||
@@ -855,8 +859,14 @@ impl Reconciler {
|
||||
);
|
||||
match self.observed.locations.get(&node.get_id()) {
|
||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
|
||||
// Nothing to do
|
||||
tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
|
||||
if refreshed {
|
||||
tracing::info!(
|
||||
node_id=%node.get_id(), "Observed configuration correct after refresh. Notifying compute.");
|
||||
self.compute_notify().await?;
|
||||
} else {
|
||||
// Nothing to do
|
||||
tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.");
|
||||
}
|
||||
}
|
||||
observed => {
|
||||
// In all cases other than a matching observed configuration, we will
|
||||
|
||||
@@ -101,7 +101,7 @@ impl SafekeeperClient {
|
||||
pub(crate) async fn delete_tenant(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<models::TimelineDeleteResult> {
|
||||
) -> Result<models::TenantDeleteResult> {
|
||||
measured_request!(
|
||||
"delete_tenant",
|
||||
crate::metrics::Method::Delete,
|
||||
|
||||
@@ -1711,7 +1711,7 @@ impl Service {
|
||||
))),
|
||||
config: config.clone(),
|
||||
persistence,
|
||||
compute_hook: Arc::new(ComputeHook::new(config.clone())),
|
||||
compute_hook: Arc::new(ComputeHook::new(config.clone())?),
|
||||
result_tx,
|
||||
heartbeater_ps,
|
||||
heartbeater_sk,
|
||||
|
||||
@@ -35,6 +35,10 @@ impl SafekeeperReconcilers {
|
||||
service: &Arc<Service>,
|
||||
reqs: Vec<ScheduleRequest>,
|
||||
) {
|
||||
tracing::info!(
|
||||
"Scheduling {} pending safekeeper ops loaded from db",
|
||||
reqs.len()
|
||||
);
|
||||
for req in reqs {
|
||||
self.schedule_request(service, req);
|
||||
}
|
||||
@@ -74,7 +78,7 @@ pub(crate) async fn load_schedule_requests(
|
||||
service: &Arc<Service>,
|
||||
safekeepers: &HashMap<NodeId, Safekeeper>,
|
||||
) -> anyhow::Result<Vec<ScheduleRequest>> {
|
||||
let pending_ops = service.persistence.list_pending_ops(None).await?;
|
||||
let pending_ops = service.persistence.list_pending_ops().await?;
|
||||
let mut res = Vec::with_capacity(pending_ops.len());
|
||||
for op_persist in pending_ops {
|
||||
let node_id = NodeId(op_persist.sk_id as u64);
|
||||
@@ -232,12 +236,14 @@ impl SafekeeperReconciler {
|
||||
let kind = req.kind;
|
||||
let tenant_id = req.tenant_id;
|
||||
let timeline_id = req.timeline_id;
|
||||
let node_id = req.safekeeper.skp.id;
|
||||
self.reconcile_one(req, req_cancel)
|
||||
.instrument(tracing::info_span!(
|
||||
"reconcile_one",
|
||||
?kind,
|
||||
%tenant_id,
|
||||
?timeline_id
|
||||
?timeline_id,
|
||||
%node_id,
|
||||
))
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -622,7 +622,7 @@ impl TenantShard {
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
attached_locs.sort_by_key(|i| i.1);
|
||||
if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
|
||||
if let Some((node_id, _gen)) = attached_locs.into_iter().next_back() {
|
||||
self.intent.set_attached(scheduler, Some(*node_id));
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ enum LargeObjectKind {
|
||||
|
||||
impl LargeObjectKind {
|
||||
fn from_key(key: &str) -> Self {
|
||||
let fname = key.split('/').last().unwrap();
|
||||
let fname = key.split('/').next_back().unwrap();
|
||||
|
||||
let Ok((layer_name, _generation)) = parse_layer_object_name(fname) else {
|
||||
return LargeObjectKind::Other;
|
||||
|
||||
@@ -295,8 +295,8 @@ pub struct ControllerClientConfig {
|
||||
}
|
||||
|
||||
impl ControllerClientConfig {
|
||||
pub fn build_client(self) -> control_api::Client {
|
||||
control_api::Client::new(self.controller_api, Some(self.controller_jwt))
|
||||
pub fn build_client(self, http_client: reqwest::Client) -> control_api::Client {
|
||||
control_api::Client::new(http_client, self.controller_api, Some(self.controller_jwt))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ use camino::Utf8PathBuf;
|
||||
use clap::{Parser, Subcommand};
|
||||
use pageserver_api::controller_api::{MetadataHealthUpdateRequest, MetadataHealthUpdateResponse};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use reqwest::{Method, Url};
|
||||
use reqwest::{Certificate, Method, Url};
|
||||
use storage_controller_client::control_api;
|
||||
use storage_scrubber::garbage::{PurgeMode, find_garbage, purge_garbage};
|
||||
use storage_scrubber::pageserver_physical_gc::{GcMode, pageserver_physical_gc};
|
||||
@@ -41,6 +41,10 @@ struct Cli {
|
||||
/// If set to true, the scrubber will exit with error code on fatal error.
|
||||
#[arg(long, default_value_t = false)]
|
||||
exit_code: bool,
|
||||
|
||||
/// Trusted root CA certificates to use in https APIs.
|
||||
#[arg(long)]
|
||||
ssl_ca_file: Option<Utf8PathBuf>,
|
||||
}
|
||||
|
||||
#[derive(Subcommand, Debug)]
|
||||
@@ -146,13 +150,28 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
tracing::info!("version: {}, build_tag {}", GIT_VERSION, BUILD_TAG);
|
||||
|
||||
let ssl_ca_certs = match cli.ssl_ca_file.as_ref() {
|
||||
Some(ssl_ca_file) => {
|
||||
tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
|
||||
let buf = tokio::fs::read(ssl_ca_file).await?;
|
||||
Certificate::from_pem_bundle(&buf)?
|
||||
}
|
||||
None => Vec::new(),
|
||||
};
|
||||
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
for cert in ssl_ca_certs {
|
||||
http_client = http_client.add_root_certificate(cert);
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
|
||||
let controller_client = cli.controller_api.map(|controller_api| {
|
||||
ControllerClientConfig {
|
||||
controller_api,
|
||||
// Default to no key: this is a convenience when working in a development environment
|
||||
controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
|
||||
}
|
||||
.build_client()
|
||||
.build_client(http_client)
|
||||
});
|
||||
|
||||
match cli.command {
|
||||
|
||||
@@ -376,6 +376,28 @@ class PageserverWalReceiverProtocol(StrEnum):
|
||||
raise ValueError(f"Unknown protocol type: {proto}")
|
||||
|
||||
|
||||
@dataclass
|
||||
class PageserverTracingConfig:
|
||||
sampling_ratio: tuple[int, int]
|
||||
endpoint: str
|
||||
protocol: str
|
||||
timeout: str
|
||||
|
||||
def to_config_key_value(self) -> tuple[str, dict[str, Any]]:
|
||||
value = {
|
||||
"sampling_ratio": {
|
||||
"numerator": self.sampling_ratio[0],
|
||||
"denominator": self.sampling_ratio[1],
|
||||
},
|
||||
"export_config": {
|
||||
"endpoint": self.endpoint,
|
||||
"protocol": self.protocol,
|
||||
"timeout": self.timeout,
|
||||
},
|
||||
}
|
||||
return ("tracing", value)
|
||||
|
||||
|
||||
class NeonEnvBuilder:
|
||||
"""
|
||||
Builder object to create a Neon runtime environment
|
||||
@@ -425,6 +447,7 @@ class NeonEnvBuilder:
|
||||
pageserver_virtual_file_io_mode: str | None = None,
|
||||
pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
|
||||
pageserver_get_vectored_concurrent_io: str | None = None,
|
||||
pageserver_tracing_config: PageserverTracingConfig | None = None,
|
||||
):
|
||||
self.repo_dir = repo_dir
|
||||
self.rust_log_override = rust_log_override
|
||||
@@ -478,6 +501,8 @@ class NeonEnvBuilder:
|
||||
pageserver_get_vectored_concurrent_io
|
||||
)
|
||||
|
||||
self.pageserver_tracing_config = pageserver_tracing_config
|
||||
|
||||
self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = (
|
||||
pageserver_default_tenant_config_compaction_algorithm
|
||||
)
|
||||
@@ -1138,6 +1163,7 @@ class NeonEnv:
|
||||
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
|
||||
self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
|
||||
self.pageserver_get_vectored_concurrent_io = config.pageserver_get_vectored_concurrent_io
|
||||
self.pageserver_tracing_config = config.pageserver_tracing_config
|
||||
|
||||
# Create the neon_local's `NeonLocalInitConf`
|
||||
cfg: dict[str, Any] = {
|
||||
@@ -1262,6 +1288,14 @@ class NeonEnv:
|
||||
if key not in ps_cfg:
|
||||
ps_cfg[key] = value
|
||||
|
||||
if self.pageserver_tracing_config is not None:
|
||||
key, value = self.pageserver_tracing_config.to_config_key_value()
|
||||
|
||||
if key not in ps_cfg:
|
||||
ps_cfg[key] = value
|
||||
|
||||
ps_cfg[key] = value
|
||||
|
||||
# Create a corresponding NeonPageserver object
|
||||
self.pageservers.append(
|
||||
NeonPageserver(self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"])
|
||||
@@ -1284,6 +1318,7 @@ class NeonEnv:
|
||||
"http_port": port.http,
|
||||
"https_port": port.https,
|
||||
"sync": config.safekeepers_enable_fsync,
|
||||
"use_https_safekeeper_api": config.use_https_safekeeper_api,
|
||||
}
|
||||
if config.auth_enabled:
|
||||
sk_cfg["auth_enabled"] = True
|
||||
@@ -1344,6 +1379,8 @@ class NeonEnv:
|
||||
and self.storage_controller_config.get("timelines_onto_safekeepers") is True
|
||||
):
|
||||
for sk_id, sk in enumerate(self.safekeepers):
|
||||
# 0 is an invalid safekeeper id
|
||||
sk_id = sk_id + 1
|
||||
body = {
|
||||
"id": sk_id,
|
||||
"created_at": "2023-10-25T09:11:25Z",
|
||||
|
||||
@@ -110,6 +110,7 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
|
||||
".*delaying layer flush by \\S+ for compaction backpressure.*",
|
||||
".*stalling layer flushes for compaction backpressure.*",
|
||||
".*layer roll waiting for flush due to compaction backpressure.*",
|
||||
".*BatchSpanProcessor.*",
|
||||
)
|
||||
|
||||
|
||||
@@ -118,6 +119,7 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
|
||||
# failing to connect to them.
|
||||
".*Call to node.*management API.*failed.*receive body.*",
|
||||
".*Call to node.*management API.*failed.*ReceiveBody.*",
|
||||
".*Call to node.*management API.*failed.*Timeout.*",
|
||||
".*Failed to update node .+ after heartbeat round.*error sending request for url.*",
|
||||
".*background_reconcile: failed to fetch top tenants:.*client error \\(Connect\\).*",
|
||||
# Many tests will start up with a node offline
|
||||
|
||||
@@ -853,6 +853,25 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
res_json = res.json()
|
||||
return res_json
|
||||
|
||||
def timeline_mark_invisible(
|
||||
self,
|
||||
tenant_id: TenantId | TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
is_visible: bool | None = None,
|
||||
):
|
||||
data = {
|
||||
"is_visible": is_visible,
|
||||
}
|
||||
|
||||
log.info(
|
||||
f"Requesting marking timeline invisible for {is_visible=}, {tenant_id=}, {timeline_id=}"
|
||||
)
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/mark_invisible",
|
||||
json=data,
|
||||
)
|
||||
self.verbose_error(res)
|
||||
|
||||
def timeline_get_timestamp_of_lsn(
|
||||
self, tenant_id: TenantId | TenantShardId, timeline_id: TimelineId, lsn: Lsn
|
||||
):
|
||||
@@ -1173,3 +1192,28 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
log.info(f"Got perf info response code: {res.status_code}")
|
||||
self.verbose_error(res)
|
||||
return res.json()
|
||||
|
||||
def ingest_aux_files(
|
||||
self,
|
||||
tenant_id: TenantId | TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
aux_files: dict[str, bytes],
|
||||
):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/ingest_aux_files",
|
||||
json={
|
||||
"aux_files": aux_files,
|
||||
},
|
||||
)
|
||||
self.verbose_error(res)
|
||||
return res.json()
|
||||
|
||||
def list_aux_files(
|
||||
self, tenant_id: TenantId | TenantShardId, timeline_id: TimelineId, lsn: Lsn
|
||||
) -> Any:
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/list_aux_files",
|
||||
json={"lsn": str(lsn)},
|
||||
)
|
||||
self.verbose_error(res)
|
||||
return res.json()
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user