Compare commits


2 Commits

Author          SHA1        Message                                                          Date
Folke Behrens   357795e0c5  VPC flow logs IaC                                                2025-04-06 13:14:40 +02:00
Folke Behrens   f105ddb778  aws lambda function for periodic collection of pod information  2025-04-06 13:14:40 +02:00
60 changed files with 2576 additions and 1002 deletions

View File

@@ -2,9 +2,6 @@ import json
import os
import subprocess
RED = "\033[91m"
RESET = "\033[0m"
image_map = os.getenv("IMAGE_MAP")
if not image_map:
raise ValueError("IMAGE_MAP environment variable is not set")
@@ -32,14 +29,9 @@ while len(pending) > 0:
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
failures.append((" ".join(cmd), result.stdout, target))
failures.append((" ".join(cmd), result.stdout))
pending.append((source, target))
print(
f"{RED}[RETRY]{RESET} Push failed for {target}. Retrying... (failure count: {len(failures)})"
)
print(result.stdout)
if len(failures) > 0 and (github_output := os.getenv("GITHUB_OUTPUT")):
failed_targets = [target for _, _, target in failures]
with open(github_output, "a") as f:
f.write(f"push_failures={json.dumps(failed_targets)}\n")
f.write("slack_notify=true\n")

View File

@@ -110,19 +110,12 @@ jobs:
IMAGE_MAP: ${{ inputs.image-map }}
- name: Notify Slack if container image pushing fails
if: steps.push.outputs.push_failures || failure()
if: steps.push.outputs.slack_notify == 'true' || failure()
uses: slackapi/slack-github-action@485a9d42d3a73031f12ec201c457e2162c45d02d # v2.0.0
with:
method: chat.postMessage
token: ${{ secrets.SLACK_BOT_TOKEN }}
payload: |
channel: ${{ vars.SLACK_ON_CALL_DEVPROD_STREAM }}
text: >
*Container image pushing ${{
steps.push.outcome == 'failure' && 'failed completely' || 'succeeded with some retries'
}}* in
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
${{ steps.push.outputs.push_failures && format(
'*Failed targets:*\n• {0}', join(fromJson(steps.push.outputs.push_failures), '\n• ')
) || '' }}
text: |
Pushing container images failed in <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>

Cargo.lock (generated): 1090 changed lines

File diff suppressed because it is too large.

View File

@@ -40,6 +40,8 @@ members = [
"libs/proxy/postgres-protocol2",
"libs/proxy/postgres-types2",
"libs/proxy/tokio-postgres2",
"lambda/aztraffic",
"lambda/pod_info_dumper",
]
[workspace.package]
@@ -183,7 +185,7 @@ test-context = "0.3"
thiserror = "1.0"
tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
tokio = { version = "1.43.1", features = ["macros"] }
tokio = { version = "1.41", features = ["macros"] }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.12.0"
@@ -342,3 +344,12 @@ inherits = "release"
debug = false # true = 2 = all symbols, 1 = line only
opt-level = "z"
lto = true
[profile.release-lambda-function]
inherits = "release"
lto = true
opt-level = "z"
codegen-units = 1
panic = "abort"
debug = false
strip = true

View File

@@ -1022,6 +1022,39 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/semver.control
#########################################################################################
#
# Layer "pg_embedding-build"
# compile pg_embedding extension
#
#########################################################################################
FROM build-deps AS pg_embedding-src
ARG PG_VERSION
# This is our extension, support stopped in favor of pgvector
# TODO: deprecate it
WORKDIR /ext-src
RUN case "${PG_VERSION:?}" in \
"v14" | "v15") \
export PG_EMBEDDING_VERSION=0.3.5 \
export PG_EMBEDDING_CHECKSUM=0e95b27b8b6196e2cf0a0c9ec143fe2219b82e54c5bb4ee064e76398cbe69ae9 \
;; \
*) \
echo "pg_embedding not supported on this PostgreSQL version. Use pgvector instead." && exit 0;; \
esac && \
wget https://github.com/neondatabase/pg_embedding/archive/refs/tags/${PG_EMBEDDING_VERSION}.tar.gz -O pg_embedding.tar.gz && \
echo "${PG_EMBEDDING_CHECKSUM} pg_embedding.tar.gz" | sha256sum --check && \
mkdir pg_embedding-src && cd pg_embedding-src && tar xzf ../pg_embedding.tar.gz --strip-components=1 -C .
FROM pg-build AS pg_embedding-build
COPY --from=pg_embedding-src /ext-src/ /ext-src/
WORKDIR /ext-src/
RUN if [ -d pg_embedding-src ]; then \
cd pg_embedding-src && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install; \
fi
#########################################################################################
#
# Layer "pg build with nonroot user and cargo installed"
@@ -1527,51 +1560,6 @@ COPY --from=pgauditlogtofile-src /ext-src/ /ext-src/
WORKDIR /ext-src/pgauditlogtofile-src
RUN make install USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN)
#########################################################################################
#
# Layer "pg_rest-build"
# compile pg_rest extension
#
#########################################################################################
FROM build-deps AS pg_rest-src
ARG PG_VERSION
ARG PG_REST_VERSION=3.0.1
# Only supported for PostgreSQL v17
RUN if [ "${PG_VERSION:?}" != "v17" ]; then \
echo "pg_rest extension is only supported for PostgreSQL v17" && exit 0; \
fi
WORKDIR /ext-src
RUN mkdir -p pg_rest-src && cd pg_rest-src && \
wget https://github.com/ruslantalpa/foxfirebase/raw/main/pg_rest_pg17-${PG_REST_VERSION}_neon-debian-bookworm_aarch64.deb -O pg_rest_pg17-${PG_REST_VERSION}_aarch64.deb && \
wget https://github.com/ruslantalpa/foxfirebase/raw/main/pg_rest_pg17-${PG_REST_VERSION}_neon-debian-bookworm_amd64.deb -O pg_rest_pg17-${PG_REST_VERSION}_amd64.deb
FROM pg-build AS pg_rest-build
ARG PG_REST_VERSION=3.0.1
ARG PG_VERSION
COPY --from=pg_rest-src /ext-src/ /ext-src/
WORKDIR /ext-src/pg_rest-src
RUN if [ "${PG_VERSION:?}" = "v17" ]; then \
export ARCH=$(uname -m) && \
echo "ARCH: $ARCH" && \
# Map architecture names to package names
if [ "$ARCH" = "x86_64" ]; then \
PACKAGE_ARCH="amd64"; \
else \
PACKAGE_ARCH="$ARCH"; \
fi && \
echo "Using package architecture: $PACKAGE_ARCH" && \
apt-get update && \
apt-get install -y ./pg_rest_pg17-${PG_REST_VERSION}_${PACKAGE_ARCH}.deb && \
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_rest.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_rest.control && \
# Clean up
apt-get clean && rm -rf /var/lib/apt/lists/*; \
else \
echo "pg_rest extension is only supported for PostgreSQL v17, skipping build"; \
fi
#########################################################################################
#
# Layer "neon-ext-build"
@@ -1659,6 +1647,7 @@ COPY --from=rdkit-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_uuidv7-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_roaringbitmap-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_semver-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_embedding-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=wal2json-build /usr/local/pgsql /usr/local/pgsql
COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
@@ -1667,7 +1656,6 @@ COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pgaudit-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pgauditlogtofile-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_rest-build /usr/local/pgsql/ /usr/local/pgsql/
#########################################################################################
#
@@ -1836,6 +1824,7 @@ COPY --from=pg_cron-src /ext-src/ /ext-src/
COPY --from=pg_uuidv7-src /ext-src/ /ext-src/
COPY --from=pg_roaringbitmap-src /ext-src/ /ext-src/
COPY --from=pg_semver-src /ext-src/ /ext-src/
#COPY --from=pg_embedding-src /ext-src/ /ext-src/
#COPY --from=wal2json-src /ext-src/ /ext-src/
COPY --from=pg_ivm-src /ext-src/ /ext-src/
COPY --from=pg_partman-src /ext-src/ /ext-src/
@@ -1844,6 +1833,7 @@ COPY --from=pg_repack-src /ext-src/ /ext-src/
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
COPY compute/patches/pg_repack.patch /ext-src
RUN cd /ext-src/pg_repack-src && patch -p1 </ext-src/pg_repack.patch && rm -f /ext-src/pg_repack.patch
COPY --chmod=755 docker-compose/run-tests.sh /run-tests.sh
RUN apt-get update && apt-get install -y libtap-parser-sourcehandler-pgtap-perl\
&& apt clean && rm -rf /ext-src/*.tar.gz /var/lib/apt/lists/*
@@ -1976,4 +1966,3 @@ RUN mkdir /var/run/rsyslogd && \
ENV LANG=en_US.utf8
USER postgres
ENTRYPOINT ["/usr/local/bin/compute_ctl"]

View File

@@ -202,10 +202,10 @@ index cf0b80d616..e8e2a14a4a 100644
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
ERROR: must be owner of relation constraint_comments_tbl
diff --git a/src/test/regress/expected/conversion.out b/src/test/regress/expected/conversion.out
index d785f92561..16377e5ac9 100644
index 442e7aff2b..525f732b03 100644
--- a/src/test/regress/expected/conversion.out
+++ b/src/test/regress/expected/conversion.out
@@ -15,7 +15,7 @@ SELECT FROM test_enc_setup();
@@ -8,7 +8,7 @@
CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, result OUT bytea)
AS :'regresslib', 'test_enc_conversion'
LANGUAGE C STRICT;
@@ -587,15 +587,16 @@ index f551624afb..57f1e432d4 100644
SELECT *
INTO TABLE ramp
diff --git a/src/test/regress/expected/database.out b/src/test/regress/expected/database.out
index 4cbdbdf84d..573362850e 100644
index 454db91ec0..01378d7081 100644
--- a/src/test/regress/expected/database.out
+++ b/src/test/regress/expected/database.out
@@ -1,8 +1,6 @@
@@ -1,8 +1,7 @@
CREATE DATABASE regression_tbd
ENCODING utf8 LC_COLLATE "C" LC_CTYPE "C" TEMPLATE template0;
ALTER DATABASE regression_tbd RENAME TO regression_utf8;
-ALTER DATABASE regression_utf8 SET TABLESPACE regress_tblspace;
-ALTER DATABASE regression_utf8 RESET TABLESPACE;
+WARNING: you need to manually restart any running background workers after this command
ALTER DATABASE regression_utf8 CONNECTION_LIMIT 123;
-- Test PgDatabaseToastTable. Doing this with GRANT would be slow.
BEGIN;
@@ -699,7 +700,7 @@ index 6ed50fdcfa..caa00a345d 100644
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
index 84745b9f60..4883c12351 100644
index 6b8c2f2414..8e13b7fa46 100644
--- a/src/test/regress/expected/foreign_key.out
+++ b/src/test/regress/expected/foreign_key.out
@@ -1985,7 +1985,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
@@ -1111,7 +1112,7 @@ index 8475231735..0653946337 100644
DROP ROLE regress_passwd_sha_len1;
DROP ROLE regress_passwd_sha_len2;
diff --git a/src/test/regress/expected/privileges.out b/src/test/regress/expected/privileges.out
index 620fbe8c52..0570102357 100644
index 5b9dba7b32..cc408dad42 100644
--- a/src/test/regress/expected/privileges.out
+++ b/src/test/regress/expected/privileges.out
@@ -20,19 +20,19 @@ SELECT lo_unlink(oid) FROM pg_largeobject_metadata WHERE oid >= 1000 AND oid < 3
@@ -1173,8 +1174,8 @@ index 620fbe8c52..0570102357 100644
+CREATE GROUP regress_priv_group2 WITH ADMIN regress_priv_user1 PASSWORD NEON_PASSWORD_PLACEHOLDER USER regress_priv_user2;
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
GRANT regress_priv_group2 TO regress_priv_user2 GRANTED BY regress_priv_user1;
SET SESSION AUTHORIZATION regress_priv_user3;
@@ -246,12 +246,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
SET SESSION AUTHORIZATION regress_priv_user1;
@@ -239,12 +239,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
ERROR: permission denied to grant privileges as role "regress_priv_role"
DETAIL: The grantor must have the ADMIN option on role "regress_priv_role".
GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY CURRENT_ROLE;
@@ -1191,7 +1192,7 @@ index 620fbe8c52..0570102357 100644
DROP ROLE regress_priv_role;
SET SESSION AUTHORIZATION regress_priv_user1;
SELECT session_user, current_user;
@@ -1783,7 +1787,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
@@ -1776,7 +1780,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
-- security-restricted operations
\c -
@@ -1200,7 +1201,7 @@ index 620fbe8c52..0570102357 100644
-- Check that index expressions and predicates are run as the table's owner
-- A dummy index function checking current_user
CREATE FUNCTION sro_ifun(int) RETURNS int AS $$
@@ -2675,8 +2679,8 @@ drop cascades to function testns.priv_testagg(integer)
@@ -2668,8 +2672,8 @@ drop cascades to function testns.priv_testagg(integer)
drop cascades to function testns.priv_testproc(integer)
-- Change owner of the schema & and rename of new schema owner
\c -
@@ -1211,7 +1212,7 @@ index 620fbe8c52..0570102357 100644
SET SESSION ROLE regress_schemauser1;
CREATE SCHEMA testns;
SELECT nspname, rolname FROM pg_namespace, pg_roles WHERE pg_namespace.nspname = 'testns' AND pg_namespace.nspowner = pg_roles.oid;
@@ -2799,7 +2803,7 @@ DROP USER regress_priv_user7;
@@ -2792,7 +2796,7 @@ DROP USER regress_priv_user7;
DROP USER regress_priv_user8; -- does not exist
ERROR: role "regress_priv_user8" does not exist
-- permissions with LOCK TABLE
@@ -1220,7 +1221,7 @@ index 620fbe8c52..0570102357 100644
CREATE TABLE lock_table (a int);
-- LOCK TABLE and SELECT permission
GRANT SELECT ON lock_table TO regress_locktable_user;
@@ -2881,7 +2885,7 @@ DROP USER regress_locktable_user;
@@ -2874,7 +2878,7 @@ DROP USER regress_locktable_user;
-- pg_backend_memory_contexts.
-- switch to superuser
\c -
@@ -1229,7 +1230,7 @@ index 620fbe8c52..0570102357 100644
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
has_table_privilege
---------------------
@@ -2925,10 +2929,10 @@ RESET ROLE;
@@ -2918,10 +2922,10 @@ RESET ROLE;
-- clean up
DROP ROLE regress_readallstats;
-- test role grantor machinery
@@ -1244,7 +1245,7 @@ index 620fbe8c52..0570102357 100644
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
SET SESSION AUTHORIZATION regress_group_direct_manager;
@@ -2957,9 +2961,9 @@ DROP ROLE regress_group_direct_manager;
@@ -2950,9 +2954,9 @@ DROP ROLE regress_group_direct_manager;
DROP ROLE regress_group_indirect_manager;
DROP ROLE regress_group_member;
-- test SET and INHERIT options with object ownership changes
@@ -1840,7 +1841,7 @@ index 09a255649b..15895f0c53 100644
CREATE TABLE ruletest_t2 (x int);
CREATE VIEW ruletest_v1 WITH (security_invoker=true) AS
diff --git a/src/test/regress/expected/security_label.out b/src/test/regress/expected/security_label.out
index a8e01a6220..83543b250a 100644
index a8e01a6220..5a9cef4ede 100644
--- a/src/test/regress/expected/security_label.out
+++ b/src/test/regress/expected/security_label.out
@@ -6,8 +6,8 @@ SET client_min_messages TO 'warning';
@@ -1854,6 +1855,34 @@ index a8e01a6220..83543b250a 100644
CREATE TABLE seclabel_tbl1 (a int, b text);
CREATE TABLE seclabel_tbl2 (x int, y text);
CREATE VIEW seclabel_view1 AS SELECT * FROM seclabel_tbl2;
@@ -19,21 +19,21 @@ ALTER TABLE seclabel_tbl2 OWNER TO regress_seclabel_user2;
-- Test of SECURITY LABEL statement without a plugin
--
SECURITY LABEL ON TABLE seclabel_tbl1 IS 'classified'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
SECURITY LABEL FOR 'dummy' ON TABLE seclabel_tbl1 IS 'classified'; -- fail
ERROR: security label provider "dummy" is not loaded
SECURITY LABEL ON TABLE seclabel_tbl1 IS '...invalid label...'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
SECURITY LABEL ON TABLE seclabel_tbl3 IS 'unclassified'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
SECURITY LABEL ON ROLE regress_seclabel_user1 IS 'classified'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
SECURITY LABEL FOR 'dummy' ON ROLE regress_seclabel_user1 IS 'classified'; -- fail
ERROR: security label provider "dummy" is not loaded
SECURITY LABEL ON ROLE regress_seclabel_user1 IS '...invalid label...'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
SECURITY LABEL ON ROLE regress_seclabel_user3 IS 'unclassified'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
-- clean up objects
DROP FUNCTION seclabel_four();
DROP DOMAIN seclabel_domain;
diff --git a/src/test/regress/expected/select_into.out b/src/test/regress/expected/select_into.out
index b79fe9a1c0..e29fab88ab 100644
--- a/src/test/regress/expected/select_into.out
@@ -2384,10 +2413,10 @@ index e3e3bea709..fa86ddc326 100644
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
COMMENT ON CONSTRAINT the_constraint ON DOMAIN constraint_comments_dom IS 'no, another comment';
diff --git a/src/test/regress/sql/conversion.sql b/src/test/regress/sql/conversion.sql
index b567a1a572..4d1ac2e631 100644
index 9a65fca91f..58431a3056 100644
--- a/src/test/regress/sql/conversion.sql
+++ b/src/test/regress/sql/conversion.sql
@@ -17,7 +17,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
@@ -12,7 +12,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
AS :'regresslib', 'test_enc_conversion'
LANGUAGE C STRICT;
@@ -2751,7 +2780,7 @@ index ae6841308b..47bc792e30 100644
SELECT *
diff --git a/src/test/regress/sql/database.sql b/src/test/regress/sql/database.sql
index 46ad263478..eb05584ed5 100644
index 0367c0e37a..a23b98c4bd 100644
--- a/src/test/regress/sql/database.sql
+++ b/src/test/regress/sql/database.sql
@@ -1,8 +1,6 @@
@@ -2864,7 +2893,7 @@ index aa147b14a9..370e0dd570 100644
CREATE FOREIGN DATA WRAPPER dummy;
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
index 9f4210b26e..620d3fc87e 100644
index 45c7a534cb..32dd26b8cd 100644
--- a/src/test/regress/sql/foreign_key.sql
+++ b/src/test/regress/sql/foreign_key.sql
@@ -1435,7 +1435,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
@@ -3217,7 +3246,7 @@ index 53e86b0b6c..0303fdfe96 100644
-- Check that the invalid secrets were re-hashed. A re-hashed secret
-- should not contain the original salt.
diff --git a/src/test/regress/sql/privileges.sql b/src/test/regress/sql/privileges.sql
index 259f1aedd1..6e1a3d17b7 100644
index 249df17a58..b258e7f26a 100644
--- a/src/test/regress/sql/privileges.sql
+++ b/src/test/regress/sql/privileges.sql
@@ -24,18 +24,18 @@ RESET client_min_messages;
@@ -3279,7 +3308,7 @@ index 259f1aedd1..6e1a3d17b7 100644
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
@@ -1160,7 +1160,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
@@ -1157,7 +1157,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
-- security-restricted operations
\c -
@@ -3288,7 +3317,7 @@ index 259f1aedd1..6e1a3d17b7 100644
-- Check that index expressions and predicates are run as the table's owner
@@ -1656,8 +1656,8 @@ DROP SCHEMA testns CASCADE;
@@ -1653,8 +1653,8 @@ DROP SCHEMA testns CASCADE;
-- Change owner of the schema & and rename of new schema owner
\c -
@@ -3299,7 +3328,7 @@ index 259f1aedd1..6e1a3d17b7 100644
SET SESSION ROLE regress_schemauser1;
CREATE SCHEMA testns;
@@ -1751,7 +1751,7 @@ DROP USER regress_priv_user8; -- does not exist
@@ -1748,7 +1748,7 @@ DROP USER regress_priv_user8; -- does not exist
-- permissions with LOCK TABLE
@@ -3308,7 +3337,7 @@ index 259f1aedd1..6e1a3d17b7 100644
CREATE TABLE lock_table (a int);
-- LOCK TABLE and SELECT permission
@@ -1839,7 +1839,7 @@ DROP USER regress_locktable_user;
@@ -1836,7 +1836,7 @@ DROP USER regress_locktable_user;
-- switch to superuser
\c -
@@ -3317,7 +3346,7 @@ index 259f1aedd1..6e1a3d17b7 100644
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
SELECT has_table_privilege('regress_readallstats','pg_shmem_allocations','SELECT'); -- no
@@ -1859,10 +1859,10 @@ RESET ROLE;
@@ -1856,10 +1856,10 @@ RESET ROLE;
DROP ROLE regress_readallstats;
-- test role grantor machinery
@@ -3332,7 +3361,7 @@ index 259f1aedd1..6e1a3d17b7 100644
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
@@ -1884,9 +1884,9 @@ DROP ROLE regress_group_indirect_manager;
@@ -1881,9 +1881,9 @@ DROP ROLE regress_group_indirect_manager;
DROP ROLE regress_group_member;
-- test SET and INHERIT options with object ownership changes

View File

@@ -202,10 +202,10 @@ index cf0b80d616..e8e2a14a4a 100644
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
ERROR: must be owner of relation constraint_comments_tbl
diff --git a/src/test/regress/expected/conversion.out b/src/test/regress/expected/conversion.out
index d785f92561..16377e5ac9 100644
index 442e7aff2b..525f732b03 100644
--- a/src/test/regress/expected/conversion.out
+++ b/src/test/regress/expected/conversion.out
@@ -15,7 +15,7 @@ SELECT FROM test_enc_setup();
@@ -8,7 +8,7 @@
CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, result OUT bytea)
AS :'regresslib', 'test_enc_conversion'
LANGUAGE C STRICT;
@@ -587,15 +587,16 @@ index f551624afb..57f1e432d4 100644
SELECT *
INTO TABLE ramp
diff --git a/src/test/regress/expected/database.out b/src/test/regress/expected/database.out
index 4cbdbdf84d..573362850e 100644
index 454db91ec0..01378d7081 100644
--- a/src/test/regress/expected/database.out
+++ b/src/test/regress/expected/database.out
@@ -1,8 +1,6 @@
@@ -1,8 +1,7 @@
CREATE DATABASE regression_tbd
ENCODING utf8 LC_COLLATE "C" LC_CTYPE "C" TEMPLATE template0;
ALTER DATABASE regression_tbd RENAME TO regression_utf8;
-ALTER DATABASE regression_utf8 SET TABLESPACE regress_tblspace;
-ALTER DATABASE regression_utf8 RESET TABLESPACE;
+WARNING: you need to manually restart any running background workers after this command
ALTER DATABASE regression_utf8 CONNECTION_LIMIT 123;
-- Test PgDatabaseToastTable. Doing this with GRANT would be slow.
BEGIN;
@@ -699,7 +700,7 @@ index 6ed50fdcfa..caa00a345d 100644
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
index fe6a1015f2..614b387b7d 100644
index 69994c98e3..129abcfbe8 100644
--- a/src/test/regress/expected/foreign_key.out
+++ b/src/test/regress/expected/foreign_key.out
@@ -1985,7 +1985,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
@@ -1146,7 +1147,7 @@ index 924d6e001d..7fdda73439 100644
DROP ROLE regress_passwd_sha_len1;
DROP ROLE regress_passwd_sha_len2;
diff --git a/src/test/regress/expected/privileges.out b/src/test/regress/expected/privileges.out
index e8c668e0a1..03be5c2120 100644
index 1296da0d57..f43fffa44c 100644
--- a/src/test/regress/expected/privileges.out
+++ b/src/test/regress/expected/privileges.out
@@ -20,19 +20,19 @@ SELECT lo_unlink(oid) FROM pg_largeobject_metadata WHERE oid >= 1000 AND oid < 3
@@ -1208,8 +1209,8 @@ index e8c668e0a1..03be5c2120 100644
+CREATE GROUP regress_priv_group2 WITH ADMIN regress_priv_user1 PASSWORD NEON_PASSWORD_PLACEHOLDER USER regress_priv_user2;
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
GRANT regress_priv_group2 TO regress_priv_user2 GRANTED BY regress_priv_user1;
SET SESSION AUTHORIZATION regress_priv_user3;
@@ -246,12 +246,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
SET SESSION AUTHORIZATION regress_priv_user1;
@@ -239,12 +239,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
ERROR: permission denied to grant privileges as role "regress_priv_role"
DETAIL: The grantor must have the ADMIN option on role "regress_priv_role".
GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY CURRENT_ROLE;
@@ -1226,7 +1227,7 @@ index e8c668e0a1..03be5c2120 100644
DROP ROLE regress_priv_role;
SET SESSION AUTHORIZATION regress_priv_user1;
SELECT session_user, current_user;
@@ -1783,7 +1787,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
@@ -1776,7 +1780,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
-- security-restricted operations
\c -
@@ -1235,7 +1236,7 @@ index e8c668e0a1..03be5c2120 100644
-- Check that index expressions and predicates are run as the table's owner
-- A dummy index function checking current_user
CREATE FUNCTION sro_ifun(int) RETURNS int AS $$
@@ -2675,8 +2679,8 @@ drop cascades to function testns.priv_testagg(integer)
@@ -2668,8 +2672,8 @@ drop cascades to function testns.priv_testagg(integer)
drop cascades to function testns.priv_testproc(integer)
-- Change owner of the schema & and rename of new schema owner
\c -
@@ -1246,7 +1247,7 @@ index e8c668e0a1..03be5c2120 100644
SET SESSION ROLE regress_schemauser1;
CREATE SCHEMA testns;
SELECT nspname, rolname FROM pg_namespace, pg_roles WHERE pg_namespace.nspname = 'testns' AND pg_namespace.nspowner = pg_roles.oid;
@@ -2799,7 +2803,7 @@ DROP USER regress_priv_user7;
@@ -2792,7 +2796,7 @@ DROP USER regress_priv_user7;
DROP USER regress_priv_user8; -- does not exist
ERROR: role "regress_priv_user8" does not exist
-- permissions with LOCK TABLE
@@ -1255,7 +1256,7 @@ index e8c668e0a1..03be5c2120 100644
CREATE TABLE lock_table (a int);
-- LOCK TABLE and SELECT permission
GRANT SELECT ON lock_table TO regress_locktable_user;
@@ -2895,7 +2899,7 @@ DROP USER regress_locktable_user;
@@ -2888,7 +2892,7 @@ DROP USER regress_locktable_user;
-- pg_backend_memory_contexts.
-- switch to superuser
\c -
@@ -1264,7 +1265,7 @@ index e8c668e0a1..03be5c2120 100644
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
has_table_privilege
---------------------
@@ -2939,10 +2943,10 @@ RESET ROLE;
@@ -2932,10 +2936,10 @@ RESET ROLE;
-- clean up
DROP ROLE regress_readallstats;
-- test role grantor machinery
@@ -1279,7 +1280,7 @@ index e8c668e0a1..03be5c2120 100644
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
SET SESSION AUTHORIZATION regress_group_direct_manager;
@@ -2971,9 +2975,9 @@ DROP ROLE regress_group_direct_manager;
@@ -2964,9 +2968,9 @@ DROP ROLE regress_group_direct_manager;
DROP ROLE regress_group_indirect_manager;
DROP ROLE regress_group_member;
-- test SET and INHERIT options with object ownership changes
@@ -1292,7 +1293,7 @@ index e8c668e0a1..03be5c2120 100644
CREATE SCHEMA regress_roleoption;
GRANT CREATE, USAGE ON SCHEMA regress_roleoption TO PUBLIC;
GRANT regress_roleoption_donor TO regress_roleoption_protagonist WITH INHERIT TRUE, SET FALSE;
@@ -3002,9 +3006,9 @@ DROP ROLE regress_roleoption_protagonist;
@@ -2995,9 +2999,9 @@ DROP ROLE regress_roleoption_protagonist;
DROP ROLE regress_roleoption_donor;
DROP ROLE regress_roleoption_recipient;
-- MAINTAIN
@@ -2432,10 +2433,10 @@ index e3e3bea709..fa86ddc326 100644
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
COMMENT ON CONSTRAINT the_constraint ON DOMAIN constraint_comments_dom IS 'no, another comment';
diff --git a/src/test/regress/sql/conversion.sql b/src/test/regress/sql/conversion.sql
index b567a1a572..4d1ac2e631 100644
index 9a65fca91f..58431a3056 100644
--- a/src/test/regress/sql/conversion.sql
+++ b/src/test/regress/sql/conversion.sql
@@ -17,7 +17,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
@@ -12,7 +12,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
AS :'regresslib', 'test_enc_conversion'
LANGUAGE C STRICT;
@@ -2799,7 +2800,7 @@ index ae6841308b..47bc792e30 100644
SELECT *
diff --git a/src/test/regress/sql/database.sql b/src/test/regress/sql/database.sql
index 46ad263478..eb05584ed5 100644
index 0367c0e37a..a23b98c4bd 100644
--- a/src/test/regress/sql/database.sql
+++ b/src/test/regress/sql/database.sql
@@ -1,8 +1,6 @@
@@ -2912,7 +2913,7 @@ index aa147b14a9..370e0dd570 100644
CREATE FOREIGN DATA WRAPPER dummy;
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
index 8c4e4c7c83..e946cd2119 100644
index 2e710e419c..89cd481a54 100644
--- a/src/test/regress/sql/foreign_key.sql
+++ b/src/test/regress/sql/foreign_key.sql
@@ -1435,7 +1435,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
@@ -3300,7 +3301,7 @@ index bb82aa4aa2..dd8a05e24d 100644
-- Check that the invalid secrets were re-hashed. A re-hashed secret
-- should not contain the original salt.
diff --git a/src/test/regress/sql/privileges.sql b/src/test/regress/sql/privileges.sql
index b7e1cb6cdd..6e5a2217f1 100644
index 5880bc018d..27aa952b18 100644
--- a/src/test/regress/sql/privileges.sql
+++ b/src/test/regress/sql/privileges.sql
@@ -24,18 +24,18 @@ RESET client_min_messages;
@@ -3362,7 +3363,7 @@ index b7e1cb6cdd..6e5a2217f1 100644
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
@@ -1160,7 +1160,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
@@ -1157,7 +1157,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
-- security-restricted operations
\c -
@@ -3371,7 +3372,7 @@ index b7e1cb6cdd..6e5a2217f1 100644
-- Check that index expressions and predicates are run as the table's owner
@@ -1656,8 +1656,8 @@ DROP SCHEMA testns CASCADE;
@@ -1653,8 +1653,8 @@ DROP SCHEMA testns CASCADE;
-- Change owner of the schema & and rename of new schema owner
\c -
@@ -3382,7 +3383,7 @@ index b7e1cb6cdd..6e5a2217f1 100644
SET SESSION ROLE regress_schemauser1;
CREATE SCHEMA testns;
@@ -1751,7 +1751,7 @@ DROP USER regress_priv_user8; -- does not exist
@@ -1748,7 +1748,7 @@ DROP USER regress_priv_user8; -- does not exist
-- permissions with LOCK TABLE
@@ -3391,7 +3392,7 @@ index b7e1cb6cdd..6e5a2217f1 100644
CREATE TABLE lock_table (a int);
-- LOCK TABLE and SELECT permission
@@ -1854,7 +1854,7 @@ DROP USER regress_locktable_user;
@@ -1851,7 +1851,7 @@ DROP USER regress_locktable_user;
-- switch to superuser
\c -
@@ -3400,7 +3401,7 @@ index b7e1cb6cdd..6e5a2217f1 100644
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
SELECT has_table_privilege('regress_readallstats','pg_shmem_allocations','SELECT'); -- no
@@ -1874,10 +1874,10 @@ RESET ROLE;
@@ -1871,10 +1871,10 @@ RESET ROLE;
DROP ROLE regress_readallstats;
-- test role grantor machinery
@@ -3415,7 +3416,7 @@ index b7e1cb6cdd..6e5a2217f1 100644
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
@@ -1899,9 +1899,9 @@ DROP ROLE regress_group_indirect_manager;
@@ -1896,9 +1896,9 @@ DROP ROLE regress_group_indirect_manager;
DROP ROLE regress_group_member;
-- test SET and INHERIT options with object ownership changes
@@ -3428,7 +3429,7 @@ index b7e1cb6cdd..6e5a2217f1 100644
CREATE SCHEMA regress_roleoption;
GRANT CREATE, USAGE ON SCHEMA regress_roleoption TO PUBLIC;
GRANT regress_roleoption_donor TO regress_roleoption_protagonist WITH INHERIT TRUE, SET FALSE;
@@ -1929,9 +1929,9 @@ DROP ROLE regress_roleoption_donor;
@@ -1926,9 +1926,9 @@ DROP ROLE regress_roleoption_donor;
DROP ROLE regress_roleoption_recipient;
-- MAINTAIN

View File

@@ -0,0 +1,22 @@
[package]
name = "aztraffic"
version = "0.0.0"
edition.workspace = true
license.workspace = true
publish = false
[dependencies]
anyhow = "1.0.97"
aws-config = "1.6.1"
aws-sdk-athena = "1.68.0"
aws-sdk-ec2 = "1.121.0"
aws-sdk-eks = "1.82.0"
aws-sdk-glue = "1.88.0"
aws-sdk-lambda = "1.75.0"
aws-sdk-scheduler = "1.64.0"
aws-sdk-sfn = "1.68.0"
aws-sdk-sts = "1.65.0"
clap = { version = "4.5.35", features = ["derive", "env"] }
tokio = { version = "1.44.1", features = ["full"] }
serde = "1.0.219"
serde_json = { version = "1.0.140", features = ["preserve_order"] }

View File

@@ -0,0 +1,794 @@
use std::fs;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_sdk_ec2::types::{
DestinationFileFormat, DestinationOptionsRequest, FlowLogsResourceType, LogDestinationType,
TrafficType,
};
use aws_sdk_glue::primitives::Blob;
use aws_sdk_glue::types::{Column, DatabaseInput, SerDeInfo, StorageDescriptor, TableInput};
use aws_sdk_lambda::types::{Environment, FunctionCode, Runtime};
use aws_sdk_scheduler::types::{
DeadLetterConfig, FlexibleTimeWindow, FlexibleTimeWindowMode, RetryPolicy, Target,
};
use aws_sdk_sfn::types::{CloudWatchLogsLogGroup, LogDestination, LogLevel, LoggingConfiguration};
use clap::Parser;
use serde_json::json;
#[derive(Parser, Clone, Debug)]
struct Args {
#[arg(long, value_name = "id")]
account_id: String,
#[arg(long, value_name = "region")]
region: String,
#[arg(long, value_name = "cluster")]
cluster: String,
#[arg(long, value_name = "id")]
vpc_id: Vec<String>,
#[arg(long, value_name = "arn")]
log_group_arn: String,
#[arg(long, value_name = "name")]
pod_info_s3_bucket_name: String,
#[arg(
long,
value_name = "path",
default_value = "CrossAZTraffic/pod_info_dumper/pod_info.csv"
)]
pod_info_s3_bucket_key: String,
#[arg(long, value_name = "uri")]
pod_info_s3_bucket_uri: String,
#[arg(long, value_name = "uri")]
vpc_flow_logs_s3_bucket_uri: String,
#[arg(long, value_name = "uri")]
results_s3_bucket_uri: String,
#[arg(
long,
value_name = "name",
default_value = "./target/lambda/pod_info_dumper/bootstrap.zip"
)]
lambda_zipfile_path: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-podinfo-function"
)]
lambda_function_name: String,
#[arg(long, value_name = "arn")]
lambda_role_arn: String,
#[arg(long, value_name = "name")]
glue_database_name: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-podinfo-table"
)]
glue_pod_info_table_name: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-vpcflowlogs-table"
)]
glue_vpc_flow_logs_table_name: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-results-table"
)]
glue_results_table_name: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-trigger-schedule"
)]
schedule_name: String,
#[arg(long, value_name = "minutes", default_value_t = 60)]
schedule_interval_minutes: usize,
#[arg(long, value_name = "arn")]
schedule_target_state_machine_arn: String,
#[arg(long, value_name = "arn")]
schedule_target_role_arn: String,
#[arg(long, value_name = "arn")]
schedule_dead_letter_queue_arn: Option<String>,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-combine-query"
)]
athena_query_name: String,
#[arg(long, value_name = "uri")]
vpcflowlogs_destination_s3_bucket_uri: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-statemachine"
)]
statemachine_name: String,
#[arg(long, value_name = "arn")]
statemachine_role_arn: String,
#[arg(long, value_name = "uri")]
athena_results_s3_bucket_uri: String,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
eprintln!("{args:#?}");
// TODO: athena results bucket + lifecycle config
// TODO: iam role split
// TODO: iam policy
// TODO: clusterrole + binding
// TODO: eks mapping
// TODO: log group
// TODO: dlq
let sdk_config = create_sdk_config(&args).await?;
LambdaFunction {
local_zipfile_path: args.lambda_zipfile_path,
function_name: args.lambda_function_name.clone(),
role_arn: args.lambda_role_arn,
account_id: args.account_id,
region: args.region,
cluster: args.cluster,
s3_bucket_name: args.pod_info_s3_bucket_name,
s3_bucket_key: args.pod_info_s3_bucket_key,
}
.create(&sdk_config)
.await?;
GlueDatabase {
database_name: args.glue_database_name.clone(),
pod_info_table_name: args.glue_pod_info_table_name.clone(),
pod_info_s3_bucket_uri: args.pod_info_s3_bucket_uri,
vpc_flow_logs_table_name: args.glue_vpc_flow_logs_table_name.clone(),
vpc_flow_logs_s3_bucket_uri: args.vpc_flow_logs_s3_bucket_uri,
results_table_name: args.glue_results_table_name.clone(),
results_s3_bucket_uri: args.results_s3_bucket_uri,
}
.create(&sdk_config)
.await?;
let named_query_id = AthenaQuery {
query_name: args.athena_query_name,
glue_database: args.glue_database_name.clone(),
invocation_frequency: args.schedule_interval_minutes,
athena_results_table_name: args.glue_results_table_name,
vpc_flow_logs_table_name: args.glue_vpc_flow_logs_table_name,
pod_info_table_name: args.glue_pod_info_table_name,
}
.create(&sdk_config)
.await?;
StateMachine {
name: args.statemachine_name,
role_arn: args.statemachine_role_arn,
named_query_id,
glue_database: args.glue_database_name,
lambda_function_name: args.lambda_function_name,
athena_results_s3_bucket_uri: args.athena_results_s3_bucket_uri,
log_group_arn: args.log_group_arn,
}
.create(&sdk_config)
.await?;
Schedule {
name: args.schedule_name,
interval_minutes: args.schedule_interval_minutes,
dead_letter_queue_arn: args.schedule_dead_letter_queue_arn,
target_role_arn: args.schedule_target_role_arn,
target_state_machine_arn: args.schedule_target_state_machine_arn,
}
.create(&sdk_config)
.await?;
let flow_log_ids = VpcFlowLogs {
vpc_ids: args.vpc_id,
destination_s3_bucket_uri: args.vpcflowlogs_destination_s3_bucket_uri,
}
.create(&sdk_config)
.await?;
println!("VPC flow log IDs: {:?}", flow_log_ids.as_slice());
Ok(())
}
async fn create_sdk_config(args: &Args) -> anyhow::Result<aws_config::SdkConfig> {
let region = aws_config::Region::new(args.region.to_owned());
let credentials_provider = DefaultCredentialsChain::builder()
.region(region.clone())
.build()
.await;
Ok(aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(region)
.credentials_provider(credentials_provider)
.load()
.await)
}
struct LambdaFunction {
local_zipfile_path: String,
function_name: String,
role_arn: String,
account_id: String,
region: String,
cluster: String,
s3_bucket_name: String,
s3_bucket_key: String,
}
impl LambdaFunction {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> {
let code = fs::read(&self.local_zipfile_path)?;
let client = aws_sdk_lambda::Client::new(sdk_config);
client
.delete_function()
.function_name(&self.function_name)
.send()
.await
.ok();
client
.create_function()
.function_name(&self.function_name)
.runtime(Runtime::Providedal2023)
.handler("bootstrap")
.role(&self.role_arn)
.code(FunctionCode::builder().zip_file(Blob::new(code)).build())
.timeout(60)
.environment(
Environment::builder()
.set_variables(Some(
[
("NEON_ACCOUNT_ID", self.account_id.as_str()),
("NEON_REGION", self.region.as_str()),
("NEON_CLUSTER", self.cluster.as_str()),
("NEON_S3_BUCKET_NAME", self.s3_bucket_name.as_str()),
("NEON_S3_BUCKET_KEY", self.s3_bucket_key.as_str()),
("AWS_LAMBDA_LOG_FORMAT", "JSON"),
("AWS_LAMBDA_LOG_LEVEL", "INFO"),
]
.into_iter()
.map(|(k, v)| (k.into(), v.into()))
.collect(),
))
.build(),
)
.send()
.await?;
Ok(())
}
}
struct VpcFlowLogs {
vpc_ids: Vec<String>,
destination_s3_bucket_uri: String,
}
impl VpcFlowLogs {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<Vec<String>> {
let ec2_client = aws_sdk_ec2::Client::new(sdk_config);
let flow_logs = ec2_client
.create_flow_logs()
.resource_type(FlowLogsResourceType::Vpc)
.set_resource_ids(Some(self.vpc_ids.clone()))
.traffic_type(TrafficType::All)
.log_destination_type(LogDestinationType::S3)
.log_destination(&self.destination_s3_bucket_uri)
.destination_options(
DestinationOptionsRequest::builder()
.file_format(DestinationFileFormat::Parquet)
.hive_compatible_partitions(false)
.per_hour_partition(true)
.build(),
)
.log_format("${region} ${az-id} ${vpc-id} ${flow-direction} ${pkt-srcaddr} ${pkt-dstaddr} ${srcport} ${dstport} ${start} ${bytes}")
.send()
.await?;
if let Some(unsuccessful) = flow_logs
.unsuccessful
.as_ref()
.and_then(|v| if v.is_empty() { None } else { Some(v) })
{
anyhow::bail!("VPC flow log creation unsuccessful: {unsuccessful:?}");
}
Ok(flow_logs.flow_log_ids().iter().cloned().collect())
}
}
struct GlueDatabase {
database_name: String,
pod_info_table_name: String,
pod_info_s3_bucket_uri: String,
vpc_flow_logs_table_name: String,
vpc_flow_logs_s3_bucket_uri: String,
results_table_name: String,
results_s3_bucket_uri: String,
}
impl GlueDatabase {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> {
let glue_client = aws_sdk_glue::Client::new(sdk_config);
let db = DatabaseInput::builder().name(&self.database_name).build()?;
glue_client
.create_database()
.database_input(db.clone())
.send()
.await?;
let pod_info_columns = &[
Column::builder()
.name("namespace")
.r#type("string")
.build()?,
Column::builder().name("name").r#type("string").build()?,
Column::builder().name("ip").r#type("string").build()?,
Column::builder()
.name("creation_time")
.r#type("timestamp")
.build()?,
Column::builder().name("node").r#type("string").build()?,
Column::builder().name("az").r#type("string").build()?,
];
glue_client
.create_table()
.database_name(db.name())
.table_input(
TableInput::builder()
.name(&self.pod_info_table_name)
.storage_descriptor(
StorageDescriptor::builder()
.location(&self.pod_info_s3_bucket_uri)
.compressed(false)
.set_columns(Some(pod_info_columns.into_iter().cloned().collect()))
.input_format("org.apache.hadoop.mapred.TextInputFormat")
.output_format(
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
)
.serde_info(
SerDeInfo::builder()
.serialization_library(
"org.apache.hadoop.hive.serde2.OpenCSVSerde",
)
.parameters("separatorChar", ",")
.parameters("quoteChar", "`")
.parameters("escapeChar", r"\")
.build(),
)
.build(),
)
.table_type("EXTERNAL_TABLE")
.parameters("classification", "csv")
.parameters("skip.header.line.count", "1")
.retention(0)
.build()?,
)
.send()
.await?;
let vpc_flow_logs_columns = &[
Column::builder().name("region").r#type("string").build()?,
Column::builder().name("az_id").r#type("string").build()?,
Column::builder().name("vpc_id").r#type("string").build()?,
Column::builder()
.name("flow_direction")
.r#type("string")
.build()?,
Column::builder()
.name("pkt_srcaddr")
.r#type("string")
.build()?,
Column::builder()
.name("pkt_dstaddr")
.r#type("string")
.build()?,
Column::builder().name("srcport").r#type("int").build()?,
Column::builder().name("dstport").r#type("int").build()?,
Column::builder().name("start").r#type("bigint").build()?,
Column::builder().name("bytes").r#type("bigint").build()?,
];
glue_client
.create_table()
.database_name(db.name())
.table_input(
TableInput::builder()
.name(&self.vpc_flow_logs_table_name)
.storage_descriptor(
StorageDescriptor::builder()
.location(&self.vpc_flow_logs_s3_bucket_uri)
.compressed(false)
.set_columns(Some(vpc_flow_logs_columns.into_iter().cloned().collect()))
.input_format(
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
)
.output_format(
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
)
.serde_info(
SerDeInfo::builder()
.serialization_library(
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
)
.parameters("serialization.format", "1")
.build(),
)
.build(),
)
.table_type("EXTERNAL_TABLE")
.parameters("classification", "parquet")
.retention(0)
.build()?,
)
.send()
.await?;
let athena_results_columns = &[
Column::builder().name("time").r#type("timestamp").build()?,
Column::builder().name("traffic").r#type("string").build()?,
Column::builder()
.name("total_bytes")
.r#type("bigint")
.build()?,
];
glue_client
.create_table()
.database_name(db.name())
.table_input(
TableInput::builder()
.name(&self.results_table_name)
.storage_descriptor(
StorageDescriptor::builder()
.location(&self.results_s3_bucket_uri)
.compressed(false)
.set_columns(Some(athena_results_columns.into_iter().cloned().collect()))
.input_format(
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
)
.output_format(
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
)
.serde_info(
SerDeInfo::builder()
.serialization_library(
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
)
.parameters("serialization.format", "1")
.build(),
)
.build(),
)
.table_type("EXTERNAL_TABLE")
.parameters("classification", "parquet")
.retention(0)
.build()?,
)
.send()
.await?;
Ok(())
}
}
struct AthenaQuery {
query_name: String,
glue_database: String,
invocation_frequency: usize,
athena_results_table_name: String,
vpc_flow_logs_table_name: String,
pod_info_table_name: String,
}
impl AthenaQuery {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<String> {
let Self {
athena_results_table_name,
vpc_flow_logs_table_name,
pod_info_table_name,
invocation_frequency,
..
} = self;
let query_string = format!(
r#"
INSERT INTO "{athena_results_table_name}"
WITH
ip_addresses_and_az_mapping AS (
SELECT
DISTINCT pkt_srcaddr AS ipaddress,
az_id
FROM "{vpc_flow_logs_table_name}"
WHERE flow_direction = 'egress'
AND from_unixtime("{vpc_flow_logs_table_name}".start) > (CURRENT_TIMESTAMP - ({invocation_frequency} * interval '1' minute))
),
egress_flows_of_pods_with_status AS (
SELECT
"{pod_info_table_name}".name AS srcpodname,
pkt_srcaddr AS srcaddr,
pkt_dstaddr AS dstaddr,
"{vpc_flow_logs_table_name}".az_id AS srcazid,
bytes,
start
FROM "{vpc_flow_logs_table_name}"
INNER JOIN "{pod_info_table_name}" ON "{vpc_flow_logs_table_name}".pkt_srcaddr = "{pod_info_table_name}".ip
WHERE flow_direction = 'egress'
AND from_unixtime("{vpc_flow_logs_table_name}".start) > (CURRENT_TIMESTAMP - ({invocation_frequency} * interval '1' minute))
),
cross_az_traffic_by_pod AS (
SELECT
srcaddr,
srcpodname,
dstaddr,
"{pod_info_table_name}".name AS dstpodname,
srcazid,
ip_addresses_and_az_mapping.az_id AS dstazid,
bytes,
start
FROM egress_flows_of_pods_with_status
INNER JOIN "{pod_info_table_name}" ON dstaddr = "{pod_info_table_name}".ip
LEFT JOIN ip_addresses_and_az_mapping ON dstaddr = ipaddress
WHERE ip_addresses_and_az_mapping.az_id != srcazid
)
SELECT
date_trunc('MINUTE', from_unixtime(start)) AS time,
CONCAT(srcpodname, ' -> ', dstpodname) AS traffic,
SUM(bytes) AS total_bytes
FROM cross_az_traffic_by_pod
GROUP BY date_trunc('MINUTE', from_unixtime(start)), CONCAT(srcpodname, ' -> ', dstpodname)
ORDER BY time, total_bytes DESC
"#
);
let athena_client = aws_sdk_athena::Client::new(sdk_config);
let res = athena_client
.create_named_query()
.name(&self.query_name)
.database(&self.glue_database)
.query_string(query_string)
.send()
.await?;
Ok(res.named_query_id.unwrap())
}
}
struct StateMachine {
name: String,
role_arn: String,
named_query_id: String,
glue_database: String,
lambda_function_name: String,
athena_results_s3_bucket_uri: String,
log_group_arn: String,
}
impl StateMachine {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> {
let sfn_client = aws_sdk_sfn::Client::new(sdk_config);
sfn_client
.create_state_machine()
.name(&self.name)
.role_arn(&self.role_arn)
.logging_configuration(
LoggingConfiguration::builder()
.level(LogLevel::All)
.destinations(
LogDestination::builder()
.cloud_watch_logs_log_group(
CloudWatchLogsLogGroup::builder()
.log_group_arn(&self.log_group_arn)
.build(),
)
.build(),
)
.build(),
)
.definition(
json!(
{
"StartAt": "Invoke",
"States": {
"Invoke": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Output": "{% $states.result.Payload %}",
"Arguments": {
"FunctionName": self.lambda_function_name,
"Payload": json!({
"detail-type": "Scheduled Event",
"source": "aws.events",
"detail": {}
}).to_string()
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2,
"JitterStrategy": "FULL"
}
],
"Next": "Check"
},
"Check": {
"Type": "Choice",
"Choices": [
{
"Next": "GetNamedQuery",
"Condition": "{% $states.input.statusCode = 200 %}"
}
],
"Default": "Fail"
},
"GetNamedQuery": {
"Type": "Task",
"Arguments": {
"NamedQueryId": self.named_query_id
},
"Resource": "arn:aws:states:::aws-sdk:athena:getNamedQuery",
"Output": {
"QueryString": "{% $states.result.NamedQuery.QueryString %}"
},
"Next": "StartQueryExecution"
},
"StartQueryExecution": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:startQueryExecution.sync",
"Arguments": {
"QueryString": "{% $states.input.QueryString %}",
"QueryExecutionContext": {
"Database": self.glue_database
},
"ResultConfiguration": {
"OutputLocation": self.athena_results_s3_bucket_uri
},
"WorkGroup": "primary"
},
"End": true
},
"Fail": {
"Type": "Fail"
}
},
"QueryLanguage": "JSONata"
}
)
.to_string(),
)
.send()
.await?;
Ok(())
}
}
struct Schedule {
name: String,
interval_minutes: usize,
target_state_machine_arn: String,
target_role_arn: String,
dead_letter_queue_arn: Option<String>,
}
impl Schedule {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> {
let sched_client = aws_sdk_scheduler::Client::new(sdk_config);
sched_client
.create_schedule()
.name(&self.name)
.schedule_expression(format!("rate({} minute)", self.interval_minutes))
.flexible_time_window(
FlexibleTimeWindow::builder()
.mode(FlexibleTimeWindowMode::Off)
.build()?,
)
.target(
Target::builder()
.arn(&self.target_state_machine_arn)
.role_arn(&self.target_role_arn)
.input(
json!({
"detail-type": "Scheduled Event",
"source": "aws.events",
"detail": {}
})
.to_string(),
)
.retry_policy(
RetryPolicy::builder()
.maximum_retry_attempts(0)
.maximum_event_age_in_seconds(60)
.build(),
)
.set_dead_letter_config(
self.dead_letter_queue_arn
.as_ref()
.map(|arn| DeadLetterConfig::builder().arn(arn).build()),
)
.build()?,
)
.send()
.await?;
Ok(())
}
}
struct KubernetesRoles {
region: String,
cluster: String,
k8s_role_prefix: String,
lambda_role_arn: String,
}
impl KubernetesRoles {
fn print(&self) -> anyhow::Result<()> {
let Self {
region,
cluster,
k8s_role_prefix,
lambda_role_arn,
} = self;
let yaml = format!(
r#"
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: {k8s_role_prefix}-clusterrole
rules:
- apiGroups:
- ""
resources: ["nodes", "namespaces", "pods"]
verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: {k8s_role_prefix}-binding
subjects:
- kind: Group
name: {k8s_role_prefix}-group
apiGroup: rbac.authorization.k8s.io
roleRef:
kind: ClusterRole
name: {k8s_role_prefix}-clusterrole
apiGroup: rbac.authorization.k8s.io
"#
);
let eksctl = format!(
r#"eksctl create iamidentitymapping \
--region "{region}" \
--cluster "{cluster}" \
--arn "{lambda_role_arn}" \
--username "{k8s_role_prefix}-binding" \
--group "{k8s_role_prefix}-group"
"#
);
// Emit the manifests/commands so they can be applied manually (see TODOs in main()).
println!("{yaml}");
println!("{eksctl}");
Ok(())
}
}
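Given the clap-derived `Args` above, a hypothetical invocation of the aztraffic tool might look like the following; every account ID, ARN and S3 location is a placeholder, and flags with defaults (zip path, function, table, schedule and query names, interval) are omitted:

```shell
# All identifiers below are placeholders, not real resources.
cargo run -p aztraffic -- \
  --account-id 123456789012 \
  --region eu-central-1 \
  --cluster my-eks-cluster \
  --vpc-id vpc-0123456789abcdef0 \
  --log-group-arn arn:aws:logs:eu-central-1:123456789012:log-group:/aws/vendedlogs/states/CrossAZTraffic \
  --pod-info-s3-bucket-name my-pod-info-bucket \
  --pod-info-s3-bucket-uri s3://my-pod-info-bucket/CrossAZTraffic/pod_info_dumper/ \
  --vpc-flow-logs-s3-bucket-uri s3://my-flow-logs-bucket/AWSLogs/ \
  --vpcflowlogs-destination-s3-bucket-uri arn:aws:s3:::my-flow-logs-bucket \
  --results-s3-bucket-uri s3://my-results-bucket/results/ \
  --athena-results-s3-bucket-uri s3://my-results-bucket/athena/ \
  --lambda-role-arn arn:aws:iam::123456789012:role/CrossAZTraffic-lambda \
  --glue-database-name crossaztraffic \
  --statemachine-role-arn arn:aws:iam::123456789012:role/CrossAZTraffic-states \
  --schedule-target-state-machine-arn arn:aws:states:eu-central-1:123456789012:stateMachine:CrossAZTraffic-statemachine \
  --schedule-target-role-arn arn:aws:iam::123456789012:role/CrossAZTraffic-scheduler
```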

View File

@@ -0,0 +1,27 @@
[package]
name = "pod_info_dumper"
version = "0.0.0"
edition = "2024"
publish = false
[dependencies]
aws_lambda_events = { version = "0.16.0", default-features = false, features = ["eventbridge"] }
aws-config = { workspace = true }
aws-sdk-eks = "1.75.0"
aws-sdk-s3 = { workspace = true }
aws-sdk-sts = "1.65.0"
aws-sigv4 = "1.3.0"
base64 = { version = "0.22.1" }
csv = { version = "1.3.1", default-features = false }
http = { workspace = true }
k8s-openapi = { version = "0.24.0", default-features = false, features = ["v1_31"] }
kube = { version = "0.99.0", default-features = false, features = ["client", "rustls-tls"] }
lambda_runtime = { version = "0.13.0", default-features = false, features = ["tracing"] }
rustls = { version = "0.23.25" }
rustls-pemfile = { workspace = true }
secrecy = "0.10.3"
serde = { workspace = true }
serde_json = { workspace = true }
sha2 = { workspace = true, features = ["asm"] }
tokio = { workspace = true, features = ["macros"] }
tracing = { workspace = true, features = ["max_level_debug", "release_max_level_info"] }

View File

@@ -0,0 +1,8 @@
# pod_info_dumper
An event-triggered AWS Lambda function that writes a list of all pods, together with
their node and availability zone, to a CSV file in S3.
```shell
cargo lambda build -p pod_info_dumper --output-format Zip --x86-64 --profile release-lambda-function
```
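At runtime the handler reads its configuration from environment variables, which the aztraffic IaC tool sets on the Lambda function. A sketch with placeholder values:

```shell
# Placeholder values; the real ones are injected by the aztraffic deployment.
export NEON_ACCOUNT_ID=123456789012
export NEON_REGION=eu-central-1
export NEON_CLUSTER=my-eks-cluster
export NEON_S3_BUCKET_NAME=my-pod-info-bucket
export NEON_S3_BUCKET_KEY=CrossAZTraffic/pod_info_dumper/pod_info.csv
```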

View File

@@ -0,0 +1,420 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use std::{env, io};
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_config::retry::RetryConfig;
use aws_lambda_events::event::eventbridge::EventBridgeEvent;
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
use aws_sdk_s3::types::ChecksumAlgorithm;
use aws_sdk_sts::config::ProvideCredentials;
use aws_sigv4::http_request::{
SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
};
use aws_sigv4::sign::v4;
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD;
use base64::prelude::*;
use k8s_openapi::api::core::v1::{Node, Pod};
use k8s_openapi::chrono::SecondsFormat;
use kube::api::{Api, ListParams, ResourceExt};
use lambda_runtime::{Error, LambdaEvent, run, service_fn, tracing};
use secrecy::SecretString;
use serde::ser::SerializeMap;
use sha2::{Digest as _, Sha256};
const AZ_LABEL: &str = "topology.kubernetes.io/zone";
#[derive(Debug)]
struct Config {
aws_account_id: String,
s3_bucket: S3BucketConfig,
eks_cluster: EksClusterConfig,
}
#[derive(Debug)]
struct S3BucketConfig {
region: String,
name: String,
key: String,
}
impl S3BucketConfig {
#[tracing::instrument(skip_all, err)]
async fn create_sdk_config(&self) -> Result<aws_config::SdkConfig, Error> {
let region = aws_config::Region::new(self.region.clone());
let credentials_provider = DefaultCredentialsChain::builder()
.region(region.clone())
.build()
.await;
Ok(aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(region)
.credentials_provider(credentials_provider)
.load()
.await)
}
}
#[derive(Debug)]
struct EksClusterConfig {
region: String,
name: String,
}
impl EksClusterConfig {
#[tracing::instrument(skip_all, err)]
async fn create_sdk_config(&self) -> Result<aws_config::SdkConfig, Error> {
let region = aws_config::Region::new(self.region.clone());
let credentials_provider = DefaultCredentialsChain::builder()
.region(region.clone())
.build()
.await;
Ok(aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(region)
.credentials_provider(credentials_provider)
.load()
.await)
}
}
#[tokio::main]
pub async fn start() -> Result<(), Error> {
tracing::init_default_subscriber();
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.unwrap();
tracing::info!("function handler started");
let config = Config {
aws_account_id: env::var("NEON_ACCOUNT_ID")?,
s3_bucket: S3BucketConfig {
region: env::var("NEON_REGION")?,
name: env::var("NEON_S3_BUCKET_NAME")?,
key: env::var("NEON_S3_BUCKET_KEY")?,
},
eks_cluster: EksClusterConfig {
region: env::var("NEON_REGION")?,
name: env::var("NEON_CLUSTER")?,
},
};
run(service_fn(async |event: LambdaEvent<EventBridgeEvent<serde_json::Value>>| -> Result<StatusResponse, Error> {
function_handler(event, &config).await
}))
.await
}
#[derive(Debug, PartialEq)]
struct StatusResponse {
status_code: http::StatusCode,
body: Cow<'static, str>,
}
impl StatusResponse {
fn ok() -> Self {
StatusResponse {
status_code: http::StatusCode::OK,
body: "OK".into(),
}
}
}
impl serde::Serialize for StatusResponse {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let mut serializer = serializer.serialize_map(None)?;
serializer.serialize_entry("statusCode", &self.status_code.as_u16())?;
serializer.serialize_entry("body", &self.body)?;
serializer.end()
}
}
#[tracing::instrument(skip_all, fields(?event), err)]
async fn function_handler(
event: LambdaEvent<EventBridgeEvent<serde_json::Value>>,
config: &Config,
) -> Result<StatusResponse, Error> {
tracing::info!("function handler called");
let kube_client = connect_to_cluster(config).await?;
let s3_client = connect_to_s3(config).await?;
let nodes_azs = get_nodes_azs(kube_client.clone()).await?;
let mut pods_info = get_current_pods(kube_client.clone(), &nodes_azs).await?;
pods_info.sort_unstable();
let mut csv = Vec::with_capacity(64 * 1024);
write_csv(&pods_info, &mut csv)?;
tracing::info!(
"csv is {} bytes, containing {} pods",
csv.len(),
pods_info.len()
);
upload_csv(config, &s3_client, &csv).await?;
tracing::info!("pod info successfully stored");
Ok(StatusResponse::ok())
}
#[derive(Debug, serde::Serialize, PartialEq, Eq, PartialOrd, Ord)]
struct PodInfo<'a> {
namespace: String,
name: String,
ip: String,
creation_time: String,
node: String,
az: Option<&'a str>,
}
#[tracing::instrument(skip_all, err)]
async fn connect_to_cluster(config: &Config) -> Result<kube::Client, Error> {
let sdk_config = config.eks_cluster.create_sdk_config().await?;
let eks_client = aws_sdk_eks::Client::new(&sdk_config);
let resp = eks_client
.describe_cluster()
.name(&config.eks_cluster.name)
.send()
.await?;
let cluster = resp
.cluster()
.ok_or_else(|| format!("cluster not found: {}", config.eks_cluster.name))?;
let endpoint = cluster.endpoint().ok_or("cluster endpoint not found")?;
let ca_data = cluster
.certificate_authority()
.and_then(|ca| ca.data())
.ok_or("cluster certificate data not found")?;
let mut k8s_config = kube::Config::new(endpoint.parse()?);
let cert_bytes = STANDARD.decode(ca_data)?;
let certs = rustls_pemfile::certs(&mut cert_bytes.as_slice())
.map(|c| c.map(|c| c.to_vec()))
.collect::<Result<_, _>>()?;
k8s_config.root_cert = Some(certs);
k8s_config.auth_info.token = Some(
create_kube_auth_token(
&sdk_config,
&config.eks_cluster.name,
Duration::from_secs(10 * 60),
)
.await?,
);
tracing::info!("cluster description completed");
Ok(kube::Client::try_from(k8s_config)?)
}
#[tracing::instrument(skip_all, err)]
async fn create_kube_auth_token(
sdk_config: &aws_config::SdkConfig,
cluster_name: &str,
expires_in: Duration,
) -> Result<SecretString, Error> {
let identity = sdk_config
.credentials_provider()
.unwrap()
.provide_credentials()
.await?
.into();
let region = sdk_config.region().expect("region").as_ref();
let host = format!("sts.{region}.amazonaws.com");
let get_caller_id_url = format!("https://{host}/?Action=GetCallerIdentity&Version=2011-06-15");
let mut signing_settings = SigningSettings::default();
signing_settings.signature_location = SignatureLocation::QueryParams;
signing_settings.expires_in = Some(expires_in);
let signing_params = v4::SigningParams::builder()
.identity(&identity)
.region(region)
.name("sts")
.time(SystemTime::now())
.settings(signing_settings)
.build()?
.into();
let signable_request = SignableRequest::new(
"GET",
&get_caller_id_url,
[("host", host.as_str()), ("x-k8s-aws-id", cluster_name)].into_iter(),
SignableBody::Bytes(&[]),
)?;
let (signing_instructions, _signature) = sign(signable_request, &signing_params)?.into_parts();
let mut token_request = http::Request::get(get_caller_id_url).body(()).unwrap();
signing_instructions.apply_to_request_http1x(&mut token_request);
let token = format!(
"k8s-aws-v1.{}",
BASE64_STANDARD_NO_PAD.encode(token_request.uri().to_string())
)
.into();
Ok(token)
}
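// Editor's note (explanatory, not part of this change): the token built by
// create_kube_auth_token above follows the EKS bearer-token scheme: a presigned STS
// GetCallerIdentity URL carrying the signed `x-k8s-aws-id` header that names the cluster,
// encoded with BASE64_STANDARD_NO_PAD and prefixed with "k8s-aws-v1.". The cluster-side
// authenticator strips the prefix, decodes the URL, and calls it to learn which IAM
// principal signed the request. Rough round-trip check (assumes `use secrecy::ExposeSecret;`):
//
// let encoded = token.expose_secret().strip_prefix("k8s-aws-v1.").unwrap();
// let url = String::from_utf8(BASE64_STANDARD_NO_PAD.decode(encoded)?)?;
// assert!(url.contains("Action=GetCallerIdentity"));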
#[tracing::instrument(skip_all, err)]
async fn connect_to_s3(config: &Config) -> Result<aws_sdk_s3::Client, Error> {
let sdk_config = config.s3_bucket.create_sdk_config().await?;
let s3_client = aws_sdk_s3::Client::from_conf(
aws_sdk_s3::config::Builder::from(&sdk_config)
.retry_config(RetryConfig::standard())
.build(),
);
Ok(s3_client)
}
#[tracing::instrument(skip_all, err)]
async fn get_nodes_azs(client: kube::Client) -> Result<HashMap<String, String>, Error> {
let nodes = Api::<Node>::all(client);
let list_params = ListParams::default().timeout(10);
let mut nodes_azs = HashMap::default();
for node in nodes.list(&list_params).await? {
let Some(name) = node.metadata.name else {
tracing::warn!("pod without name");
continue;
};
let Some(mut labels) = node.metadata.labels else {
tracing::warn!(name, "pod without labels");
continue;
};
let Some(az) = labels.remove(AZ_LABEL) else {
tracing::warn!(name, "pod without AZ label");
continue;
};
tracing::debug!(name, az, "adding node");
nodes_azs.insert(name, az);
}
Ok(nodes_azs)
}
#[tracing::instrument(skip_all, err)]
async fn get_current_pods(
client: kube::Client,
node_az: &HashMap<String, String>,
) -> Result<Vec<PodInfo<'_>>, Error> {
let pods = Api::<Pod>::all(client);
let mut pods_info = vec![];
let mut continuation_token = Some(String::new());
while let Some(token) = continuation_token {
let list_params = ListParams::default()
.timeout(10)
.limit(500)
.continue_token(&token);
let list = pods.list(&list_params).await?;
continuation_token = list.metadata.continue_;
tracing::info!("received list of {} pods", list.items.len());
for pod in list.items {
let name = pod.name_any();
let Some(namespace) = pod.namespace() else {
tracing::warn!(name, "pod without namespace");
continue;
};
let Some(status) = pod.status else {
tracing::warn!(namespace, name, "pod without status");
continue;
};
let Some(conditions) = status.conditions else {
tracing::warn!(namespace, name, "pod without conditions");
continue;
};
let Some(ready_condition) = conditions.iter().find(|cond| cond.type_ == "Ready") else {
tracing::debug!(namespace, name, "pod not ready");
continue;
};
let Some(ref ready_time) = ready_condition.last_transition_time else {
tracing::warn!(
namespace,
name,
"pod ready condition without transition time"
);
continue;
};
let Some(spec) = pod.spec else {
tracing::warn!(namespace, name, "pod without spec");
continue;
};
let Some(node) = spec.node_name else {
tracing::warn!(namespace, name, "pod without node");
continue;
};
let Some(ip) = status.pod_ip else {
tracing::warn!(namespace, name, "pod without IP");
continue;
};
let az = node_az.get(&node).map(String::as_str);
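// Despite the field name, creation_time records the Ready condition's last transition time, not metadata.creation_timestamp.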
let creation_time = ready_time.0.to_rfc3339_opts(SecondsFormat::Secs, true);
let pod_info = PodInfo {
namespace,
name,
ip,
creation_time,
node,
az,
};
tracing::debug!(?pod_info, "adding pod");
pods_info.push(pod_info);
}
}
Ok(pods_info)
}
#[tracing::instrument(skip_all, err)]
fn write_csv<W: io::Write>(pods_info: &Vec<PodInfo>, writer: W) -> Result<(), Error> {
let mut w = csv::Writer::from_writer(writer);
for pod in pods_info {
w.serialize(pod)?;
}
w.flush()?;
Ok(())
}
#[tracing::instrument(skip_all, err)]
async fn upload_csv(
config: &Config,
s3_client: &aws_sdk_s3::Client,
csv: &[u8],
) -> Result<aws_sdk_s3::operation::put_object::PutObjectOutput, Error> {
let mut hasher = Sha256::new();
hasher.update(csv);
let csum = hasher.finalize();
let resp = s3_client
.put_object()
.bucket(&config.s3_bucket.name)
.key(&config.s3_bucket.key)
.content_type("text/csv")
.checksum_algorithm(ChecksumAlgorithm::Sha256)
.checksum_sha256(STANDARD.encode(csum))
.body(ByteStream::from(SdkBody::from(csv)))
.expected_bucket_owner(&config.aws_account_id)
.send()
.await?;
Ok(resp)
}
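// Editor's sketch (not part of this change): a minimal unit test pinning down the CSV
// shape write_csv produces; the header row comes from PodInfo's field names in
// declaration order.
#[cfg(test)]
mod csv_tests {
    use super::*;

    #[test]
    fn writes_header_and_rows() {
        let pods = vec![PodInfo {
            namespace: "default".to_string(),
            name: "compute-abc".to_string(),
            ip: "10.0.0.12".to_string(),
            creation_time: "2025-04-06T11:00:00Z".to_string(),
            node: "ip-10-0-0-1".to_string(),
            az: Some("eu-central-1a"),
        }];
        let mut buf = Vec::new();
        write_csv(&pods, &mut buf).unwrap();
        let text = String::from_utf8(buf).unwrap();
        assert!(text.starts_with("namespace,name,ip,creation_time,node,az"));
        assert!(text.contains(
            "default,compute-abc,10.0.0.12,2025-04-06T11:00:00Z,ip-10-0-0-1,eu-central-1a"
        ));
    }
}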

View File

@@ -0,0 +1,3 @@
fn main() -> Result<(), lambda_runtime::Error> {
pod_info_dumper::start()
}

View File

@@ -51,54 +51,9 @@ pub struct NodeMetadata {
/// If there cannot be a static default value because we need to make runtime
/// checks to determine the default, make it an `Option` (which defaults to None).
/// The runtime check should be done in the consuming crate, i.e., `pageserver`.
///
/// Unknown fields are silently ignored during deserialization.
/// The alternative, which we used in the past, was to set `deny_unknown_fields`,
/// which fails deserialization, and hence pageserver startup, if there is an unknown field.
/// The reason we don't do that anymore is that it complicates
/// usage of config fields for feature flagging, which we commonly do for
/// region-by-region rollouts.
/// The complications mainly arise because the `pageserver.toml` contents on a
/// prod server have a separate lifecycle from the pageserver binary.
/// For instance, `pageserver.toml` contents today are defined in the internal
/// infra repo, and thus introducing a new config field to pageserver and
/// rolling it out to prod servers are separate commits in separate repos
/// that can't be made or rolled back atomically.
/// Rollbacks in particular pose a risk with deny_unknown_fields because
/// the old pageserver binary may reject a new config field, resulting in
/// an outage unless the person doing the pageserver rollback remembers
/// to also revert the commit that added the config field in to the
/// `pageserver.toml` templates in the internal infra repo.
/// (A pre-deploy config check would eliminate this risk during rollbacks,
/// cf [here](https://github.com/neondatabase/cloud/issues/24349).)
/// In addition to this compatibility problem during emergency rollbacks,
/// deny_unknown_fields adds further complications when decommissioning a feature
/// flag: with deny_unknown_fields, we can't remove a flag from the [`ConfigToml`]
/// until all prod servers' `pageserver.toml` files have been updated to a version
/// that doesn't specify the flag. Otherwise new software would fail to start up.
/// This adds the requirement for an intermediate step where the new config field
/// is accepted but ignored, prolonging the decommissioning process by an entire
/// release cycle.
/// By contrast, with unknown fields silently ignored, decommissioning a feature
/// flag is a one-step process: we can skip the intermediate step and simply
/// remove the field from the [`ConfigToml`]. We leave the field in the
/// `pageserver.toml` files on prod servers until we reach certainty that we
/// will not roll back to old software whose behavior was dependent on config.
/// Then we can remove the field from the templates in the internal infra repo.
/// This process is [documented internally](
/// https://docs.neon.build/storage/pageserver_configuration.html).
///
/// Note that the above relaxed compatibility for the config format does NOT APPLY
/// TO THE STORAGE FORMAT. As general guidance, when introducing storage format
/// changes, ensure that the potential rollback target version will be compatible
/// with the new format. This must hold regardless of what flags are set in the `pageserver.toml`:
/// any format version that exists in an environment must be compatible with the software that runs there.
/// Use a pageserver.toml flag only to gate whether software _writes_ the new format.
/// For more compatibility considerations, refer to [internal docs](
/// https://docs.neon.build/storage/compat.html?highlight=compat#format-versions--compatibility)
#[serde_as]
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct ConfigToml {
// types mapped 1:1 into the runtime PageServerConfig type
pub listen_pg_addr: String,
@@ -183,6 +138,7 @@ pub struct ConfigToml {
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct DiskUsageEvictionTaskConfig {
pub max_usage_pct: utils::serde_percent::Percent,
pub min_avail_bytes: u64,
@@ -197,11 +153,13 @@ pub struct DiskUsageEvictionTaskConfig {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub enum PageServicePipeliningConfig {
Serial,
Pipelined(PageServicePipeliningConfigPipelined),
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PageServicePipeliningConfigPipelined {
/// Causes runtime errors if larger than max get_vectored batch size.
pub max_batch_size: NonZeroUsize,
@@ -217,6 +175,7 @@ pub enum PageServiceProtocolPipelinedExecutionStrategy {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub enum GetVectoredConcurrentIo {
/// The read path is fully sequential: layers are visited
/// one after the other and IOs are issued and waited upon
@@ -335,7 +294,7 @@ pub struct MaxVectoredReadBytes(pub NonZeroUsize);
/// Tenant-level configuration values, used for various purposes.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(default)]
#[serde(deny_unknown_fields, default)]
pub struct TenantConfigToml {
// Flush out an inmemory layer, if it's holding WAL older than this
// This puts a backstop on how much WAL needs to be re-digested if the

View File

@@ -1104,7 +1104,7 @@ pub struct CompactionAlgorithmSettings {
}
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum L0FlushConfig {
#[serde(rename_all = "snake_case")]
Direct { max_concurrency: NonZeroUsize },

View File

@@ -1,26 +0,0 @@
use std::time::{Duration, Instant};
#[derive(Default)]
pub struct ElapsedAccum {
accum: Duration,
}
impl ElapsedAccum {
pub fn get(&self) -> Duration {
self.accum
}
pub fn guard(&mut self) -> impl Drop + '_ {
let start = Instant::now();
scopeguard::guard(start, |last_wait_at| {
self.accum += Instant::now() - last_wait_at;
})
}
pub async fn measure<Fut, O>(&mut self, fut: Fut) -> O
where
Fut: Future<Output = O>,
{
let _guard = self.guard();
fut.await
}
}

View File

@@ -93,8 +93,6 @@ pub mod try_rcu;
pub mod guard_arc_swap;
pub mod elapsed_accum;
#[cfg(target_os = "linux")]
pub mod linux_socket_ioctl;

View File

@@ -111,17 +111,9 @@ impl<T> OnceCell<T> {
}
}
/// Like [`Self::get_or_init_detached_measured`], but without out parameter for time spent waiting.
pub async fn get_or_init_detached(&self) -> Result<Guard<'_, T>, InitPermit> {
self.get_or_init_detached_measured(None).await
}
/// Returns a guard to an existing initialized value, or returns a unique initialization
/// permit which can be used to initialize this `OnceCell` using `OnceCell::set`.
pub async fn get_or_init_detached_measured(
&self,
mut wait_time: Option<&mut crate::elapsed_accum::ElapsedAccum>,
) -> Result<Guard<'_, T>, InitPermit> {
pub async fn get_or_init_detached(&self) -> Result<Guard<'_, T>, InitPermit> {
// It looks like OnceCell::get_or_init could be implemented using this method instead of
// duplication. However, that makes the future be !Send due to possibly holding on to the
// MutexGuard over an await point.
@@ -133,16 +125,12 @@ impl<T> OnceCell<T> {
}
guard.init_semaphore.clone()
};
{
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
let fut = sem.acquire();
if let Some(wait_time) = wait_time.as_mut() {
wait_time.measure(fut).await
} else {
fut.await
}
sem.acquire().await
};
let Ok(permit) = permit else {

View File

@@ -16,7 +16,7 @@ use http_utils::tls_certs::ReloadingCertificateResolver;
use metrics::launch_timestamp::{LaunchTimestamp, set_launch_timestamp_metric};
use metrics::set_build_info_metric;
use nix::sys::socket::{setsockopt, sockopt};
use pageserver::config::{PageServerConf, PageserverIdentity, ignored_fields};
use pageserver::config::{PageServerConf, PageserverIdentity};
use pageserver::controller_upcall_client::StorageControllerUpcallClient;
use pageserver::deletion_queue::DeletionQueue;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
@@ -98,7 +98,7 @@ fn main() -> anyhow::Result<()> {
env::set_current_dir(&workdir)
.with_context(|| format!("Failed to set application's current dir to '{workdir}'"))?;
let (conf, ignored) = initialize_config(&identity_file_path, &cfg_file_path, &workdir)?;
let conf = initialize_config(&identity_file_path, &cfg_file_path, &workdir)?;
// Initialize logging.
//
@@ -144,17 +144,7 @@ fn main() -> anyhow::Result<()> {
&[("node_id", &conf.id.to_string())],
);
// Warn about ignored config items; see pageserver_api::config::ConfigToml
// doc comment for rationale why we prefer this over serde(deny_unknown_fields).
{
let ignored_fields::Paths { paths } = &ignored;
for path in paths {
warn!(?path, "ignoring unknown configuration item");
}
}
// Log configuration items for feature-flag-like config
// (maybe we should automate this with a visitor?).
// after setting up logging, log the effective IO engine choice and read path implementations
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
@@ -217,7 +207,7 @@ fn main() -> anyhow::Result<()> {
tracing::info!("Initializing page_cache...");
page_cache::init(conf.page_cache_size);
start_pageserver(launch_ts, conf, ignored, otel_guard).context("Failed to start pageserver")?;
start_pageserver(launch_ts, conf, otel_guard).context("Failed to start pageserver")?;
scenario.teardown();
Ok(())
@@ -227,7 +217,7 @@ fn initialize_config(
identity_file_path: &Utf8Path,
cfg_file_path: &Utf8Path,
workdir: &Utf8Path,
) -> anyhow::Result<(&'static PageServerConf, ignored_fields::Paths)> {
) -> anyhow::Result<&'static PageServerConf> {
// The deployment orchestrator writes out an identity file containing the node id
// for all pageservers. This file is the source of truth for the node id. In order
// to allow for rolling back pageserver releases, the node id is also included in
@@ -256,36 +246,16 @@ fn initialize_config(
let config_file_contents =
std::fs::read_to_string(cfg_file_path).context("read config file from filesystem")?;
let config_toml = serde_path_to_error::deserialize(
toml_edit::de::Deserializer::from_str(&config_file_contents)
.context("build toml deserializer")?,
)
.context("deserialize config toml")?;
// Deserialize the config file contents into a ConfigToml.
let config_toml: pageserver_api::config::ConfigToml = {
let deserializer = toml_edit::de::Deserializer::from_str(&config_file_contents)
.context("build toml deserializer")?;
let mut path_to_error_track = serde_path_to_error::Track::new();
let deserializer =
serde_path_to_error::Deserializer::new(deserializer, &mut path_to_error_track);
serde::Deserialize::deserialize(deserializer).context("deserialize config toml")?
};
// Find unknown fields by re-serializing the parsed ConfigToml and comparing it to the on-disk file.
// Any fields that are only in the on-disk version are unknown.
// (The assumption here is that the ConfigToml doesn't use skip_serializing_if.)
// (Make sure to read the ConfigToml doc comment on why we only warn about unknown fields rather than fail startup.)
let ignored = {
let ondisk_toml = config_file_contents
.parse::<toml_edit::DocumentMut>()
.context("parse original config as toml document")?;
let parsed_toml = toml_edit::ser::to_document(&config_toml)
.context("re-serialize config to toml document")?;
pageserver::config::ignored_fields::find(ondisk_toml, parsed_toml)
};
// Construct the runtime god object (it's called PageServerConf but actually is just global shared state).
let conf = PageServerConf::parse_and_validate(identity.id, config_toml, workdir)
.context("runtime-validation of config toml")?;
let conf = Box::leak(Box::new(conf));
Ok((conf, ignored))
Ok(Box::leak(Box::new(conf)))
}
struct WaitForPhaseResult<F: std::future::Future + Unpin> {
@@ -336,7 +306,6 @@ fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) {
fn start_pageserver(
launch_ts: &'static LaunchTimestamp,
conf: &'static PageServerConf,
ignored: ignored_fields::Paths,
otel_guard: Option<OtelGuard>,
) -> anyhow::Result<()> {
// Monotonic time for later calculating startup duration
@@ -360,7 +329,7 @@ fn start_pageserver(
pageserver::metrics::tokio_epoll_uring::Collector::new(),
))
.unwrap();
pageserver::preinitialize_metrics(conf, ignored);
pageserver::preinitialize_metrics(conf);
// If any failpoints were set from FAILPOINTS environment variable,
// print them to the log for debugging purposes

View File

@@ -4,8 +4,6 @@
//! file, or on the command line.
//! See also `settings.md` for better description on every parameter.
pub mod ignored_fields;
use std::env;
use std::num::NonZeroUsize;
use std::sync::Arc;
@@ -562,6 +560,7 @@ impl PageServerConf {
}
#[derive(serde::Deserialize, serde::Serialize)]
#[serde(deny_unknown_fields)]
pub struct PageserverIdentity {
pub id: NodeId,
}
@@ -633,4 +632,82 @@ mod tests {
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
.expect("parse_and_validate");
}
/// If there's a typo in the pageserver config, we'd rather catch that typo
/// and fail pageserver startup than silently ignore it, leaving whoever
/// made it in the belief that their config change is effective.
///
/// The default in serde is to allow unknown fields, so we rely
/// on developer+review discipline to add `deny_unknown_fields` when adding
/// new structs to the config, and on these tests as a regression test.
///
/// The alternative to all of this would be to allow unknown fields in the config.
/// To catch them, we could have a config check tool or mgmt API endpoint that
/// compares the effective config with the TOML on disk and makes sure that
/// the on-disk TOML is a strict subset of the effective config.
mod unknown_fields_handling {
macro_rules! test {
($short_name:ident, $input:expr) => {
#[test]
fn $short_name() {
let input = $input;
let err = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(&input)
.expect_err("some_invalid_field is an invalid field");
dbg!(&err);
assert!(err.to_string().contains("some_invalid_field"));
}
};
}
use indoc::indoc;
test!(
toplevel,
indoc! {r#"
some_invalid_field = 23
"#}
);
test!(
toplevel_nested,
indoc! {r#"
[some_invalid_field]
foo = 23
"#}
);
test!(
disk_usage_based_eviction,
indoc! {r#"
[disk_usage_based_eviction]
some_invalid_field = 23
"#}
);
test!(
tenant_config,
indoc! {r#"
[tenant_config]
some_invalid_field = 23
"#}
);
test!(
l0_flush,
indoc! {r#"
[l0_flush]
mode = "direct"
some_invalid_field = 23
"#}
);
// TODO: fix this => https://github.com/neondatabase/neon/issues/8915
// test!(
// remote_storage_config,
// indoc! {r#"
// [remote_storage_config]
// local_path = "/nonexistent"
// some_invalid_field = 23
// "#}
// );
}
}

View File

@@ -1,179 +0,0 @@
//! Check for fields in the on-disk config file that were ignored when
//! deserializing [`pageserver_api::config::ConfigToml`].
//!
//! This could have been part of the [`pageserver_api::config`] module,
//! but the way we identify unused fields in this module
//! is specific to the format (TOML) and the implementation of the
//! deserialization for that format ([`toml_edit`]).
use std::collections::HashSet;
use itertools::Itertools;
/// Pass in the user-specified config and the re-serialized [`pageserver_api::config::ConfigToml`].
/// The returned [`Paths`] contains the paths to the fields that were ignored by deserialization
/// of the [`pageserver_api::config::ConfigToml`].
pub fn find(user_specified: toml_edit::DocumentMut, reserialized: toml_edit::DocumentMut) -> Paths {
let user_specified = paths(user_specified);
let reserialized = paths(reserialized);
fn paths(doc: toml_edit::DocumentMut) -> HashSet<String> {
let mut out = Vec::new();
let mut visitor = PathsVisitor::new(&mut out);
visitor.visit_table_like(doc.as_table());
HashSet::from_iter(out)
}
let mut ignored = HashSet::new();
// O(n) because of HashSet
for path in user_specified {
if !reserialized.contains(&path) {
ignored.insert(path);
}
}
Paths {
paths: ignored
.into_iter()
// sort lexicographically for deterministic output
.sorted()
.collect(),
}
}
pub struct Paths {
pub paths: Vec<String>,
}
struct PathsVisitor<'a> {
stack: Vec<String>,
out: &'a mut Vec<String>,
}
impl<'a> PathsVisitor<'a> {
fn new(out: &'a mut Vec<String>) -> Self {
Self {
stack: Vec::new(),
out,
}
}
fn visit_table_like(&mut self, table_like: &dyn toml_edit::TableLike) {
for (entry, item) in table_like.iter() {
self.stack.push(entry.to_string());
self.visit_item(item);
self.stack.pop();
}
}
fn visit_item(&mut self, item: &toml_edit::Item) {
match item {
toml_edit::Item::None => (),
toml_edit::Item::Value(value) => self.visit_value(value),
toml_edit::Item::Table(table) => {
self.visit_table_like(table);
}
toml_edit::Item::ArrayOfTables(array_of_tables) => {
for (i, table) in array_of_tables.iter().enumerate() {
self.stack.push(format!("[{i}]"));
self.visit_table_like(table);
self.stack.pop();
}
}
}
}
fn visit_value(&mut self, value: &toml_edit::Value) {
match value {
toml_edit::Value::String(_)
| toml_edit::Value::Integer(_)
| toml_edit::Value::Float(_)
| toml_edit::Value::Boolean(_)
| toml_edit::Value::Datetime(_) => self.out.push(self.stack.join(".")),
toml_edit::Value::Array(array) => {
for (i, value) in array.iter().enumerate() {
self.stack.push(format!("[{i}]"));
self.visit_value(value);
self.stack.pop();
}
}
toml_edit::Value::InlineTable(inline_table) => self.visit_table_like(inline_table),
}
}
}
#[cfg(test)]
pub(crate) mod tests {
fn test_impl(original: &str, parsed: &str, expect: [&str; 1]) {
let original: toml_edit::DocumentMut = original.parse().expect("parse original config");
let parsed: toml_edit::DocumentMut = parsed.parse().expect("parse re-serialized config");
let super::Paths { paths: actual } = super::find(original, parsed);
assert_eq!(actual, &expect);
}
#[test]
fn top_level() {
test_impl(
r#"
[a]
b = 1
c = 2
d = 3
"#,
r#"
[a]
b = 1
c = 2
"#,
["a.d"],
);
}
#[test]
fn nested() {
test_impl(
r#"
[a.b.c]
d = 23
"#,
r#"
[a]
e = 42
"#,
["a.b.c.d"],
);
}
#[test]
fn array_of_tables() {
test_impl(
r#"
[[a]]
b = 1
c = 2
d = 3
"#,
r#"
[[a]]
b = 1
c = 2
"#,
["a.[0].d"],
);
}
#[test]
fn array() {
test_impl(
r#"
foo = [ {bar = 23} ]
"#,
r#"
foo = [ { blup = 42 }]
"#,
["foo.[0].bar"],
);
}
}

View File

@@ -89,7 +89,7 @@
//! [`RequestContext`] argument. Functions in the middle of the call chain
//! only need to pass it on.
use std::{sync::Arc, time::Duration};
use std::sync::Arc;
use once_cell::sync::Lazy;
use tracing::warn;
@@ -566,34 +566,6 @@ impl RequestContext {
}
}
pub(crate) fn ondemand_download_wait_observe(&self, duration: Duration) {
if duration == Duration::ZERO {
return;
}
match &self.scope {
Scope::Timeline { arc_arc } => arc_arc
.wait_ondemand_download_time
.observe(self.task_kind, duration),
_ => {
use once_cell::sync::Lazy;
use std::sync::Mutex;
use std::time::Duration;
use utils::rate_limit::RateLimit;
static LIMIT: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
let mut guard = LIMIT.lock().unwrap();
guard.call2(|rate_limit_stats| {
warn!(
%rate_limit_stats,
backtrace=%std::backtrace::Backtrace::force_capture(),
"ondemand downloads should always happen within timeline scope",
);
});
}
}
}
pub(crate) fn perf_follows_from(&self, from: &RequestContext) {
if let (Some(span), Some(from_span)) = (&self.perf_span, &from.perf_span) {
span.inner().follows_from(from_span.inner());

View File

@@ -1,8 +1,10 @@
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::os::fd::RawFd;
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use enum_map::{Enum as _, EnumMap};
@@ -21,13 +23,13 @@ use pageserver_api::config::{
};
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use pin_project_lite::pin_project;
use postgres_backend::{QueryError, is_expected_io_error};
use pq_proto::framed::ConnectionError;
use strum::{EnumCount, IntoEnumIterator as _, VariantNames};
use strum_macros::{IntoStaticStr, VariantNames};
use utils::id::TimelineId;
use crate::config;
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext};
use crate::pgdatadir_mapping::DatadirModificationStats;
@@ -497,100 +499,6 @@ pub(crate) static WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS: Lazy<IntCounter> = Lazy::n
.expect("failed to define a metric")
});
pub(crate) mod wait_ondemand_download_time {
use super::*;
const WAIT_ONDEMAND_DOWNLOAD_TIME_BUCKETS: &[f64] = &[
0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, // 10 ms - 100ms
0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, // 100ms to 1s
1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, // 1s to 10s
10.0, 20.0, 30.0, 40.0, 50.0, 60.0, // 10s to 1m
];
/// The task kinds for which we want to track wait times for on-demand downloads.
/// Other task kinds' wait times are accumulated in label value `unknown`.
pub(crate) const WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS: [TaskKind; 2] = [
TaskKind::PageRequestHandler,
TaskKind::WalReceiverConnectionHandler,
];
pub(crate) static WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL: Lazy<Vec<Histogram>> = Lazy::new(|| {
let histo = register_histogram_vec!(
"pageserver_wait_ondemand_download_seconds_global",
"Observations are individual tasks' wait times for on-demand downloads. \
If N tasks coalesce on an on-demand download, and it takes 10s, then we observe N * 10s.",
&["task_kind"],
WAIT_ONDEMAND_DOWNLOAD_TIME_BUCKETS.into(),
)
.expect("failed to define a metric");
WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
.iter()
.map(|task_kind| histo.with_label_values(&[task_kind.into()]))
.collect::<Vec<_>>()
});
pub(crate) static WAIT_ONDEMAND_DOWNLOAD_TIME_SUM: Lazy<CounterVec> = Lazy::new(|| {
register_counter_vec!(
// use a name that _could_ be evolved into a per-timeline histogram later
"pageserver_wait_ondemand_download_seconds_sum",
"Like `pageserver_wait_ondemand_download_seconds_global` but per timeline",
&["tenant_id", "shard_id", "timeline_id", "task_kind"],
)
.unwrap()
});
pub struct WaitOndemandDownloadTimeSum {
counters: [Counter; WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS.len()],
}
impl WaitOndemandDownloadTimeSum {
pub(crate) fn new(tenant_id: &str, shard_id: &str, timeline_id: &str) -> Self {
let counters = WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
.iter()
.map(|task_kind| {
WAIT_ONDEMAND_DOWNLOAD_TIME_SUM
.get_metric_with_label_values(&[
tenant_id,
shard_id,
timeline_id,
task_kind.into(),
])
.unwrap()
})
.collect::<Vec<_>>();
Self {
counters: counters.try_into().unwrap(),
}
}
pub(crate) fn observe(&self, task_kind: TaskKind, duration: Duration) {
let maybe = WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
.iter()
.enumerate()
.find(|(_, kind)| **kind == task_kind);
let Some((idx, _)) = maybe else {
return;
};
WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL[idx].observe(duration.as_secs_f64());
let counter = &self.counters[idx];
counter.inc_by(duration.as_secs_f64());
}
}
pub(crate) fn shutdown_timeline(tenant_id: &str, shard_id: &str, timeline_id: &str) {
for task_kind in WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS {
let _ = WAIT_ONDEMAND_DOWNLOAD_TIME_SUM.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
task_kind.into(),
]);
}
}
pub(crate) fn preinitialize_global_metrics() {
Lazy::force(&WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL);
}
}
static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_last_record_lsn",
@@ -2406,18 +2314,13 @@ impl RemoteOpFileKind {
}
}
pub(crate) static REMOTE_TIMELINE_CLIENT_COMPLETION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
pub(crate) static REMOTE_OPERATION_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_remote_timeline_client_seconds_global",
"Time spent on remote timeline client operations. \
Grouped by task_kind, file_kind, operation_kind and status. \
The task_kind is \
- for layer downloads, populated from RequestContext (primary objective of having the label) \
- for index downloads, set to 'unknown' \
- for any upload operation, set to 'RemoteUploadTask' \
This keeps dimensionality at bay. \
"pageserver_remote_operation_seconds",
"Time spent on remote storage operations. \
Grouped by tenant, timeline, operation_kind and status. \
Does not account for time spent waiting in remote timeline client's queues.",
&["task_kind", "file_kind", "op_kind", "status"]
&["file_kind", "op_kind", "status"]
)
.expect("failed to define a metric")
});
@@ -2979,7 +2882,6 @@ pub(crate) struct TimelineMetrics {
pub storage_io_size: StorageIoSizeMetrics,
pub wait_lsn_in_progress_micros: GlobalAndPerTenantIntCounter,
pub wait_lsn_start_finish_counterpair: IntCounterPair,
pub wait_ondemand_download_time: wait_ondemand_download_time::WaitOndemandDownloadTimeSum,
shutdown: std::sync::atomic::AtomicBool,
}
@@ -3125,13 +3027,6 @@ impl TimelineMetrics {
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let wait_ondemand_download_time =
wait_ondemand_download_time::WaitOndemandDownloadTimeSum::new(
&tenant_id,
&shard_id,
&timeline_id,
);
TimelineMetrics {
tenant_id,
shard_id,
@@ -3165,7 +3060,6 @@ impl TimelineMetrics {
wal_records_received,
wait_lsn_in_progress_micros,
wait_lsn_start_finish_counterpair,
wait_ondemand_download_time,
shutdown: std::sync::atomic::AtomicBool::default(),
}
}
@@ -3358,8 +3252,6 @@ impl TimelineMetrics {
.remove_label_values(&mut res, &[tenant_id, shard_id, timeline_id]);
}
wait_ondemand_download_time::shutdown_timeline(tenant_id, shard_id, timeline_id);
let _ = SMGR_QUERY_STARTED_PER_TENANT_TIMELINE.remove_label_values(&[
SmgrQueryType::GetPageAtLsn.into(),
tenant_id,
@@ -3481,18 +3373,13 @@ impl RemoteTimelineClientMetrics {
pub fn remote_operation_time(
&self,
task_kind: Option<TaskKind>,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
status: &'static str,
) -> Histogram {
REMOTE_TIMELINE_CLIENT_COMPLETION_LATENCY
.get_metric_with_label_values(&[
task_kind.as_ref().map(|tk| tk.into()).unwrap_or("unknown"),
file_kind.as_str(),
op_kind.as_str(),
status,
])
let key = (file_kind.as_str(), op_kind.as_str(), status);
REMOTE_OPERATION_TIME
.get_metric_with_label_values(&[key.0, key.1, key.2])
.unwrap()
}
@@ -3737,26 +3624,54 @@ impl Drop for RemoteTimelineClientMetrics {
/// Wrapper future that measures the time spent by a remote storage operation,
/// and records the time and success/failure as a prometheus metric.
pub(crate) trait MeasureRemoteOp<O, E>: Sized + Future<Output = Result<O, E>> {
async fn measure_remote_op(
pub(crate) trait MeasureRemoteOp: Sized {
fn measure_remote_op(
self,
task_kind: Option<TaskKind>, // not all caller contexts have a RequestContext / TaskKind handy
file_kind: RemoteOpFileKind,
op: RemoteOpKind,
metrics: Arc<RemoteTimelineClientMetrics>,
) -> Result<O, E> {
) -> MeasuredRemoteOp<Self> {
let start = Instant::now();
let res = self.await;
let duration = start.elapsed();
let status = if res.is_ok() { &"success" } else { &"failure" };
metrics
.remote_operation_time(task_kind, &file_kind, &op, status)
.observe(duration.as_secs_f64());
res
MeasuredRemoteOp {
inner: self,
file_kind,
op,
start,
metrics,
}
}
}
impl<Fut, O, E> MeasureRemoteOp<O, E> for Fut where Fut: Sized + Future<Output = Result<O, E>> {}
impl<T: Sized> MeasureRemoteOp for T {}
pin_project! {
pub(crate) struct MeasuredRemoteOp<F>
{
#[pin]
inner: F,
file_kind: RemoteOpFileKind,
op: RemoteOpKind,
start: Instant,
metrics: Arc<RemoteTimelineClientMetrics>,
}
}
impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
type Output = Result<O, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let poll_result = this.inner.poll(cx);
if let Poll::Ready(ref res) = poll_result {
let duration = this.start.elapsed();
let status = if res.is_ok() { &"success" } else { &"failure" };
this.metrics
.remote_operation_time(this.file_kind, this.op, status)
.observe(duration.as_secs_f64());
}
poll_result
}
}
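// Editor's sketch (assumed call site, mirroring the remote_timeline_client changes further
// down in this diff): the wrapper is lazy, so nothing is recorded until the wrapped future
// completes; at that point elapsed time and success/failure are observed.
//
// let bytes = download_future
//     .measure_remote_op(
//         RemoteOpFileKind::Layer,
//         RemoteOpKind::Download,
//         Arc::clone(&self.metrics),
//     )
//     .await?;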
pub mod tokio_epoll_uring {
use std::collections::HashMap;
@@ -4192,33 +4107,9 @@ pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
.set(u64::try_from(num_threads.get()).unwrap());
}
static PAGESERVER_CONFIG_IGNORED_ITEMS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_config_ignored_items",
"TOML items present in the on-disk configuration file but ignored by the pageserver config parser.\
The `item` label is the dot-separated path of the ignored item in the on-disk configuration file.\
The value for an unknown config item is always 1.\
There is a special label value \"\", which is 0, so that there is always a metric exposed (simplifies dashboards).",
&["item"]
)
.unwrap()
});
pub fn preinitialize_metrics(
conf: &'static PageServerConf,
ignored: config::ignored_fields::Paths,
) {
pub fn preinitialize_metrics(conf: &'static PageServerConf) {
set_page_service_config_max_batch_size(&conf.page_service_pipelining);
PAGESERVER_CONFIG_IGNORED_ITEMS
.with_label_values(&[""])
.set(0);
for path in &ignored.paths {
PAGESERVER_CONFIG_IGNORED_ITEMS
.with_label_values(&[path])
.set(1);
}
// Python tests need these and on some we do alerting.
//
// FIXME(4813): make it so that we have no top level metrics as this fn will easily fall out of
@@ -4304,5 +4195,4 @@ pub fn preinitialize_metrics(
Lazy::force(&tokio_epoll_uring::THREAD_LOCAL_METRICS_STORAGE);
tenant_throttling::preinitialize_global_metrics();
wait_ondemand_download_time::preinitialize_global_metrics();
}

View File

@@ -247,15 +247,6 @@ pub async fn libpq_listener_main(
type ConnectionHandlerResult = anyhow::Result<()>;
/// Perf root spans start at the per-request level, after shard routing.
/// This struct carries connection-level information to the root perf span definition.
#[derive(Clone)]
struct ConnectionPerfSpanFields {
peer_addr: String,
application_name: Option<String>,
compute_mode: Option<String>,
}
#[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
#[allow(clippy::too_many_arguments)]
async fn page_service_conn_main(
@@ -280,12 +271,6 @@ async fn page_service_conn_main(
let socket_fd = socket.as_raw_fd();
let peer_addr = socket.peer_addr().context("get peer address")?;
let perf_span_fields = ConnectionPerfSpanFields {
peer_addr: peer_addr.to_string(),
application_name: None, // filled in later
compute_mode: None, // filled in later
};
tracing::Span::current().record("peer_addr", field::display(peer_addr));
// setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements:
@@ -329,7 +314,6 @@ async fn page_service_conn_main(
tenant_manager,
auth,
pipelining_config,
perf_span_fields,
connection_ctx,
cancel.clone(),
gate_guard,
@@ -374,8 +358,6 @@ struct PageServerHandler {
/// `process_query` creates a child context from this one.
connection_ctx: RequestContext,
perf_span_fields: ConnectionPerfSpanFields,
cancel: CancellationToken,
/// None only while pagestream protocol is being processed.
@@ -721,13 +703,11 @@ impl BatchedFeMessage {
}
impl PageServerHandler {
#[allow(clippy::too_many_arguments)]
pub fn new(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
pipelining_config: PageServicePipeliningConfig,
perf_span_fields: ConnectionPerfSpanFields,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate_guard: GateGuard,
@@ -737,7 +717,6 @@ impl PageServerHandler {
auth,
claims: None,
connection_ctx,
perf_span_fields,
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
cancel,
pipelining_config,
@@ -775,7 +754,6 @@ impl PageServerHandler {
tenant_id: TenantId,
timeline_id: TimelineId,
timeline_handles: &mut TimelineHandles,
conn_perf_span_fields: &ConnectionPerfSpanFields,
cancel: &CancellationToken,
ctx: &RequestContext,
protocol_version: PagestreamProtocolVersion,
@@ -974,9 +952,6 @@ impl PageServerHandler {
info_span!(
target: PERF_TRACE_TARGET,
"GET_PAGE",
peer_addr = conn_perf_span_fields.peer_addr,
application_name = conn_perf_span_fields.application_name,
compute_mode = conn_perf_span_fields.compute_mode,
tenant_id = %tenant_id,
shard_id = %shard.get_shard_identity().shard_slug(),
timeline_id = %timeline_id,
@@ -1606,7 +1581,6 @@ impl PageServerHandler {
tenant_id,
timeline_id,
&mut timeline_handles,
&self.perf_span_fields,
&cancel,
ctx,
protocol_version,
@@ -1740,8 +1714,6 @@ impl PageServerHandler {
// Batcher
//
let perf_span_fields = self.perf_span_fields.clone();
let cancel_batcher = self.cancel.child_token();
let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
@@ -1755,7 +1727,6 @@ impl PageServerHandler {
tenant_id,
timeline_id,
&mut timeline_handles,
&perf_span_fields,
&cancel_batcher,
&ctx,
protocol_version,
@@ -2698,14 +2669,12 @@ where
if let FeStartupPacket::StartupMessage { params, .. } = sm {
if let Some(app_name) = params.get("application_name") {
self.perf_span_fields.application_name = Some(app_name.to_string());
Span::current().record("application_name", field::display(app_name));
}
if let Some(options) = params.get("options") {
let (config, _) = parse_options(options);
for (key, value) in config {
if key == "neon.compute_mode" {
self.perf_span_fields.compute_mode = Some(value.clone());
Span::current().record("compute_mode", field::display(value));
}
}

View File

@@ -642,7 +642,6 @@ impl RemoteTimelineClient {
cancel,
)
.measure_remote_op(
Option::<TaskKind>::None,
RemoteOpFileKind::Index,
RemoteOpKind::Download,
Arc::clone(&self.metrics),
@@ -740,7 +739,6 @@ impl RemoteTimelineClient {
ctx,
)
.measure_remote_op(
Some(ctx.task_kind()),
RemoteOpFileKind::Layer,
RemoteOpKind::Download,
Arc::clone(&self.metrics),
@@ -2177,7 +2175,6 @@ impl RemoteTimelineClient {
&self.cancel,
)
.measure_remote_op(
Some(TaskKind::RemoteUploadTask),
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
Arc::clone(&self.metrics),
@@ -2194,7 +2191,6 @@ impl RemoteTimelineClient {
&self.cancel,
)
.measure_remote_op(
Some(TaskKind::RemoteUploadTask),
RemoteOpFileKind::Index,
RemoteOpKind::Upload,
Arc::clone(&self.metrics),

View File

@@ -975,10 +975,6 @@ impl LayerInner {
allow_download: bool,
ctx: &RequestContext,
) -> Result<Arc<DownloadedLayer>, DownloadError> {
let mut wait_for_download_recorder =
scopeguard::guard(utils::elapsed_accum::ElapsedAccum::default(), |accum| {
ctx.ondemand_download_wait_observe(accum.get());
});
let (weak, permit) = {
// get_or_init_detached can:
// - be fast (mutex lock) OR uncontested semaphore permit acquire
@@ -987,7 +983,7 @@ impl LayerInner {
let locked = self
.inner
.get_or_init_detached_measured(Some(&mut wait_for_download_recorder))
.get_or_init_detached()
.await
.map(|mut guard| guard.get_and_upgrade().ok_or(guard));
@@ -1017,7 +1013,6 @@ impl LayerInner {
Err(permit) => (None, permit),
}
};
let _guard = wait_for_download_recorder.guard();
if let Some(weak) = weak {
// only drop the weak after dropping the heavier_once_cell guard
@@ -1207,7 +1202,6 @@ impl LayerInner {
permit: heavier_once_cell::InitPermit,
ctx: &RequestContext,
) -> Result<Arc<DownloadedLayer>, remote_storage::DownloadError> {
let start = std::time::Instant::now();
let result = timeline
.remote_client
.download_layer_file(
@@ -1219,8 +1213,7 @@ impl LayerInner {
ctx,
)
.await;
let latency = start.elapsed();
let latency_millis = u64::try_from(latency.as_millis()).unwrap();
match result {
Ok(size) => {
assert_eq!(size, self.desc.file_size);
@@ -1236,8 +1229,9 @@ impl LayerInner {
Err(e) => {
panic!("post-condition failed: needs_download errored: {e:?}");
}
};
tracing::info!(size=%self.desc.file_size, %latency_millis, "on-demand download successful");
}
tracing::info!(size=%self.desc.file_size, "on-demand download successful");
timeline
.metrics
.resident_physical_size_add(self.desc.file_size);
@@ -1266,7 +1260,7 @@ impl LayerInner {
return Err(e);
}
tracing::error!(consecutive_failures, %latency_millis, "layer file download failed: {e:#}");
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
let backoff = utils::backoff::exponential_backoff_duration_seconds(
consecutive_failures.min(u32::MAX as usize) as u32,

View File

@@ -1244,10 +1244,6 @@ impl Timeline {
let mut replace_image_layers = Vec::new();
for layer in layers_to_rewrite {
if self.cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
}
tracing::info!(layer=%layer, "Rewriting layer after shard split...");
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,

View File

@@ -9,4 +9,4 @@
#define BITMAP_SET(bm, bit) (bm)[(bit) >> 3] |= (1 << ((bit) & 7))
#define BITMAP_CLR(bm, bit) (bm)[(bit) >> 3] &= ~(1 << ((bit) & 7))
#endif /* NEON_BITMAP_H */
#endif //NEON_BITMAP_H

View File

@@ -13,6 +13,9 @@
* accumulate changes. On subtransaction commit, the top of the stack
* is merged with the table below it.
*
* IDENTIFICATION
* contrib/neon/control_plane_connector.c
*
*-------------------------------------------------------------------------
*/

View File

@@ -3,6 +3,9 @@
* extension_server.c
* Request compute_ctl to download extension files.
*
* IDENTIFICATION
* contrib/neon/extension_server.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"

View File

@@ -3,6 +3,9 @@
* extension_server.h
* Request compute_ctl to download extension files.
*
* IDENTIFICATION
* contrib/neon/extension_server.h
*
*-------------------------------------------------------------------------
*/

View File

@@ -1,4 +1,4 @@
/*-------------------------------------------------------------------------
/*
*
* file_cache.c
*
@@ -6,6 +6,10 @@
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* pgxn/neon/file_cache.c
*
*-------------------------------------------------------------------------
*/

View File

@@ -6,6 +6,10 @@
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* contrib/neon/libpqpagestore.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
@@ -30,7 +34,6 @@
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "neon.h"
#include "neon_perf_counters.h"

View File

@@ -1,11 +1,11 @@
#include "postgres.h"
#include <dirent.h>
#include <limits.h>
#include <string.h>
#include <signal.h>
#include <sys/stat.h>
#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"

View File

@@ -1,7 +1,10 @@
/*-------------------------------------------------------------------------
*
* neon.c
* Main entry point into the neon extension
* Utility functions to expose neon specific information to user
*
* IDENTIFICATION
* contrib/neon/neon.c
*
*-------------------------------------------------------------------------
*/

View File

@@ -3,13 +3,15 @@
* neon.h
* Functions used in the initialization of this extension.
*
* IDENTIFICATION
* contrib/neon/neon.h
*
*-------------------------------------------------------------------------
*/
#ifndef NEON_H
#define NEON_H
#include "access/xlogdefs.h"
#include "access/xlogreader.h"
#include "utils/wait_event.h"
/* GUCs */
@@ -56,8 +58,8 @@ extern void SetNeonCurrentClusterSize(uint64 size);
extern uint64 GetNeonCurrentClusterSize(void);
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
extern PGDLLEXPORT void WalProposerSync(int argc, char *argv[]);
extern PGDLLEXPORT void WalProposerMain(Datum main_arg);
extern PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg);
extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);
extern void PGDLLEXPORT WalProposerMain(Datum main_arg);
PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg);
#endif /* NEON_H */

View File

@@ -12,8 +12,8 @@
#include "storage/procnumber.h"
#else
#include "storage/backendid.h"
#endif
#include "storage/proc.h"
#endif
static const uint64 io_wait_bucket_thresholds[] = {
2, 3, 6, 10, /* 0 us - 10 us */

View File

@@ -20,7 +20,6 @@
#include "access/xlogreader.h"
#include "libpq/pqformat.h"
#include "storage/fd.h"
#include "utils/memutils.h"
#include "utils/wait_event.h"
#include "libpq-fe.h"

View File

@@ -8,8 +8,8 @@
*
*-------------------------------------------------------------------------
*/
#ifndef PAGESTORE_CLIENT_h
#define PAGESTORE_CLIENT_h
#ifndef pageserver_h
#define pageserver_h
#include "neon_pgversioncompat.h"
@@ -17,8 +17,11 @@
#include "access/xlogdefs.h"
#include RELFILEINFO_HDR
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"
#include "storage/block.h"
#include "storage/buf_internals.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#define MAX_SHARDS 128
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
@@ -274,8 +277,13 @@ typedef struct
XLogRecPtr effective_request_lsn;
} neon_request_lsns;
#if PG_MAJORVERSION_NUM < 16
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
neon_request_lsns request_lsns, char *buffer);
#else
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
neon_request_lsns request_lsns, void *buffer);
#endif
extern int64 neon_dbsize(Oid dbNode);
/* utils for neon relsize cache */
@@ -318,4 +326,4 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
return lfc_writev(rinfo, forkNum, blkno, &buffer, 1);
}
#endif /* PAGESTORE_CLIENT_H */
#endif

View File

@@ -37,6 +37,10 @@
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* contrib/neon/pagestore_smgr.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
@@ -51,7 +55,6 @@
#include "catalog/pg_class.h"
#include "common/hashfn.h"
#include "executor/instrument.h"
#include "libpq/pqformat.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "postmaster/interrupt.h"
@@ -1900,6 +1903,7 @@ neon_wallog_pagev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
log_pages = true;
}
else if (XLogInsertAllowed() &&
!ShutdownRequestPending &&
(forknum == FSM_FORKNUM || forknum == VISIBILITYMAP_FORKNUM))
{
log_pages = true;
@@ -3153,8 +3157,13 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
* The offsets in request_lsns, buffers, and mask are linked.
*/
static void
#if PG_MAJORVERSION_NUM < 16
neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_blockno, neon_request_lsns *request_lsns,
char **buffers, BlockNumber nblocks, const bits8 *mask)
#else
neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_blockno, neon_request_lsns *request_lsns,
void **buffers, BlockNumber nblocks, const bits8 *mask)
#endif
{
NeonResponse *resp;
uint64 ring_index;
@@ -3350,8 +3359,13 @@ Retry:
* To avoid breaking tests in the runtime please keep function signature in sync.
*/
void
#if PG_MAJORVERSION_NUM < 16
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
neon_request_lsns request_lsns, char *buffer)
#else
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
neon_request_lsns request_lsns, void *buffer)
#endif
{
neon_read_at_lsnv(rinfo, forkNum, blkno, &request_lsns, &buffer, 1, NULL);
}

View File

@@ -6,6 +6,10 @@
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* contrib/neon/relsize_cache.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"

View File

@@ -7,7 +7,6 @@
#include <stdio.h>
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "utils/datetime.h"
#include "walproposer.h"

View File

@@ -50,8 +50,13 @@ PG_FUNCTION_INFO_V1(trigger_segfault);
* Linkage to functions in neon module.
* The signature here would need to be updated whenever function parameters change in pagestore_smgr.c
*/
#if PG_MAJORVERSION_NUM < 16
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
neon_request_lsns request_lsns, char *buffer);
#else
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
neon_request_lsns request_lsns, void *buffer);
#endif
static neon_read_at_lsn_type neon_read_at_lsn_ptr;

View File

@@ -223,9 +223,6 @@ struct Args {
/// Flag to use https for requests to peer's safekeeper API.
#[arg(long)]
pub use_https_safekeeper_api: bool,
/// Path to the JWT auth token used to authenticate with other safekeepers.
#[arg(long)]
auth_token_path: Option<Utf8PathBuf>,
}
// Like PathBufValueParser, but allows empty string.
@@ -344,24 +341,14 @@ async fn main() -> anyhow::Result<()> {
};
// Load JWT auth token to connect to other safekeepers for pull_timeline.
// First check if the env var is present, then check the arg with the path.
// We want to deprecate and remove the env var method in the future.
let sk_auth_token = match var("SAFEKEEPER_AUTH_TOKEN") {
Ok(v) => {
info!("loaded JWT token for authentication with safekeepers");
Some(SecretString::from(v))
}
Err(VarError::NotPresent) => {
if let Some(auth_token_path) = args.auth_token_path.as_ref() {
info!(
"loading JWT token for authentication with safekeepers from {auth_token_path}"
);
let auth_token = tokio::fs::read_to_string(auth_token_path).await?;
Some(SecretString::from(auth_token.trim().to_owned()))
} else {
info!("no JWT token for authentication with safekeepers detected");
None
}
info!("no JWT token for authentication with safekeepers detected");
None
}
Err(_) => {
warn!("JWT token for authentication with safekeepers is not unicode");

View File

@@ -899,7 +899,7 @@ async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiErr
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
let node_status = state.service.get_node(node_id).await?.describe();
let node_status = state.service.get_node(node_id).await?;
json_response(StatusCode::OK, node_status)
}

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
from collections import defaultdict
from typing import TYPE_CHECKING, Literal
from typing import TYPE_CHECKING
from prometheus_client.parser import text_string_to_metric_families
@@ -46,26 +46,14 @@ class MetricsGetter:
def get_metrics(self) -> Metrics:
raise NotImplementedError()
def get_metric_value(
self,
name: str,
filter: dict[str, str] | None = None,
aggregate: Literal["sum"] | None = None,
) -> float | None:
def get_metric_value(self, name: str, filter: dict[str, str] | None = None) -> float | None:
metrics = self.get_metrics()
results = metrics.query_all(name, filter=filter)
if not results:
log.info(f'could not find metric "{name}"')
return None
if aggregate is None:
assert len(results) == 1, (
f"metric {name} with given filters is not unique, got: {results}"
)
return results[0].value
elif aggregate == "sum":
return sum(sample.value for sample in results)
else:
raise RuntimeError(f"unknown aggregate function {aggregate}")
assert len(results) == 1, f"metric {name} with given filters is not unique, got: {results}"
return results[0].value
def get_metrics_values(
self, names: list[str], filter: dict[str, str] | None = None, absence_ok: bool = False
@@ -144,7 +132,7 @@ PAGESERVER_GLOBAL_METRICS: tuple[str, ...] = (
*[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
*histogram("pageserver_smgr_query_seconds_global"),
*histogram("pageserver_wait_lsn_seconds"),
*histogram("pageserver_remote_timeline_client_seconds_global"),
*histogram("pageserver_remote_operation_seconds"),
*histogram("pageserver_io_operations_seconds"),
"pageserver_smgr_query_started_global_count_total",
"pageserver_tenant_states_count",
@@ -155,7 +143,6 @@ PAGESERVER_GLOBAL_METRICS: tuple[str, ...] = (
counter("pageserver_tenant_throttling_wait_usecs_sum_global"),
counter("pageserver_tenant_throttling_count_global"),
*histogram("pageserver_tokio_epoll_uring_slots_submission_queue_depth"),
*histogram("pageserver_wait_ondemand_download_seconds_global"),
)
PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
@@ -193,7 +180,6 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
counter("pageserver_wait_lsn_in_progress_micros"),
counter("pageserver_wait_lsn_started_count"),
counter("pageserver_wait_lsn_finished_count"),
counter("pageserver_wait_ondemand_download_seconds_sum"),
*histogram("pageserver_page_service_batch_size"),
*histogram("pageserver_page_service_pagestream_batch_wait_time_seconds"),
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,

View File

@@ -1297,20 +1297,9 @@ class NeonEnv:
ps_cfg[key] = value
# Create a corresponding NeonPageserver object
ps = NeonPageserver(
self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"]
self.pageservers.append(
NeonPageserver(self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"])
)
if config.test_may_use_compatibility_snapshot_binaries:
# New features gated by pageserver config usually get rolled out in the
# test suite first, by enabling it in the `ps_cfg` above.
# Compatibility tests run with old binaries that predate feature code & config.
# So, old binaries will warn about the flag's presence.
# Silence those warnings categorically.
log.info("test may use old binaries, ignoring warnings about unknown config items")
ps.allowed_errors.append(".*ignoring unknown configuration item.*")
self.pageservers.append(ps)
cfg["pageservers"].append(ps_cfg)
# Create config and a Safekeeper object for each safekeeper
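
The removed comments above explain why a log allowlist entry is added: old binaries warn about config items they do not know. The actual `allowed_errors` machinery is not part of this diff, so the following is only a rough sketch of the pattern, with every name other than `allowed_errors` hypothetical:

```python
import re

# Hypothetical log check: fail on WARN/ERROR lines unless an allowed_errors
# pattern (such as ".*ignoring unknown configuration item.*") matches them.
def assert_no_unexpected_errors(log_lines: list[str], allowed_errors: list[str]) -> None:
    allowed = [re.compile(pat) for pat in allowed_errors]
    for line in log_lines:
        if "WARN" not in line and "ERROR" not in line:
            continue
        if any(pat.match(line) for pat in allowed):
            continue  # explicitly silenced, e.g. warnings from old binaries
        raise AssertionError(f"unexpected error in log: {line}")
```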

View File

@@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 4
version = 3
[[package]]
name = "addr2line"
@@ -421,9 +421,9 @@ checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "openssl"
version = "0.10.72"
version = "0.10.70"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fedfea7d58a1f73118430a55da6a286e7b044961736ce96a16a17068ea25e5da"
checksum = "61cfb4e166a8bb8c9b55c500bc2308550148ece889be90f609377e58140f42c6"
dependencies = [
"bitflags 2.6.0",
"cfg-if",
@@ -453,9 +453,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.107"
version = "0.9.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8288979acd84749c744a9014b4382d42b8f7b2592847b5afb2ed29e5d16ede07"
checksum = "8b22d5b84be05a8d6947c7cb71f7c849aa0f112acd4bf51c2a7c1c988ac0a9dc"
dependencies = [
"cc",
"libc",

View File

@@ -101,7 +101,7 @@ if TYPE_CHECKING:
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
# export COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE}
# export COMPATIBILITY_POSTGRES_DISTRIB_DIR=neon_previous/pg_install
# export NEON_BIN=target/${BUILD_TYPE}
# export NEON_BIN=target/release
# export POSTGRES_DISTRIB_DIR=pg_install
#
# # Build previous version of binaries and store them somewhere:

View File

@@ -126,7 +126,7 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder):
ps_metrics = env.pageserver.http_client().get_metrics()
total = 0.0
for sample in ps_metrics.query_all(
name="pageserver_remote_timeline_client_seconds_global_count",
name="pageserver_remote_operation_seconds_count",
filter={
"file_kind": str(file_kind),
"op_kind": str(op_kind),

View File

@@ -38,13 +38,12 @@ def get_num_downloaded_layers(client: PageserverHttpClient):
This assumes that the pageserver only has a single tenant.
"""
value = client.get_metric_value(
"pageserver_remote_timeline_client_seconds_global_count",
"pageserver_remote_operation_seconds_count",
{
"file_kind": "layer",
"op_kind": "download",
"status": "success",
},
"sum",
)
if value is None:
return 0

View File

@@ -1,56 +0,0 @@
import re
import pytest
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import run_only_on_default_postgres
@pytest.mark.parametrize("what", ["default", "top_level", "nested"])
@run_only_on_default_postgres(reason="does not use postgres")
def test_unknown_config_items_handling(neon_simple_env: NeonEnv, what: str):
"""
Ensure we log unknown config fields and expose a metric for alerting.
There are more unit tests in the Rust code for other TOML items.
"""
env = neon_simple_env
def edit_fn(config) -> str | None:
if what == "default":
return None
elif what == "top_level":
config["unknown_top_level_config_item"] = 23
return r"unknown_top_level_config_item"
elif what == "nested":
config["remote_storage"]["unknown_config_item"] = 23
return r"remote_storage.unknown_config_item"
else:
raise ValueError(f"Unknown what: {what}")
def get_metric():
metrics = env.pageserver.http_client().get_metrics()
samples = metrics.query_all("pageserver_config_ignored_items")
by_item = {sample.labels["item"]: sample.value for sample in samples}
assert by_item[""] == 0, "must always contain the empty item with value 0"
del by_item[""]
return by_item
expected_ignored_item = env.pageserver.edit_config_toml(edit_fn)
if expected_ignored_item is not None:
expected_ignored_item_log_line_re = r".*ignoring unknown configuration item.*" + re.escape(
expected_ignored_item
)
env.pageserver.allowed_errors.append(expected_ignored_item_log_line_re)
if expected_ignored_item is not None:
assert not env.pageserver.log_contains(expected_ignored_item_log_line_re)
assert get_metric() == {}
# In any case, unknown config items should not prevent the pageserver from starting
# TODO: extend this test with the config validator mode once we introduce it
# https://github.com/neondatabase/cloud/issues/24349
env.pageserver.restart()
if expected_ignored_item is not None:
assert env.pageserver.log_contains(expected_ignored_item_log_line_re)
assert get_metric() == {expected_ignored_item: 1}
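
The deleted test builds its `by_item` view through the fixtures' `query_all` helper. Assuming the metrics endpoint serves standard Prometheus text exposition (suggested by the `text_string_to_metric_families` import earlier in this diff), a standalone sketch of the same per-item aggregation could look like this; the metric name and the idea of dropping the empty placeholder item come from the test, everything else is illustrative:

```python
from prometheus_client.parser import text_string_to_metric_families

def ignored_config_items(metrics_text: str) -> dict[str, float]:
    # Mirrors get_metric() from the deleted test: collect every sample of
    # pageserver_config_ignored_items, keyed by its "item" label, and drop
    # the always-present empty item (a placeholder that stays at 0).
    by_item: dict[str, float] = {}
    for family in text_string_to_metric_families(metrics_text):
        for sample in family.samples:
            if sample.name != "pageserver_config_ignored_items":
                continue
            item = sample.labels.get("item", "")
            if item:
                by_item[item] = sample.value
    return by_item
```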

View File

@@ -195,7 +195,3 @@ def test_throttle_fair_config_is_settable_but_ignored_in_config_toml(
ps_http = env.pageserver.http_client()
conf = ps_http.tenant_config(env.initial_tenant)
assert_throttle_config_with_field_fair_set(conf.effective_config["timeline_get_throttle"])
env.pageserver.allowed_errors.append(
r'.*ignoring unknown configuration item path="tenant_config\.timeline_get_throttle\.fair"*'
)

View File

@@ -107,7 +107,7 @@ def test_metric_collection(
ps_metrics = env.pageserver.http_client().get_metrics()
total = 0.0
for sample in ps_metrics.query_all(
name="pageserver_remote_timeline_client_seconds_global_count",
name="pageserver_remote_operation_seconds_count",
filter={
"file_kind": str(file_kind),
"op_kind": str(op_kind),

View File

@@ -4109,7 +4109,6 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart
env.storage_controller.allowed_errors.extend(
[
".*Call to safekeeper.* management API still failed after.*",
".*Call to safekeeper.* management API failed, will retry.*",
".*reconcile_one.*tenant_id={tenant_id}.*Call to safekeeper.* management API still failed after.*",
]
)

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.4",
"66114c23bc61205b0e3fb1e77ee76a4abc1eb4b8"
"c9e4ff5a38907acd71107634055bf2609aba43a5"
],
"v16": [
"16.8",
"d56e79cd5d6136c159b1d8d98acb7981d4b69364"
"746bd9ffe5c29bce030eaea1031054057f3c5d45"
],
"v15": [
"15.12",
"aeb292eeace9072e07071254b6ffc7a74007d4d2"
"23708b3aca9adf163aa0973eb63d9afc0e4a04c3"
],
"v14": [
"14.17",
"a0391901a2af13aa029b905272a5b2024133c926"
"8cca70c22e2894dd4645f9a940086ac437b0a11b"
]
}