Compare commits

..

2 Commits

Author SHA1 Message Date
Folke Behrens
357795e0c5 VPC flow logs IaC 2025-04-06 13:14:40 +02:00
Folke Behrens
f105ddb778 aws lambda function for periodic collection of pod infomation 2025-04-06 13:14:40 +02:00
155 changed files with 5653 additions and 6148 deletions

View File

@@ -19,7 +19,6 @@
!pageserver/
!pgxn/
!proxy/
!object_storage/
!storage_scrubber/
!safekeeper/
!storage_broker/

View File

@@ -2,9 +2,6 @@ import json
import os
import subprocess
RED = "\033[91m"
RESET = "\033[0m"
image_map = os.getenv("IMAGE_MAP")
if not image_map:
raise ValueError("IMAGE_MAP environment variable is not set")
@@ -32,14 +29,9 @@ while len(pending) > 0:
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
failures.append((" ".join(cmd), result.stdout, target))
failures.append((" ".join(cmd), result.stdout))
pending.append((source, target))
print(
f"{RED}[RETRY]{RESET} Push failed for {target}. Retrying... (failure count: {len(failures)})"
)
print(result.stdout)
if len(failures) > 0 and (github_output := os.getenv("GITHUB_OUTPUT")):
failed_targets = [target for _, _, target in failures]
with open(github_output, "a") as f:
f.write(f"push_failures={json.dumps(failed_targets)}\n")
f.write("slack_notify=true\n")

View File

@@ -110,19 +110,12 @@ jobs:
IMAGE_MAP: ${{ inputs.image-map }}
- name: Notify Slack if container image pushing fails
if: steps.push.outputs.push_failures || failure()
if: steps.push.outputs.slack_notify == 'true' || failure()
uses: slackapi/slack-github-action@485a9d42d3a73031f12ec201c457e2162c45d02d # v2.0.0
with:
method: chat.postMessage
token: ${{ secrets.SLACK_BOT_TOKEN }}
payload: |
channel: ${{ vars.SLACK_ON_CALL_DEVPROD_STREAM }}
text: >
*Container image pushing ${{
steps.push.outcome == 'failure' && 'failed completely' || 'succeeded with some retries'
}}* in
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
${{ steps.push.outputs.push_failures && format(
'*Failed targets:*\n• {0}', join(fromJson(steps.push.outputs.push_failures), '\n• ')
) || '' }}
text: |
Pushing container images failed in <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>

1
.gitignore vendored
View File

@@ -1,4 +1,3 @@
/artifact_cache
/pg_install
/target
/tmp_check

1133
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -40,7 +40,8 @@ members = [
"libs/proxy/postgres-protocol2",
"libs/proxy/postgres-types2",
"libs/proxy/tokio-postgres2",
"object_storage",
"lambda/aztraffic",
"lambda/pod_info_dumper",
]
[workspace.package]
@@ -184,7 +185,7 @@ test-context = "0.3"
thiserror = "1.0"
tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
tokio = { version = "1.43.1", features = ["macros"] }
tokio = { version = "1.41", features = ["macros"] }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.12.0"
@@ -209,7 +210,6 @@ tracing-opentelemetry = "0.28"
tracing-serde = "0.2.0"
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
try-lock = "0.2.5"
test-log = { version = "0.2.17", default-features = false, features = ["log"] }
twox-hash = { version = "1.6.3", default-features = false }
typed-json = "0.1"
url = "2.2"
@@ -344,3 +344,12 @@ inherits = "release"
debug = false # true = 2 = all symbols, 1 = line only
opt-level = "z"
lto = true
[profile.release-lambda-function]
inherits = "release"
lto = true
opt-level = "z"
codegen-units = 1
panic = "abort"
debug = false
strip = true

View File

@@ -89,7 +89,6 @@ RUN set -e \
--bin storage_broker \
--bin storage_controller \
--bin proxy \
--bin object_storage \
--bin neon_local \
--bin storage_scrubber \
--locked --release
@@ -122,7 +121,6 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_controller /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/object_storage /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_scrubber /usr/local/bin

View File

@@ -1022,6 +1022,39 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/semver.control
#########################################################################################
#
# Layer "pg_embedding-build"
# compile pg_embedding extension
#
#########################################################################################
FROM build-deps AS pg_embedding-src
ARG PG_VERSION
# This is our extension, support stopped in favor of pgvector
# TODO: deprecate it
WORKDIR /ext-src
RUN case "${PG_VERSION:?}" in \
"v14" | "v15") \
export PG_EMBEDDING_VERSION=0.3.5 \
export PG_EMBEDDING_CHECKSUM=0e95b27b8b6196e2cf0a0c9ec143fe2219b82e54c5bb4ee064e76398cbe69ae9 \
;; \
*) \
echo "pg_embedding not supported on this PostgreSQL version. Use pgvector instead." && exit 0;; \
esac && \
wget https://github.com/neondatabase/pg_embedding/archive/refs/tags/${PG_EMBEDDING_VERSION}.tar.gz -O pg_embedding.tar.gz && \
echo "${PG_EMBEDDING_CHECKSUM} pg_embedding.tar.gz" | sha256sum --check && \
mkdir pg_embedding-src && cd pg_embedding-src && tar xzf ../pg_embedding.tar.gz --strip-components=1 -C .
FROM pg-build AS pg_embedding-build
COPY --from=pg_embedding-src /ext-src/ /ext-src/
WORKDIR /ext-src/
RUN if [ -d pg_embedding-src ]; then \
cd pg_embedding-src && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install; \
fi
#########################################################################################
#
# Layer "pg build with nonroot user and cargo installed"
@@ -1614,6 +1647,7 @@ COPY --from=rdkit-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_uuidv7-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_roaringbitmap-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_semver-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_embedding-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=wal2json-build /usr/local/pgsql /usr/local/pgsql
COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
@@ -1790,6 +1824,7 @@ COPY --from=pg_cron-src /ext-src/ /ext-src/
COPY --from=pg_uuidv7-src /ext-src/ /ext-src/
COPY --from=pg_roaringbitmap-src /ext-src/ /ext-src/
COPY --from=pg_semver-src /ext-src/ /ext-src/
#COPY --from=pg_embedding-src /ext-src/ /ext-src/
#COPY --from=wal2json-src /ext-src/ /ext-src/
COPY --from=pg_ivm-src /ext-src/ /ext-src/
COPY --from=pg_partman-src /ext-src/ /ext-src/

View File

@@ -202,10 +202,10 @@ index cf0b80d616..e8e2a14a4a 100644
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
ERROR: must be owner of relation constraint_comments_tbl
diff --git a/src/test/regress/expected/conversion.out b/src/test/regress/expected/conversion.out
index d785f92561..16377e5ac9 100644
index 442e7aff2b..525f732b03 100644
--- a/src/test/regress/expected/conversion.out
+++ b/src/test/regress/expected/conversion.out
@@ -15,7 +15,7 @@ SELECT FROM test_enc_setup();
@@ -8,7 +8,7 @@
CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, result OUT bytea)
AS :'regresslib', 'test_enc_conversion'
LANGUAGE C STRICT;
@@ -587,15 +587,16 @@ index f551624afb..57f1e432d4 100644
SELECT *
INTO TABLE ramp
diff --git a/src/test/regress/expected/database.out b/src/test/regress/expected/database.out
index 4cbdbdf84d..573362850e 100644
index 454db91ec0..01378d7081 100644
--- a/src/test/regress/expected/database.out
+++ b/src/test/regress/expected/database.out
@@ -1,8 +1,6 @@
@@ -1,8 +1,7 @@
CREATE DATABASE regression_tbd
ENCODING utf8 LC_COLLATE "C" LC_CTYPE "C" TEMPLATE template0;
ALTER DATABASE regression_tbd RENAME TO regression_utf8;
-ALTER DATABASE regression_utf8 SET TABLESPACE regress_tblspace;
-ALTER DATABASE regression_utf8 RESET TABLESPACE;
+WARNING: you need to manually restart any running background workers after this command
ALTER DATABASE regression_utf8 CONNECTION_LIMIT 123;
-- Test PgDatabaseToastTable. Doing this with GRANT would be slow.
BEGIN;
@@ -699,7 +700,7 @@ index 6ed50fdcfa..caa00a345d 100644
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
index 84745b9f60..4883c12351 100644
index 6b8c2f2414..8e13b7fa46 100644
--- a/src/test/regress/expected/foreign_key.out
+++ b/src/test/regress/expected/foreign_key.out
@@ -1985,7 +1985,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
@@ -1111,7 +1112,7 @@ index 8475231735..0653946337 100644
DROP ROLE regress_passwd_sha_len1;
DROP ROLE regress_passwd_sha_len2;
diff --git a/src/test/regress/expected/privileges.out b/src/test/regress/expected/privileges.out
index 620fbe8c52..0570102357 100644
index 5b9dba7b32..cc408dad42 100644
--- a/src/test/regress/expected/privileges.out
+++ b/src/test/regress/expected/privileges.out
@@ -20,19 +20,19 @@ SELECT lo_unlink(oid) FROM pg_largeobject_metadata WHERE oid >= 1000 AND oid < 3
@@ -1173,8 +1174,8 @@ index 620fbe8c52..0570102357 100644
+CREATE GROUP regress_priv_group2 WITH ADMIN regress_priv_user1 PASSWORD NEON_PASSWORD_PLACEHOLDER USER regress_priv_user2;
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
GRANT regress_priv_group2 TO regress_priv_user2 GRANTED BY regress_priv_user1;
SET SESSION AUTHORIZATION regress_priv_user3;
@@ -246,12 +246,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
SET SESSION AUTHORIZATION regress_priv_user1;
@@ -239,12 +239,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
ERROR: permission denied to grant privileges as role "regress_priv_role"
DETAIL: The grantor must have the ADMIN option on role "regress_priv_role".
GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY CURRENT_ROLE;
@@ -1191,7 +1192,7 @@ index 620fbe8c52..0570102357 100644
DROP ROLE regress_priv_role;
SET SESSION AUTHORIZATION regress_priv_user1;
SELECT session_user, current_user;
@@ -1783,7 +1787,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
@@ -1776,7 +1780,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
-- security-restricted operations
\c -
@@ -1200,7 +1201,7 @@ index 620fbe8c52..0570102357 100644
-- Check that index expressions and predicates are run as the table's owner
-- A dummy index function checking current_user
CREATE FUNCTION sro_ifun(int) RETURNS int AS $$
@@ -2675,8 +2679,8 @@ drop cascades to function testns.priv_testagg(integer)
@@ -2668,8 +2672,8 @@ drop cascades to function testns.priv_testagg(integer)
drop cascades to function testns.priv_testproc(integer)
-- Change owner of the schema & and rename of new schema owner
\c -
@@ -1211,7 +1212,7 @@ index 620fbe8c52..0570102357 100644
SET SESSION ROLE regress_schemauser1;
CREATE SCHEMA testns;
SELECT nspname, rolname FROM pg_namespace, pg_roles WHERE pg_namespace.nspname = 'testns' AND pg_namespace.nspowner = pg_roles.oid;
@@ -2799,7 +2803,7 @@ DROP USER regress_priv_user7;
@@ -2792,7 +2796,7 @@ DROP USER regress_priv_user7;
DROP USER regress_priv_user8; -- does not exist
ERROR: role "regress_priv_user8" does not exist
-- permissions with LOCK TABLE
@@ -1220,7 +1221,7 @@ index 620fbe8c52..0570102357 100644
CREATE TABLE lock_table (a int);
-- LOCK TABLE and SELECT permission
GRANT SELECT ON lock_table TO regress_locktable_user;
@@ -2881,7 +2885,7 @@ DROP USER regress_locktable_user;
@@ -2874,7 +2878,7 @@ DROP USER regress_locktable_user;
-- pg_backend_memory_contexts.
-- switch to superuser
\c -
@@ -1229,7 +1230,7 @@ index 620fbe8c52..0570102357 100644
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
has_table_privilege
---------------------
@@ -2925,10 +2929,10 @@ RESET ROLE;
@@ -2918,10 +2922,10 @@ RESET ROLE;
-- clean up
DROP ROLE regress_readallstats;
-- test role grantor machinery
@@ -1244,7 +1245,7 @@ index 620fbe8c52..0570102357 100644
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
SET SESSION AUTHORIZATION regress_group_direct_manager;
@@ -2957,9 +2961,9 @@ DROP ROLE regress_group_direct_manager;
@@ -2950,9 +2954,9 @@ DROP ROLE regress_group_direct_manager;
DROP ROLE regress_group_indirect_manager;
DROP ROLE regress_group_member;
-- test SET and INHERIT options with object ownership changes
@@ -1840,7 +1841,7 @@ index 09a255649b..15895f0c53 100644
CREATE TABLE ruletest_t2 (x int);
CREATE VIEW ruletest_v1 WITH (security_invoker=true) AS
diff --git a/src/test/regress/expected/security_label.out b/src/test/regress/expected/security_label.out
index a8e01a6220..83543b250a 100644
index a8e01a6220..5a9cef4ede 100644
--- a/src/test/regress/expected/security_label.out
+++ b/src/test/regress/expected/security_label.out
@@ -6,8 +6,8 @@ SET client_min_messages TO 'warning';
@@ -1854,6 +1855,34 @@ index a8e01a6220..83543b250a 100644
CREATE TABLE seclabel_tbl1 (a int, b text);
CREATE TABLE seclabel_tbl2 (x int, y text);
CREATE VIEW seclabel_view1 AS SELECT * FROM seclabel_tbl2;
@@ -19,21 +19,21 @@ ALTER TABLE seclabel_tbl2 OWNER TO regress_seclabel_user2;
-- Test of SECURITY LABEL statement without a plugin
--
SECURITY LABEL ON TABLE seclabel_tbl1 IS 'classified'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
SECURITY LABEL FOR 'dummy' ON TABLE seclabel_tbl1 IS 'classified'; -- fail
ERROR: security label provider "dummy" is not loaded
SECURITY LABEL ON TABLE seclabel_tbl1 IS '...invalid label...'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
SECURITY LABEL ON TABLE seclabel_tbl3 IS 'unclassified'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
SECURITY LABEL ON ROLE regress_seclabel_user1 IS 'classified'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
SECURITY LABEL FOR 'dummy' ON ROLE regress_seclabel_user1 IS 'classified'; -- fail
ERROR: security label provider "dummy" is not loaded
SECURITY LABEL ON ROLE regress_seclabel_user1 IS '...invalid label...'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
SECURITY LABEL ON ROLE regress_seclabel_user3 IS 'unclassified'; -- fail
-ERROR: no security label providers have been loaded
+ERROR: must specify provider when multiple security label providers have been loaded
-- clean up objects
DROP FUNCTION seclabel_four();
DROP DOMAIN seclabel_domain;
diff --git a/src/test/regress/expected/select_into.out b/src/test/regress/expected/select_into.out
index b79fe9a1c0..e29fab88ab 100644
--- a/src/test/regress/expected/select_into.out
@@ -2384,10 +2413,10 @@ index e3e3bea709..fa86ddc326 100644
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
COMMENT ON CONSTRAINT the_constraint ON DOMAIN constraint_comments_dom IS 'no, another comment';
diff --git a/src/test/regress/sql/conversion.sql b/src/test/regress/sql/conversion.sql
index b567a1a572..4d1ac2e631 100644
index 9a65fca91f..58431a3056 100644
--- a/src/test/regress/sql/conversion.sql
+++ b/src/test/regress/sql/conversion.sql
@@ -17,7 +17,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
@@ -12,7 +12,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
AS :'regresslib', 'test_enc_conversion'
LANGUAGE C STRICT;
@@ -2751,7 +2780,7 @@ index ae6841308b..47bc792e30 100644
SELECT *
diff --git a/src/test/regress/sql/database.sql b/src/test/regress/sql/database.sql
index 46ad263478..eb05584ed5 100644
index 0367c0e37a..a23b98c4bd 100644
--- a/src/test/regress/sql/database.sql
+++ b/src/test/regress/sql/database.sql
@@ -1,8 +1,6 @@
@@ -2864,7 +2893,7 @@ index aa147b14a9..370e0dd570 100644
CREATE FOREIGN DATA WRAPPER dummy;
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
index 9f4210b26e..620d3fc87e 100644
index 45c7a534cb..32dd26b8cd 100644
--- a/src/test/regress/sql/foreign_key.sql
+++ b/src/test/regress/sql/foreign_key.sql
@@ -1435,7 +1435,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
@@ -3217,7 +3246,7 @@ index 53e86b0b6c..0303fdfe96 100644
-- Check that the invalid secrets were re-hashed. A re-hashed secret
-- should not contain the original salt.
diff --git a/src/test/regress/sql/privileges.sql b/src/test/regress/sql/privileges.sql
index 259f1aedd1..6e1a3d17b7 100644
index 249df17a58..b258e7f26a 100644
--- a/src/test/regress/sql/privileges.sql
+++ b/src/test/regress/sql/privileges.sql
@@ -24,18 +24,18 @@ RESET client_min_messages;
@@ -3279,7 +3308,7 @@ index 259f1aedd1..6e1a3d17b7 100644
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
@@ -1160,7 +1160,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
@@ -1157,7 +1157,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
-- security-restricted operations
\c -
@@ -3288,7 +3317,7 @@ index 259f1aedd1..6e1a3d17b7 100644
-- Check that index expressions and predicates are run as the table's owner
@@ -1656,8 +1656,8 @@ DROP SCHEMA testns CASCADE;
@@ -1653,8 +1653,8 @@ DROP SCHEMA testns CASCADE;
-- Change owner of the schema & and rename of new schema owner
\c -
@@ -3299,7 +3328,7 @@ index 259f1aedd1..6e1a3d17b7 100644
SET SESSION ROLE regress_schemauser1;
CREATE SCHEMA testns;
@@ -1751,7 +1751,7 @@ DROP USER regress_priv_user8; -- does not exist
@@ -1748,7 +1748,7 @@ DROP USER regress_priv_user8; -- does not exist
-- permissions with LOCK TABLE
@@ -3308,7 +3337,7 @@ index 259f1aedd1..6e1a3d17b7 100644
CREATE TABLE lock_table (a int);
-- LOCK TABLE and SELECT permission
@@ -1839,7 +1839,7 @@ DROP USER regress_locktable_user;
@@ -1836,7 +1836,7 @@ DROP USER regress_locktable_user;
-- switch to superuser
\c -
@@ -3317,7 +3346,7 @@ index 259f1aedd1..6e1a3d17b7 100644
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
SELECT has_table_privilege('regress_readallstats','pg_shmem_allocations','SELECT'); -- no
@@ -1859,10 +1859,10 @@ RESET ROLE;
@@ -1856,10 +1856,10 @@ RESET ROLE;
DROP ROLE regress_readallstats;
-- test role grantor machinery
@@ -3332,7 +3361,7 @@ index 259f1aedd1..6e1a3d17b7 100644
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
@@ -1884,9 +1884,9 @@ DROP ROLE regress_group_indirect_manager;
@@ -1881,9 +1881,9 @@ DROP ROLE regress_group_indirect_manager;
DROP ROLE regress_group_member;
-- test SET and INHERIT options with object ownership changes

View File

@@ -202,10 +202,10 @@ index cf0b80d616..e8e2a14a4a 100644
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
ERROR: must be owner of relation constraint_comments_tbl
diff --git a/src/test/regress/expected/conversion.out b/src/test/regress/expected/conversion.out
index d785f92561..16377e5ac9 100644
index 442e7aff2b..525f732b03 100644
--- a/src/test/regress/expected/conversion.out
+++ b/src/test/regress/expected/conversion.out
@@ -15,7 +15,7 @@ SELECT FROM test_enc_setup();
@@ -8,7 +8,7 @@
CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, result OUT bytea)
AS :'regresslib', 'test_enc_conversion'
LANGUAGE C STRICT;
@@ -587,15 +587,16 @@ index f551624afb..57f1e432d4 100644
SELECT *
INTO TABLE ramp
diff --git a/src/test/regress/expected/database.out b/src/test/regress/expected/database.out
index 4cbdbdf84d..573362850e 100644
index 454db91ec0..01378d7081 100644
--- a/src/test/regress/expected/database.out
+++ b/src/test/regress/expected/database.out
@@ -1,8 +1,6 @@
@@ -1,8 +1,7 @@
CREATE DATABASE regression_tbd
ENCODING utf8 LC_COLLATE "C" LC_CTYPE "C" TEMPLATE template0;
ALTER DATABASE regression_tbd RENAME TO regression_utf8;
-ALTER DATABASE regression_utf8 SET TABLESPACE regress_tblspace;
-ALTER DATABASE regression_utf8 RESET TABLESPACE;
+WARNING: you need to manually restart any running background workers after this command
ALTER DATABASE regression_utf8 CONNECTION_LIMIT 123;
-- Test PgDatabaseToastTable. Doing this with GRANT would be slow.
BEGIN;
@@ -699,7 +700,7 @@ index 6ed50fdcfa..caa00a345d 100644
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
index fe6a1015f2..614b387b7d 100644
index 69994c98e3..129abcfbe8 100644
--- a/src/test/regress/expected/foreign_key.out
+++ b/src/test/regress/expected/foreign_key.out
@@ -1985,7 +1985,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
@@ -1146,7 +1147,7 @@ index 924d6e001d..7fdda73439 100644
DROP ROLE regress_passwd_sha_len1;
DROP ROLE regress_passwd_sha_len2;
diff --git a/src/test/regress/expected/privileges.out b/src/test/regress/expected/privileges.out
index e8c668e0a1..03be5c2120 100644
index 1296da0d57..f43fffa44c 100644
--- a/src/test/regress/expected/privileges.out
+++ b/src/test/regress/expected/privileges.out
@@ -20,19 +20,19 @@ SELECT lo_unlink(oid) FROM pg_largeobject_metadata WHERE oid >= 1000 AND oid < 3
@@ -1208,8 +1209,8 @@ index e8c668e0a1..03be5c2120 100644
+CREATE GROUP regress_priv_group2 WITH ADMIN regress_priv_user1 PASSWORD NEON_PASSWORD_PLACEHOLDER USER regress_priv_user2;
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
GRANT regress_priv_group2 TO regress_priv_user2 GRANTED BY regress_priv_user1;
SET SESSION AUTHORIZATION regress_priv_user3;
@@ -246,12 +246,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
SET SESSION AUTHORIZATION regress_priv_user1;
@@ -239,12 +239,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
ERROR: permission denied to grant privileges as role "regress_priv_role"
DETAIL: The grantor must have the ADMIN option on role "regress_priv_role".
GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY CURRENT_ROLE;
@@ -1226,7 +1227,7 @@ index e8c668e0a1..03be5c2120 100644
DROP ROLE regress_priv_role;
SET SESSION AUTHORIZATION regress_priv_user1;
SELECT session_user, current_user;
@@ -1783,7 +1787,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
@@ -1776,7 +1780,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
-- security-restricted operations
\c -
@@ -1235,7 +1236,7 @@ index e8c668e0a1..03be5c2120 100644
-- Check that index expressions and predicates are run as the table's owner
-- A dummy index function checking current_user
CREATE FUNCTION sro_ifun(int) RETURNS int AS $$
@@ -2675,8 +2679,8 @@ drop cascades to function testns.priv_testagg(integer)
@@ -2668,8 +2672,8 @@ drop cascades to function testns.priv_testagg(integer)
drop cascades to function testns.priv_testproc(integer)
-- Change owner of the schema & and rename of new schema owner
\c -
@@ -1246,7 +1247,7 @@ index e8c668e0a1..03be5c2120 100644
SET SESSION ROLE regress_schemauser1;
CREATE SCHEMA testns;
SELECT nspname, rolname FROM pg_namespace, pg_roles WHERE pg_namespace.nspname = 'testns' AND pg_namespace.nspowner = pg_roles.oid;
@@ -2799,7 +2803,7 @@ DROP USER regress_priv_user7;
@@ -2792,7 +2796,7 @@ DROP USER regress_priv_user7;
DROP USER regress_priv_user8; -- does not exist
ERROR: role "regress_priv_user8" does not exist
-- permissions with LOCK TABLE
@@ -1255,7 +1256,7 @@ index e8c668e0a1..03be5c2120 100644
CREATE TABLE lock_table (a int);
-- LOCK TABLE and SELECT permission
GRANT SELECT ON lock_table TO regress_locktable_user;
@@ -2895,7 +2899,7 @@ DROP USER regress_locktable_user;
@@ -2888,7 +2892,7 @@ DROP USER regress_locktable_user;
-- pg_backend_memory_contexts.
-- switch to superuser
\c -
@@ -1264,7 +1265,7 @@ index e8c668e0a1..03be5c2120 100644
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
has_table_privilege
---------------------
@@ -2939,10 +2943,10 @@ RESET ROLE;
@@ -2932,10 +2936,10 @@ RESET ROLE;
-- clean up
DROP ROLE regress_readallstats;
-- test role grantor machinery
@@ -1279,7 +1280,7 @@ index e8c668e0a1..03be5c2120 100644
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
SET SESSION AUTHORIZATION regress_group_direct_manager;
@@ -2971,9 +2975,9 @@ DROP ROLE regress_group_direct_manager;
@@ -2964,9 +2968,9 @@ DROP ROLE regress_group_direct_manager;
DROP ROLE regress_group_indirect_manager;
DROP ROLE regress_group_member;
-- test SET and INHERIT options with object ownership changes
@@ -1292,7 +1293,7 @@ index e8c668e0a1..03be5c2120 100644
CREATE SCHEMA regress_roleoption;
GRANT CREATE, USAGE ON SCHEMA regress_roleoption TO PUBLIC;
GRANT regress_roleoption_donor TO regress_roleoption_protagonist WITH INHERIT TRUE, SET FALSE;
@@ -3002,9 +3006,9 @@ DROP ROLE regress_roleoption_protagonist;
@@ -2995,9 +2999,9 @@ DROP ROLE regress_roleoption_protagonist;
DROP ROLE regress_roleoption_donor;
DROP ROLE regress_roleoption_recipient;
-- MAINTAIN
@@ -2432,10 +2433,10 @@ index e3e3bea709..fa86ddc326 100644
COMMENT ON CONSTRAINT the_constraint ON constraint_comments_tbl IS 'no, the comment';
COMMENT ON CONSTRAINT the_constraint ON DOMAIN constraint_comments_dom IS 'no, another comment';
diff --git a/src/test/regress/sql/conversion.sql b/src/test/regress/sql/conversion.sql
index b567a1a572..4d1ac2e631 100644
index 9a65fca91f..58431a3056 100644
--- a/src/test/regress/sql/conversion.sql
+++ b/src/test/regress/sql/conversion.sql
@@ -17,7 +17,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
@@ -12,7 +12,7 @@ CREATE FUNCTION test_enc_conversion(bytea, name, name, bool, validlen OUT int, r
AS :'regresslib', 'test_enc_conversion'
LANGUAGE C STRICT;
@@ -2799,7 +2800,7 @@ index ae6841308b..47bc792e30 100644
SELECT *
diff --git a/src/test/regress/sql/database.sql b/src/test/regress/sql/database.sql
index 46ad263478..eb05584ed5 100644
index 0367c0e37a..a23b98c4bd 100644
--- a/src/test/regress/sql/database.sql
+++ b/src/test/regress/sql/database.sql
@@ -1,8 +1,6 @@
@@ -2912,7 +2913,7 @@ index aa147b14a9..370e0dd570 100644
CREATE FOREIGN DATA WRAPPER dummy;
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
index 8c4e4c7c83..e946cd2119 100644
index 2e710e419c..89cd481a54 100644
--- a/src/test/regress/sql/foreign_key.sql
+++ b/src/test/regress/sql/foreign_key.sql
@@ -1435,7 +1435,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
@@ -3300,7 +3301,7 @@ index bb82aa4aa2..dd8a05e24d 100644
-- Check that the invalid secrets were re-hashed. A re-hashed secret
-- should not contain the original salt.
diff --git a/src/test/regress/sql/privileges.sql b/src/test/regress/sql/privileges.sql
index b7e1cb6cdd..6e5a2217f1 100644
index 5880bc018d..27aa952b18 100644
--- a/src/test/regress/sql/privileges.sql
+++ b/src/test/regress/sql/privileges.sql
@@ -24,18 +24,18 @@ RESET client_min_messages;
@@ -3362,7 +3363,7 @@ index b7e1cb6cdd..6e5a2217f1 100644
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
@@ -1160,7 +1160,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
@@ -1157,7 +1157,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
-- security-restricted operations
\c -
@@ -3371,7 +3372,7 @@ index b7e1cb6cdd..6e5a2217f1 100644
-- Check that index expressions and predicates are run as the table's owner
@@ -1656,8 +1656,8 @@ DROP SCHEMA testns CASCADE;
@@ -1653,8 +1653,8 @@ DROP SCHEMA testns CASCADE;
-- Change owner of the schema & and rename of new schema owner
\c -
@@ -3382,7 +3383,7 @@ index b7e1cb6cdd..6e5a2217f1 100644
SET SESSION ROLE regress_schemauser1;
CREATE SCHEMA testns;
@@ -1751,7 +1751,7 @@ DROP USER regress_priv_user8; -- does not exist
@@ -1748,7 +1748,7 @@ DROP USER regress_priv_user8; -- does not exist
-- permissions with LOCK TABLE
@@ -3391,7 +3392,7 @@ index b7e1cb6cdd..6e5a2217f1 100644
CREATE TABLE lock_table (a int);
-- LOCK TABLE and SELECT permission
@@ -1854,7 +1854,7 @@ DROP USER regress_locktable_user;
@@ -1851,7 +1851,7 @@ DROP USER regress_locktable_user;
-- switch to superuser
\c -
@@ -3400,7 +3401,7 @@ index b7e1cb6cdd..6e5a2217f1 100644
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
SELECT has_table_privilege('regress_readallstats','pg_shmem_allocations','SELECT'); -- no
@@ -1874,10 +1874,10 @@ RESET ROLE;
@@ -1871,10 +1871,10 @@ RESET ROLE;
DROP ROLE regress_readallstats;
-- test role grantor machinery
@@ -3415,7 +3416,7 @@ index b7e1cb6cdd..6e5a2217f1 100644
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
@@ -1899,9 +1899,9 @@ DROP ROLE regress_group_indirect_manager;
@@ -1896,9 +1896,9 @@ DROP ROLE regress_group_indirect_manager;
DROP ROLE regress_group_member;
-- test SET and INHERIT options with object ownership changes
@@ -3428,7 +3429,7 @@ index b7e1cb6cdd..6e5a2217f1 100644
CREATE SCHEMA regress_roleoption;
GRANT CREATE, USAGE ON SCHEMA regress_roleoption TO PUBLIC;
GRANT regress_roleoption_donor TO regress_roleoption_protagonist WITH INHERIT TRUE, SET FALSE;
@@ -1929,9 +1929,9 @@ DROP ROLE regress_roleoption_donor;
@@ -1926,9 +1926,9 @@ DROP ROLE regress_roleoption_donor;
DROP ROLE regress_roleoption_recipient;
-- MAINTAIN

View File

@@ -118,18 +118,16 @@ struct Cli {
#[arg(long)]
pub set_disk_quota_for_fs: Option<String>,
#[arg(short = 's', long = "spec", group = "spec")]
pub spec_json: Option<String>,
#[arg(short = 'S', long, group = "spec-path")]
pub spec_path: Option<OsString>,
#[arg(short = 'i', long, group = "compute-id")]
pub compute_id: String,
#[arg(
short = 'p',
long,
conflicts_with = "spec-path",
value_name = "CONTROL_PLANE_API_BASE_URL"
)]
#[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")]
pub control_plane_uri: Option<String>,
}
@@ -174,6 +172,7 @@ fn main() -> Result<()> {
cgroup: cli.cgroup,
#[cfg(target_os = "linux")]
vm_monitor_addr: cli.vm_monitor_addr,
live_config_allowed: cli_spec.live_config_allowed,
},
cli_spec.spec,
cli_spec.compute_ctl_config,
@@ -202,12 +201,23 @@ async fn init() -> Result<()> {
}
fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
// First, read spec from the path if provided
// First, try to get cluster spec from the cli argument
if let Some(ref spec_json) = cli.spec_json {
info!("got spec from cli argument {}", spec_json);
return Ok(CliSpecParams {
spec: Some(serde_json::from_str(spec_json)?),
compute_ctl_config: ComputeCtlConfig::default(),
live_config_allowed: false,
});
}
// Second, try to read it from the file if path is provided
if let Some(ref spec_path) = cli.spec_path {
let file = File::open(Path::new(spec_path))?;
return Ok(CliSpecParams {
spec: Some(serde_json::from_reader(file)?),
compute_ctl_config: ComputeCtlConfig::default(),
live_config_allowed: true,
});
}
@@ -215,12 +225,11 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
panic!("must specify --control-plane-uri");
};
// If the spec wasn't provided in the CLI arguments, then retrieve it from
// the control plane
match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
Ok(resp) => Ok(CliSpecParams {
spec: resp.0,
compute_ctl_config: resp.1,
live_config_allowed: true,
}),
Err(e) => {
error!(
@@ -238,6 +247,7 @@ struct CliSpecParams {
spec: Option<ComputeSpec>,
#[allow(dead_code)]
compute_ctl_config: ComputeCtlConfig,
live_config_allowed: bool,
}
fn deinit_and_exit(exit_code: Option<i32>) -> ! {

View File

@@ -98,15 +98,13 @@ pub async fn get_database_schema(
.kill_on_drop(true)
.spawn()?;
let stdout = cmd
.stdout
.take()
.ok_or_else(|| std::io::Error::other("Failed to capture stdout."))?;
let stdout = cmd.stdout.take().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::Other, "Failed to capture stdout.")
})?;
let stderr = cmd
.stderr
.take()
.ok_or_else(|| std::io::Error::other("Failed to capture stderr."))?;
let stderr = cmd.stderr.take().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::Other, "Failed to capture stderr.")
})?;
let mut stdout_reader = FramedRead::new(stdout, BytesCodec::new());
let stderr_reader = BufReader::new(stderr);
@@ -130,7 +128,8 @@ pub async fn get_database_schema(
}
});
return Err(SchemaDumpError::IO(std::io::Error::other(
return Err(SchemaDumpError::IO(std::io::Error::new(
std::io::ErrorKind::Other,
"failed to start pg_dump",
)));
}

View File

@@ -93,6 +93,20 @@ pub struct ComputeNodeParams {
/// the address of extension storage proxy gateway
pub ext_remote_storage: Option<String>,
/// We should only allow live re- / configuration of the compute node if
/// it uses 'pull model', i.e. it can go to control-plane and fetch
/// the latest configuration. Otherwise, there could be a case:
/// - we start compute with some spec provided as argument
/// - we push new spec and it does reconfiguration
/// - but then something happens and compute pod / VM is destroyed,
/// so k8s controller starts it again with the **old** spec
///
/// and the same for empty computes:
/// - we started compute without any spec
/// - we push spec and it does configuration
/// - but then it is restarted without any spec again
pub live_config_allowed: bool,
}
/// Compute node info shared across several `compute_ctl` threads.
@@ -647,8 +661,15 @@ impl ComputeNode {
}
// Configure and start rsyslog for Postgres logs export
let conf = PostgresLogsRsyslogConfig::new(pspec.spec.logs_export_host.as_deref());
configure_postgres_logs_export(conf)?;
if self.has_feature(ComputeFeature::PostgresLogsExport) {
if let Some(ref project_id) = pspec.spec.cluster.cluster_id {
let host = PostgresLogsRsyslogConfig::default_host(project_id);
let conf = PostgresLogsRsyslogConfig::new(Some(&host));
configure_postgres_logs_export(conf)?;
} else {
warn!("not configuring rsyslog for Postgres logs export: project ID is missing")
}
}
// Launch remaining service threads
let _monitor_handle = launch_monitor(self);
@@ -1552,10 +1573,6 @@ impl ComputeNode {
});
}
// Reconfigure rsyslog for Postgres logs export
let conf = PostgresLogsRsyslogConfig::new(spec.logs_export_host.as_deref());
configure_postgres_logs_export(conf)?;
// Write new config
let pgdata_path = Path::new(&self.params.pgdata);
config::write_postgres_conf(

View File

@@ -7,7 +7,7 @@ use std::io::prelude::*;
use std::path::Path;
use compute_api::responses::TlsConfig;
use compute_api::spec::{ComputeAudit, ComputeMode, ComputeSpec, GenericOption};
use compute_api::spec::{ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, GenericOption};
use crate::pg_helpers::{
GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize, escape_conf_value,
@@ -255,7 +255,7 @@ pub fn write_postgres_conf(
// We need Postgres to send logs to rsyslog so that we can forward them
// further to customers' log aggregation systems.
if spec.logs_export_host.is_some() {
if spec.features.contains(&ComputeFeature::PostgresLogsExport) {
writeln!(file, "log_destination='stderr,syslog'")?;
}

View File

@@ -6,15 +6,20 @@ use axum_extra::{
TypedHeader,
headers::{Authorization, authorization::Bearer},
};
use compute_api::requests::ComputeClaims;
use futures::future::BoxFuture;
use http::{Request, Response, StatusCode};
use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet};
use serde::Deserialize;
use tower_http::auth::AsyncAuthorizeRequest;
use tracing::warn;
use crate::http::{JsonResponse, extract::RequestId};
#[derive(Clone, Debug, Deserialize)]
pub(in crate::http) struct Claims {
compute_id: String,
}
#[derive(Clone, Debug)]
pub(in crate::http) struct Authorize {
compute_id: String,
@@ -107,11 +112,7 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
impl Authorize {
/// Verify the token using the JSON Web Key set and return the token data.
fn verify(
jwks: &JwkSet,
token: &str,
validation: &Validation,
) -> Result<TokenData<ComputeClaims>> {
fn verify(jwks: &JwkSet, token: &str, validation: &Validation) -> Result<TokenData<Claims>> {
for jwk in jwks.keys.iter() {
let decoding_key = match DecodingKey::from_jwk(jwk) {
Ok(key) => key,
@@ -126,7 +127,7 @@ impl Authorize {
}
};
match jsonwebtoken::decode::<ComputeClaims>(token, &decoding_key, validation) {
match jsonwebtoken::decode::<Claims>(token, &decoding_key, validation) {
Ok(data) => return Ok(data),
Err(e) => {
warn!(

View File

@@ -306,6 +306,36 @@ paths:
schema:
$ref: "#/components/schemas/GenericError"
/configure_telemetry:
post:
tags:
- Configure
summary: Configure rsyslog
description: |
This API endpoint configures rsyslog to forward Postgres logs
to a specified otel collector.
operationId: configureTelemetry
requestBody:
required: true
content:
application/json:
schema:
type: object
properties:
logs_export_host:
type: string
description: |
Hostname and the port of the otel collector. Leave empty to disable logs forwarding.
Example: config-shy-breeze-123-collector-monitoring.neon-telemetry.svc.cluster.local:54526
responses:
204:
description: "Telemetry configured successfully"
500:
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
components:
securitySchemes:
JWT:

View File

@@ -1,9 +1,11 @@
use std::sync::Arc;
use axum::body::Body;
use axum::extract::State;
use axum::response::Response;
use compute_api::requests::ConfigurationRequest;
use compute_api::requests::{ConfigurationRequest, ConfigureTelemetryRequest};
use compute_api::responses::{ComputeStatus, ComputeStatusResponse};
use compute_api::spec::ComputeFeature;
use http::StatusCode;
use tokio::task;
use tracing::info;
@@ -11,6 +13,7 @@ use tracing::info;
use crate::compute::{ComputeNode, ParsedSpec};
use crate::http::JsonResponse;
use crate::http::extract::Json;
use crate::rsyslog::{PostgresLogsRsyslogConfig, configure_postgres_logs_export};
// Accept spec in JSON format and request compute configuration. If anything
// goes wrong after we set the compute status to `ConfigurationPending` and
@@ -22,6 +25,13 @@ pub(in crate::http) async fn configure(
State(compute): State<Arc<ComputeNode>>,
request: Json<ConfigurationRequest>,
) -> Response {
if !compute.params.live_config_allowed {
return JsonResponse::error(
StatusCode::PRECONDITION_FAILED,
"live configuration is not allowed for this compute node".to_string(),
);
}
let pspec = match ParsedSpec::try_from(request.spec.clone()) {
Ok(p) => p,
Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
@@ -85,3 +95,25 @@ pub(in crate::http) async fn configure(
JsonResponse::success(StatusCode::OK, body)
}
pub(in crate::http) async fn configure_telemetry(
State(compute): State<Arc<ComputeNode>>,
request: Json<ConfigureTelemetryRequest>,
) -> Response {
if !compute.has_feature(ComputeFeature::PostgresLogsExport) {
return JsonResponse::error(
StatusCode::PRECONDITION_FAILED,
"Postgres logs export feature is not enabled".to_string(),
);
}
let conf = PostgresLogsRsyslogConfig::new(request.logs_export_host.as_deref());
if let Err(err) = configure_postgres_logs_export(conf) {
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, err.to_string());
}
Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Body::from(""))
.unwrap()
}

View File

@@ -87,6 +87,7 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
.route("/configure_telemetry", post(configure::configure_telemetry))
.route("/database_schema", get(database_schema::get_schema_dump))
.route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects))
.route("/insights", get(insights::get_insights))

View File

@@ -119,9 +119,16 @@ impl<'a> PostgresLogsRsyslogConfig<'a> {
};
Ok(config_content)
}
/// Returns the default host for otel collector that receives Postgres logs
pub fn default_host(project_id: &str) -> String {
format!(
"config-{}-collector.neon-telemetry.svc.cluster.local:10514",
project_id
)
}
}
/// Writes rsyslogd configuration for Postgres logs export and restarts rsyslog.
pub fn configure_postgres_logs_export(conf: PostgresLogsRsyslogConfig) -> Result<()> {
let new_config = conf.build()?;
let current_config = PostgresLogsRsyslogConfig::current_config()?;
@@ -254,5 +261,16 @@ mod tests {
let res = conf.build();
assert!(res.is_err());
}
{
// Verify config with default host
let host = PostgresLogsRsyslogConfig::default_host("shy-breeze-123");
let conf = PostgresLogsRsyslogConfig::new(Some(&host));
let res = conf.build();
assert!(res.is_ok());
let conf_str = res.unwrap();
assert!(conf_str.contains(r#"shy-breeze-123"#));
assert!(conf_str.contains(r#"port="10514""#));
}
}
}

View File

@@ -20,10 +20,8 @@ use compute_api::spec::ComputeMode;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::local_env::{
InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf,
ObjectStorageConf, SafekeeperConf,
SafekeeperConf,
};
use control_plane::object_storage::OBJECT_STORAGE_DEFAULT_PORT;
use control_plane::object_storage::ObjectStorage;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage_controller::{
@@ -41,7 +39,7 @@ use pageserver_api::controller_api::{
use pageserver_api::models::{
ShardParameters, TenantConfigRequest, TimelineCreateRequest, TimelineInfo,
};
use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId};
use postgres_backend::AuthType;
use postgres_connection::parse_host_port;
use safekeeper_api::membership::SafekeeperGeneration;
@@ -93,8 +91,6 @@ enum NeonLocalCmd {
#[command(subcommand)]
Safekeeper(SafekeeperCmd),
#[command(subcommand)]
ObjectStorage(ObjectStorageCmd),
#[command(subcommand)]
Endpoint(EndpointCmd),
#[command(subcommand)]
Mappings(MappingsCmd),
@@ -458,32 +454,6 @@ enum SafekeeperCmd {
Restart(SafekeeperRestartCmdArgs),
}
#[derive(clap::Subcommand)]
#[clap(about = "Manage object storage")]
enum ObjectStorageCmd {
Start(ObjectStorageStartCmd),
Stop(ObjectStorageStopCmd),
}
#[derive(clap::Args)]
#[clap(about = "Start object storage")]
struct ObjectStorageStartCmd {
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
}
#[derive(clap::Args)]
#[clap(about = "Stop object storage")]
struct ObjectStorageStopCmd {
#[arg(value_enum, default_value = "fast")]
#[clap(
short = 'm',
help = "If 'immediate', don't flush repository data at shutdown"
)]
stop_mode: StopMode,
}
#[derive(clap::Args)]
#[clap(about = "Start local safekeeper")]
struct SafekeeperStartCmdArgs {
@@ -789,7 +759,6 @@ fn main() -> Result<()> {
}
NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)),
NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)),
NeonLocalCmd::ObjectStorage(subcmd) => rt.block_on(handle_object_storage(&subcmd, env)),
NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)),
NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
};
@@ -1006,9 +975,6 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
}
})
.collect(),
object_storage: ObjectStorageConf {
port: OBJECT_STORAGE_DEFAULT_PORT,
},
pg_distrib_dir: None,
neon_distrib_dir: None,
default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
@@ -1117,7 +1083,7 @@ async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> any
stripe_size: args
.shard_stripe_size
.map(ShardStripeSize)
.unwrap_or(DEFAULT_STRIPE_SIZE),
.unwrap_or(ShardParameters::DEFAULT_STRIPE_SIZE),
},
placement_policy: args.placement_policy.clone(),
config: tenant_conf,
@@ -1430,7 +1396,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
vec![(parsed.0, parsed.1.unwrap_or(5432))],
// If caller is telling us what pageserver to use, this is not a tenant which is
// full managed by storage controller, therefore not sharded.
DEFAULT_STRIPE_SIZE,
ShardParameters::DEFAULT_STRIPE_SIZE,
)
} else {
// Look up the currently attached location of the tenant, and its striping metadata,
@@ -1717,41 +1683,6 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
Ok(())
}
async fn handle_object_storage(subcmd: &ObjectStorageCmd, env: &local_env::LocalEnv) -> Result<()> {
use ObjectStorageCmd::*;
let storage = ObjectStorage::from_env(env);
// In tests like test_forward_compatibility or test_graceful_cluster_restart
// old neon binaries (without object_storage) are present
if !storage.bin.exists() {
eprintln!(
"{} binary not found. Ignore if this is a compatibility test",
storage.bin
);
return Ok(());
}
match subcmd {
Start(ObjectStorageStartCmd { start_timeout }) => {
if let Err(e) = storage.start(start_timeout).await {
eprintln!("object_storage start failed: {e}");
exit(1);
}
}
Stop(ObjectStorageStopCmd { stop_mode }) => {
let immediate = match stop_mode {
StopMode::Fast => false,
StopMode::Immediate => true,
};
if let Err(e) = storage.stop(immediate) {
eprintln!("proxy stop failed: {e}");
exit(1);
}
}
};
Ok(())
}
async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> {
match subcmd {
StorageBrokerCmd::Start(args) => {
@@ -1846,13 +1777,6 @@ async fn handle_start_all_impl(
.map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id)))
});
}
js.spawn(async move {
ObjectStorage::from_env(env)
.start(&retry_timeout)
.await
.map_err(|e| e.context("start object_storage"))
});
})();
let mut errors = Vec::new();
@@ -1950,11 +1874,6 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
}
}
let storage = ObjectStorage::from_env(env);
if let Err(e) = storage.stop(immediate) {
eprintln!("object_storage stop failed: {:#}", e);
}
for ps_conf in &env.pageservers {
let pageserver = PageServerNode::from_env(env, ps_conf);
if let Err(e) = pageserver.stop(immediate) {

View File

@@ -670,7 +670,6 @@ impl Endpoint {
reconfigure_concurrency: self.reconfigure_concurrency,
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
audit_log_level: ComputeAudit::Disabled,
logs_export_host: None::<String>,
};
// this strange code is needed to support respec() in tests

View File

@@ -10,7 +10,6 @@ mod background_process;
pub mod broker;
pub mod endpoint;
pub mod local_env;
pub mod object_storage;
pub mod pageserver;
pub mod postgresql_conf;
pub mod safekeeper;

View File

@@ -15,10 +15,9 @@ use clap::ValueEnum;
use postgres_backend::AuthType;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use utils::auth::encode_from_key_file;
use utils::auth::{Claims, encode_from_key_file};
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::object_storage::{OBJECT_STORAGE_REMOTE_STORAGE_DIR, ObjectStorage};
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::safekeeper::SafekeeperNode;
@@ -56,7 +55,6 @@ pub struct LocalEnv {
// used to issue tokens during e.g pg start
pub private_key_path: PathBuf,
pub public_key_path: PathBuf,
pub broker: NeonBroker,
@@ -70,8 +68,6 @@ pub struct LocalEnv {
pub safekeepers: Vec<SafekeeperConf>,
pub object_storage: ObjectStorageConf,
// Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
// be propagated into each pageserver's configuration.
pub control_plane_api: Url,
@@ -99,7 +95,6 @@ pub struct OnDiskConfig {
pub neon_distrib_dir: PathBuf,
pub default_tenant_id: Option<TenantId>,
pub private_key_path: PathBuf,
pub public_key_path: PathBuf,
pub broker: NeonBroker,
pub storage_controller: NeonStorageControllerConf,
#[serde(
@@ -108,7 +103,6 @@ pub struct OnDiskConfig {
)]
pub pageservers: Vec<PageServerConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub object_storage: ObjectStorageConf,
pub control_plane_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
pub control_plane_compute_hook_api: Option<Url>,
@@ -142,18 +136,11 @@ pub struct NeonLocalInitConf {
pub storage_controller: Option<NeonStorageControllerConf>,
pub pageservers: Vec<NeonLocalInitPageserverConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub object_storage: ObjectStorageConf,
pub control_plane_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
pub generate_local_ssl_certs: bool,
}
#[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct ObjectStorageConf {
pub port: u16,
}
/// Broker config for cluster internal communication.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
@@ -411,10 +398,6 @@ impl LocalEnv {
self.pg_dir(pg_version, "lib")
}
pub fn object_storage_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("object_storage")
}
pub fn pageserver_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("pageserver")
}
@@ -448,10 +431,6 @@ impl LocalEnv {
self.base_data_dir.join("safekeepers").join(data_dir_name)
}
pub fn object_storage_data_dir(&self) -> PathBuf {
self.base_data_dir.join("object_storage")
}
pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> {
if let Some(conf) = self.pageservers.iter().find(|node| node.id == id) {
Ok(conf)
@@ -603,7 +582,6 @@ impl LocalEnv {
neon_distrib_dir,
default_tenant_id,
private_key_path,
public_key_path,
broker,
storage_controller,
pageservers,
@@ -613,7 +591,6 @@ impl LocalEnv {
control_plane_compute_hook_api: _,
branch_name_mappings,
generate_local_ssl_certs,
object_storage,
} = on_disk_config;
LocalEnv {
base_data_dir: repopath.to_owned(),
@@ -621,7 +598,6 @@ impl LocalEnv {
neon_distrib_dir,
default_tenant_id,
private_key_path,
public_key_path,
broker,
storage_controller,
pageservers,
@@ -630,7 +606,6 @@ impl LocalEnv {
control_plane_hooks_api,
branch_name_mappings,
generate_local_ssl_certs,
object_storage,
}
};
@@ -730,7 +705,6 @@ impl LocalEnv {
neon_distrib_dir: self.neon_distrib_dir.clone(),
default_tenant_id: self.default_tenant_id,
private_key_path: self.private_key_path.clone(),
public_key_path: self.public_key_path.clone(),
broker: self.broker.clone(),
storage_controller: self.storage_controller.clone(),
pageservers: vec![], // it's skip_serializing anyway
@@ -740,7 +714,6 @@ impl LocalEnv {
control_plane_compute_hook_api: None,
branch_name_mappings: self.branch_name_mappings.clone(),
generate_local_ssl_certs: self.generate_local_ssl_certs,
object_storage: self.object_storage.clone(),
},
)
}
@@ -757,7 +730,7 @@ impl LocalEnv {
}
// this function is used only for testing purposes in CLI e g generate tokens during init
pub fn generate_auth_token<S: Serialize>(&self, claims: &S) -> anyhow::Result<String> {
pub fn generate_auth_token(&self, claims: &Claims) -> anyhow::Result<String> {
let private_key_path = self.get_private_key_path();
let key_data = fs::read(private_key_path)?;
encode_from_key_file(claims, &key_data)
@@ -824,7 +797,6 @@ impl LocalEnv {
control_plane_api,
generate_local_ssl_certs,
control_plane_hooks_api,
object_storage,
} = conf;
// Find postgres binaries.
@@ -856,7 +828,6 @@ impl LocalEnv {
)
.context("generate auth keys")?;
let private_key_path = PathBuf::from("auth_private_key.pem");
let public_key_path = PathBuf::from("auth_public_key.pem");
// create the runtime type because the remaining initialization code below needs
// a LocalEnv instance op operation
@@ -867,7 +838,6 @@ impl LocalEnv {
neon_distrib_dir,
default_tenant_id: Some(default_tenant_id),
private_key_path,
public_key_path,
broker,
storage_controller: storage_controller.unwrap_or_default(),
pageservers: pageservers.iter().map(Into::into).collect(),
@@ -876,7 +846,6 @@ impl LocalEnv {
control_plane_hooks_api,
branch_name_mappings: Default::default(),
generate_local_ssl_certs,
object_storage,
};
if generate_local_ssl_certs {
@@ -904,13 +873,8 @@ impl LocalEnv {
.context("pageserver init failed")?;
}
ObjectStorage::from_env(&env)
.init()
.context("object storage init failed")?;
// setup remote remote location for default LocalFs remote storage
std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?;
std::fs::create_dir_all(env.base_data_dir.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR))?;
env.persist_config()
}

View File

@@ -1,107 +0,0 @@
use crate::background_process::{self, start_process, stop_process};
use crate::local_env::LocalEnv;
use anyhow::anyhow;
use anyhow::{Context, Result};
use camino::Utf8PathBuf;
use std::io::Write;
use std::time::Duration;
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const OBJECT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/object_storage";
pub const OBJECT_STORAGE_DEFAULT_PORT: u16 = 9993;
pub struct ObjectStorage {
pub bin: Utf8PathBuf,
pub data_dir: Utf8PathBuf,
pub pemfile: Utf8PathBuf,
pub port: u16,
}
impl ObjectStorage {
pub fn from_env(env: &LocalEnv) -> ObjectStorage {
ObjectStorage {
bin: Utf8PathBuf::from_path_buf(env.object_storage_bin()).unwrap(),
data_dir: Utf8PathBuf::from_path_buf(env.object_storage_data_dir()).unwrap(),
pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
port: env.object_storage.port,
}
}
fn config_path(&self) -> Utf8PathBuf {
self.data_dir.join("object_storage.json")
}
fn listen_addr(&self) -> Utf8PathBuf {
format!("127.0.0.1:{}", self.port).into()
}
pub fn init(&self) -> Result<()> {
println!("Initializing object storage in {:?}", self.data_dir);
let parent = self.data_dir.parent().unwrap();
#[derive(serde::Serialize)]
struct Cfg {
listen: Utf8PathBuf,
pemfile: Utf8PathBuf,
local_path: Utf8PathBuf,
r#type: String,
}
let cfg = Cfg {
listen: self.listen_addr(),
pemfile: parent.join(self.pemfile.clone()),
local_path: parent.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR),
r#type: "LocalFs".to_string(),
};
std::fs::create_dir_all(self.config_path().parent().unwrap())?;
std::fs::write(self.config_path(), serde_json::to_string(&cfg)?)
.context("write object storage config")?;
Ok(())
}
pub async fn start(&self, retry_timeout: &Duration) -> Result<()> {
println!("Starting s3 proxy at {}", self.listen_addr());
std::io::stdout().flush().context("flush stdout")?;
let process_status_check = || async {
tokio::time::sleep(Duration::from_millis(500)).await;
let res = reqwest::Client::new()
.get(format!("http://{}/metrics", self.listen_addr()))
.send()
.await;
match res {
Ok(response) if response.status().is_success() => Ok(true),
Ok(_) => Err(anyhow!("Failed to query /metrics")),
Err(e) => Err(anyhow!("Failed to check node status: {e}")),
}
};
let res = start_process(
"object_storage",
&self.data_dir.clone().into_std_path_buf(),
&self.bin.clone().into_std_path_buf(),
vec![self.config_path().to_string()],
vec![("RUST_LOG".into(), "debug".into())],
background_process::InitialPidFile::Create(self.pid_file()),
retry_timeout,
process_status_check,
)
.await;
if res.is_err() {
eprintln!("Logs:\n{}", std::fs::read_to_string(self.log_file())?);
}
res
}
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
stop_process(immediate, "object_storage", &self.pid_file())
}
fn log_file(&self) -> Utf8PathBuf {
self.data_dir.join("object_storage.log")
}
fn pid_file(&self) -> Utf8PathBuf {
self.data_dir.join("object_storage.pid")
}
}

View File

@@ -318,7 +318,7 @@ impl PageServerNode {
self.conf.id, datadir,
)
})?;
let args = vec!["-D", datadir_path_str, "--dev"];
let args = vec!["-D", datadir_path_str];
background_process::start_process(
"pageserver",

View File

@@ -162,7 +162,6 @@ impl SafekeeperNode {
listen_http,
"--availability-zone".to_owned(),
availability_zone,
"--dev".to_owned(),
];
if let Some(pg_tenant_only_port) = self.conf.pg_tenant_only_port {
let listen_pg_tenant_only = format!("{}:{}", self.listen_addr, pg_tenant_only_port);

View File

@@ -941,7 +941,7 @@ async fn main() -> anyhow::Result<()> {
let mut node_to_fill_descs = Vec::new();
for desc in node_descs {
let to_drain = nodes.contains(&desc.id);
let to_drain = nodes.iter().any(|id| *id == desc.id);
if to_drain {
node_to_drain_descs.push(desc);
} else {

View File

@@ -151,7 +151,7 @@ Example body:
```
{
"tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
"stripe_size": 2048,
"stripe_size": 32768,
"shards": [
{"node_id": 344, "shard_number": 0},
{"node_id": 722, "shard_number": 1},

View File

@@ -0,0 +1,22 @@
[package]
name = "aztraffic"
version = "0.0.0"
edition.workspace = true
license.workspace = true
publish = false
[dependencies]
anyhow = "1.0.97"
aws-config = "1.6.1"
aws-sdk-athena = "1.68.0"
aws-sdk-ec2 = "1.121.0"
aws-sdk-eks = "1.82.0"
aws-sdk-glue = "1.88.0"
aws-sdk-lambda = "1.75.0"
aws-sdk-scheduler = "1.64.0"
aws-sdk-sfn = "1.68.0"
aws-sdk-sts = "1.65.0"
clap = { version = "4.5.35", features = ["derive", "env"] }
tokio = { version = "1.44.1", features = ["full"] }
serde = "1.0.219"
serde_json = { version = "1.0.140", features = ["preserve_order"] }

View File

@@ -0,0 +1,794 @@
use std::fs;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_sdk_ec2::types::{
DestinationFileFormat, DestinationOptionsRequest, FlowLogsResourceType, LogDestinationType,
TrafficType,
};
use aws_sdk_glue::primitives::Blob;
use aws_sdk_glue::types::{Column, DatabaseInput, SerDeInfo, StorageDescriptor, TableInput};
use aws_sdk_lambda::types::{Environment, FunctionCode, Runtime};
use aws_sdk_scheduler::types::{
DeadLetterConfig, FlexibleTimeWindow, FlexibleTimeWindowMode, RetryPolicy, Target,
};
use aws_sdk_sfn::types::{CloudWatchLogsLogGroup, LogDestination, LogLevel, LoggingConfiguration};
use clap::Parser;
use serde_json::json;
#[derive(Parser, Clone, Debug)]
struct Args {
#[arg(long, value_name = "id")]
account_id: String,
#[arg(long, value_name = "region")]
region: String,
#[arg(long, value_name = "cluster")]
cluster: String,
#[arg(long, value_name = "id")]
vpc_id: Vec<String>,
#[arg(long, value_name = "arn")]
log_group_arn: String,
#[arg(long, value_name = "name")]
pod_info_s3_bucket_name: String,
#[arg(
long,
value_name = "path",
default_value = "CrossAZTraffic/pod_info_dumper/pod_info.csv"
)]
pod_info_s3_bucket_key: String,
#[arg(long, value_name = "uri")]
pod_info_s3_bucket_uri: String,
#[arg(long, value_name = "uri")]
vpc_flow_logs_s3_bucket_uri: String,
#[arg(long, value_name = "uri")]
results_s3_bucket_uri: String,
#[arg(
long,
value_name = "name",
default_value = "./target/lambda/pod_info_dumper/bootstrap.zip"
)]
lambda_zipfile_path: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-podinfo-function"
)]
lambda_function_name: String,
#[arg(long, value_name = "arn")]
lambda_role_arn: String,
#[arg(long, value_name = "name")]
glue_database_name: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-podinfo-table"
)]
glue_pod_info_table_name: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-vpcflowlogs-table"
)]
glue_vpc_flow_logs_table_name: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-results-table"
)]
glue_results_table_name: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-trigger-schedule"
)]
schedule_name: String,
#[arg(long, value_name = "minutes", default_value_t = 60)]
schedule_interval_minutes: usize,
#[arg(long, value_name = "arn")]
schedule_target_state_machine_arn: String,
#[arg(long, value_name = "arn")]
schedule_target_role_arn: String,
#[arg(long, value_name = "arn")]
schedule_dead_letter_queue_arn: Option<String>,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-combine-query"
)]
athena_query_name: String,
#[arg(long, value_name = "uri")]
vpcflowlogs_destination_s3_bucket_uri: String,
#[arg(
long,
value_name = "name",
default_value = "CrossAZTraffic-statemachine"
)]
statemachine_name: String,
#[arg(long, value_name = "arn")]
statemachine_role_arn: String,
#[arg(long, value_name = "uri")]
athena_results_s3_bucket_uri: String,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
eprintln!("{args:#?}");
// TODO: athena results bucket + lifecycle config
// TODO: iam role split
// TODO: iam policy
// TODO: clusterrole + binding
// TODO: eks mapping
// TODO: log group
// TODO: dlq
let sdk_config = create_sdk_config(&args).await?;
LambdaFunction {
local_zipfile_path: args.lambda_zipfile_path,
function_name: args.lambda_function_name.clone(),
role_arn: args.lambda_role_arn,
account_id: args.account_id,
region: args.region,
cluster: args.cluster,
s3_bucket_name: args.pod_info_s3_bucket_name,
s3_bucket_key: args.pod_info_s3_bucket_key,
}
.create(&sdk_config)
.await?;
GlueDatabase {
database_name: args.glue_database_name.clone(),
pod_info_table_name: args.glue_pod_info_table_name.clone(),
pod_info_s3_bucket_uri: args.pod_info_s3_bucket_uri,
vpc_flow_logs_table_name: args.glue_vpc_flow_logs_table_name.clone(),
vpc_flow_logs_s3_bucket_uri: args.vpc_flow_logs_s3_bucket_uri,
results_table_name: args.glue_results_table_name.clone(),
results_s3_bucket_uri: args.results_s3_bucket_uri,
}
.create(&sdk_config)
.await?;
let named_query_id = AthenaQuery {
query_name: args.athena_query_name,
glue_database: args.glue_database_name.clone(),
invocation_frequency: args.schedule_interval_minutes,
athena_results_table_name: args.glue_results_table_name,
vpc_flow_logs_table_name: args.glue_vpc_flow_logs_table_name,
pod_info_table_name: args.glue_pod_info_table_name,
}
.create(&sdk_config)
.await?;
StateMachine {
name: args.statemachine_name,
role_arn: args.statemachine_role_arn,
named_query_id,
glue_database: args.glue_database_name,
lambda_function_name: args.lambda_function_name,
athena_results_s3_bucket_uri: args.athena_results_s3_bucket_uri,
log_group_arn: args.log_group_arn,
}
.create(&sdk_config)
.await?;
Schedule {
name: args.schedule_name,
interval_minutes: args.schedule_interval_minutes,
dead_letter_queue_arn: args.schedule_dead_letter_queue_arn,
target_role_arn: args.schedule_target_role_arn,
target_state_machine_arn: args.schedule_target_state_machine_arn,
}
.create(&sdk_config)
.await?;
let flow_log_ids = VpcFlowLogs {
vpc_ids: args.vpc_id,
destination_s3_bucket_uri: args.vpcflowlogs_destination_s3_bucket_uri,
}
.create(&sdk_config)
.await?;
println!("VPC flow log IDs: {:?}", flow_log_ids.as_slice());
Ok(())
}
async fn create_sdk_config(args: &Args) -> anyhow::Result<aws_config::SdkConfig> {
let region = aws_config::Region::new(args.region.to_owned());
let credentials_provider = DefaultCredentialsChain::builder()
.region(region.clone())
.build()
.await;
Ok(aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(region)
.credentials_provider(credentials_provider)
.load()
.await)
}
struct LambdaFunction {
local_zipfile_path: String,
function_name: String,
role_arn: String,
account_id: String,
region: String,
cluster: String,
s3_bucket_name: String,
s3_bucket_key: String,
}
impl LambdaFunction {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> {
let code = fs::read(&self.local_zipfile_path)?;
let client = aws_sdk_lambda::Client::new(sdk_config);
client
.delete_function()
.function_name(&self.function_name)
.send()
.await
.ok();
client
.create_function()
.function_name(&self.function_name)
.runtime(Runtime::Providedal2023)
.handler("bootstrap")
.role(&self.role_arn)
.code(FunctionCode::builder().zip_file(Blob::new(code)).build())
.timeout(60)
.environment(
Environment::builder()
.set_variables(Some(
[
("NEON_ACCOUNT_ID", self.account_id.as_str()),
("NEON_REGION", self.region.as_str()),
("NEON_CLUSTER", self.cluster.as_str()),
("NEON_S3_BUCKET_NAME", self.s3_bucket_name.as_str()),
("NEON_S3_BUCKET_KEY", self.s3_bucket_key.as_str()),
("AWS_LAMBDA_LOG_FORMAT", "JSON"),
("AWS_LAMBDA_LOG_LEVEL", "INFO"),
]
.into_iter()
.map(|(k, v)| (k.into(), v.into()))
.collect(),
))
.build(),
)
.send()
.await?;
Ok(())
}
}
struct VpcFlowLogs {
vpc_ids: Vec<String>,
destination_s3_bucket_uri: String,
}
impl VpcFlowLogs {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<Vec<String>> {
let ec2_client = aws_sdk_ec2::Client::new(sdk_config);
let flow_logs = ec2_client
.create_flow_logs()
.resource_type(FlowLogsResourceType::Vpc)
.set_resource_ids(Some(self.vpc_ids.clone()))
.traffic_type(TrafficType::All)
.log_destination_type(LogDestinationType::S3)
.log_destination(&self.destination_s3_bucket_uri)
.destination_options(
DestinationOptionsRequest::builder()
.file_format(DestinationFileFormat::Parquet)
.hive_compatible_partitions(false)
.per_hour_partition(true)
.build(),
)
.log_format("${region} ${az-id} ${vpc-id} ${flow-direction} ${pkt-srcaddr} ${pkt-dstaddr} ${srcport} ${dstport} ${start} ${bytes}")
.send()
.await?;
if let Some(unsuccessful) = flow_logs
.unsuccessful
.as_ref()
.and_then(|v| if v.is_empty() { None } else { Some(v) })
{
anyhow::bail!("VPC flow log creation unsuccessful: {unsuccessful:?}");
}
Ok(flow_logs.flow_log_ids().iter().cloned().collect())
}
}
struct GlueDatabase {
database_name: String,
pod_info_table_name: String,
pod_info_s3_bucket_uri: String,
vpc_flow_logs_table_name: String,
vpc_flow_logs_s3_bucket_uri: String,
results_table_name: String,
results_s3_bucket_uri: String,
}
impl GlueDatabase {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> {
let glue_client = aws_sdk_glue::Client::new(sdk_config);
let db = DatabaseInput::builder().name(&self.database_name).build()?;
glue_client
.create_database()
.database_input(db.clone())
.send()
.await?;
let pod_info_columns = &[
Column::builder()
.name("namespace")
.r#type("string")
.build()?,
Column::builder().name("name").r#type("string").build()?,
Column::builder().name("ip").r#type("string").build()?,
Column::builder()
.name("creation_time")
.r#type("timestamp")
.build()?,
Column::builder().name("node").r#type("string").build()?,
Column::builder().name("az").r#type("string").build()?,
];
glue_client
.create_table()
.database_name(db.name())
.table_input(
TableInput::builder()
.name(&self.pod_info_table_name)
.storage_descriptor(
StorageDescriptor::builder()
.location(&self.pod_info_s3_bucket_uri)
.compressed(false)
.set_columns(Some(pod_info_columns.into_iter().cloned().collect()))
.input_format("org.apache.hadoop.mapred.TextInputFormat")
.output_format(
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
)
.serde_info(
SerDeInfo::builder()
.serialization_library(
"org.apache.hadoop.hive.serde2.OpenCSVSerde",
)
.parameters("separatorChar", ",")
.parameters("quoteChar", "`")
.parameters("escapeChar", r"\")
.build(),
)
.build(),
)
.table_type("EXTERNAL_TABLE")
.parameters("classification", "csv")
.parameters("skip.header.line.count", "1")
.retention(0)
.build()?,
)
.send()
.await?;
let vpc_flow_logs_columns = &[
Column::builder().name("region").r#type("string").build()?,
Column::builder().name("az_id").r#type("string").build()?,
Column::builder().name("vpc_id").r#type("string").build()?,
Column::builder()
.name("flow_direction")
.r#type("string")
.build()?,
Column::builder()
.name("pkt_srcaddr")
.r#type("string")
.build()?,
Column::builder()
.name("pkt_dstaddr")
.r#type("string")
.build()?,
Column::builder().name("srcport").r#type("int").build()?,
Column::builder().name("dstport").r#type("int").build()?,
Column::builder().name("start").r#type("bigint").build()?,
Column::builder().name("bytes").r#type("bigint").build()?,
];
glue_client
.create_table()
.database_name(db.name())
.table_input(
TableInput::builder()
.name(&self.vpc_flow_logs_table_name)
.storage_descriptor(
StorageDescriptor::builder()
.location(&self.vpc_flow_logs_s3_bucket_uri)
.compressed(false)
.set_columns(Some(vpc_flow_logs_columns.into_iter().cloned().collect()))
.input_format(
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
)
.output_format(
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
)
.serde_info(
SerDeInfo::builder()
.serialization_library(
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
)
.parameters("serialization.format", "1")
.build(),
)
.build(),
)
.table_type("EXTERNAL_TABLE")
.parameters("classification", "parquet")
.retention(0)
.build()?,
)
.send()
.await?;
let athena_results_columns = &[
Column::builder().name("time").r#type("timestamp").build()?,
Column::builder().name("traffic").r#type("string").build()?,
Column::builder()
.name("total_bytes")
.r#type("bigint")
.build()?,
];
glue_client
.create_table()
.database_name(db.name())
.table_input(
TableInput::builder()
.name(&self.results_table_name)
.storage_descriptor(
StorageDescriptor::builder()
.location(&self.results_s3_bucket_uri)
.compressed(false)
.set_columns(Some(athena_results_columns.into_iter().cloned().collect()))
.input_format(
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
)
.output_format(
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
)
.serde_info(
SerDeInfo::builder()
.serialization_library(
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
)
.parameters("serialization.format", "1")
.build(),
)
.build(),
)
.table_type("EXTERNAL_TABLE")
.parameters("classification", "parquet")
.retention(0)
.build()?,
)
.send()
.await?;
Ok(())
}
}
struct AthenaQuery {
query_name: String,
glue_database: String,
invocation_frequency: usize,
athena_results_table_name: String,
vpc_flow_logs_table_name: String,
pod_info_table_name: String,
}
impl AthenaQuery {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<String> {
let Self {
athena_results_table_name,
vpc_flow_logs_table_name,
pod_info_table_name,
invocation_frequency,
..
} = self;
let query_string = format!(
r#"
INSERT INTO "{athena_results_table_name}"
WITH
ip_addresses_and_az_mapping AS (
SELECT
DISTINCT pkt_srcaddr AS ipaddress,
az_id
FROM "{vpc_flow_logs_table_name}"
WHERE flow_direction = 'egress'
AND from_unixtime("{vpc_flow_logs_table_name}".start) > (CURRENT_TIMESTAMP - ({invocation_frequency} * interval '1' minute))
),
egress_flows_of_pods_with_status AS (
SELECT
"{pod_info_table_name}".name AS srcpodname,
pkt_srcaddr AS srcaddr,
pkt_dstaddr AS dstaddr,
"{vpc_flow_logs_table_name}".az_id AS srcazid,
bytes,
start
FROM "{vpc_flow_logs_table_name}"
INNER JOIN "{pod_info_table_name}" ON "{vpc_flow_logs_table_name}".pkt_srcaddr = "{pod_info_table_name}".ip
WHERE flow_direction = 'egress'
AND from_unixtime("{vpc_flow_logs_table_name}".start) > (CURRENT_TIMESTAMP - ({invocation_frequency} * interval '1' minute))
),
cross_az_traffic_by_pod AS (
SELECT
srcaddr,
srcpodname,
dstaddr,
"{pod_info_table_name}".name AS dstpodname,
srcazid,
ip_addresses_and_az_mapping.az_id AS dstazid,
bytes,
start
FROM egress_flows_of_pods_with_status
INNER JOIN "{pod_info_table_name}" ON dstaddr = "{pod_info_table_name}".ip
LEFT JOIN ip_addresses_and_az_mapping ON dstaddr = ipaddress
WHERE ip_addresses_and_az_mapping.az_id != srcazid
)
SELECT
date_trunc('MINUTE', from_unixtime(start)) AS time,
CONCAT(srcpodname, ' -> ', dstpodname) AS traffic,
SUM(bytes) AS total_bytes
FROM cross_az_traffic_by_pod
GROUP BY date_trunc('MINUTE', from_unixtime(start)), CONCAT(srcpodname, ' -> ', dstpodname)
ORDER BY time, total_bytes DESC
"#
);
let athena_client = aws_sdk_athena::Client::new(sdk_config);
let res = athena_client
.create_named_query()
.name(&self.query_name)
.database(&self.glue_database)
.query_string(query_string)
.send()
.await?;
Ok(res.named_query_id.unwrap())
}
}
struct StateMachine {
name: String,
role_arn: String,
named_query_id: String,
glue_database: String,
lambda_function_name: String,
athena_results_s3_bucket_uri: String,
log_group_arn: String,
}
impl StateMachine {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> {
let sfn_client = aws_sdk_sfn::Client::new(sdk_config);
sfn_client
.create_state_machine()
.name(&self.name)
.role_arn(&self.role_arn)
.logging_configuration(
LoggingConfiguration::builder()
.level(LogLevel::All)
.destinations(
LogDestination::builder()
.cloud_watch_logs_log_group(
CloudWatchLogsLogGroup::builder()
.log_group_arn(&self.log_group_arn)
.build(),
)
.build(),
)
.build(),
)
.definition(
json!(
{
"StartAt": "Invoke",
"States": {
"Invoke": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Output": "{% $states.result.Payload %}",
"Arguments": {
"FunctionName": self.lambda_function_name,
"Payload": json!({
"detail-type": "Scheduled Event",
"source": "aws.events",
"detail": {}
}).to_string()
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2,
"JitterStrategy": "FULL"
}
],
"Next": "Check"
},
"Check": {
"Type": "Choice",
"Choices": [
{
"Next": "GetNamedQuery",
"Condition": "{% $states.input.statusCode = 200 %}"
}
],
"Default": "Fail"
},
"GetNamedQuery": {
"Type": "Task",
"Arguments": {
"NamedQueryId": self.named_query_id
},
"Resource": "arn:aws:states:::aws-sdk:athena:getNamedQuery",
"Output": {
"QueryString": "{% $states.result.NamedQuery.QueryString %}"
},
"Next": "StartQueryExecution"
},
"StartQueryExecution": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:startQueryExecution.sync",
"Arguments": {
"QueryString": "{% $states.input.QueryString %}",
"QueryExecutionContext": {
"Database": self.glue_database
},
"ResultConfiguration": {
"OutputLocation": self.athena_results_s3_bucket_uri
},
"WorkGroup": "primary"
},
"End": true
},
"Fail": {
"Type": "Fail"
}
},
"QueryLanguage": "JSONata"
}
)
.to_string(),
)
.send()
.await?;
Ok(())
}
}
struct Schedule {
name: String,
interval_minutes: usize,
target_state_machine_arn: String,
target_role_arn: String,
dead_letter_queue_arn: Option<String>,
}
impl Schedule {
async fn create(&self, sdk_config: &aws_config::SdkConfig) -> anyhow::Result<()> {
let sched_client = aws_sdk_scheduler::Client::new(sdk_config);
sched_client
.create_schedule()
.name(&self.name)
.schedule_expression(format!("rate({} minute)", self.interval_minutes))
.flexible_time_window(
FlexibleTimeWindow::builder()
.mode(FlexibleTimeWindowMode::Off)
.build()?,
)
.target(
Target::builder()
.arn(&self.target_state_machine_arn)
.role_arn(&self.target_role_arn)
.input(
json!({
"detail-type": "Scheduled Event",
"source": "aws.events",
"detail": {}
})
.to_string(),
)
.retry_policy(
RetryPolicy::builder()
.maximum_retry_attempts(0)
.maximum_event_age_in_seconds(60)
.build(),
)
.set_dead_letter_config(
self.dead_letter_queue_arn
.as_ref()
.map(|arn| DeadLetterConfig::builder().arn(arn).build()),
)
.build()?,
)
.send()
.await?;
Ok(())
}
}
struct KubernetesRoles {
region: String,
cluster: String,
k8s_role_prefix: String,
lambda_role_arn: String,
}
impl KubernetesRoles {
fn print(&self) -> anyhow::Result<()> {
let Self {
region,
cluster,
k8s_role_prefix,
lambda_role_arn,
} = self;
let yaml = format!(
r#"
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: {k8s_role_prefix}-clusterrole
rules:
- apiGroups:
- ""
resources: ["nodes", "namespaces", "pods"]
verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: {k8s_role_prefix}-binding
subjects:
- kind: Group
name: {k8s_role_prefix}-group
apiGroup: rbac.authorization.k8s.io
roleRef:
kind: ClusterRole
name: {k8s_role_prefix}-clusterrole
apiGroup: rbac.authorization.k8s.io
"#
);
let eksctl = format!(
r#"eksctl create iamidentitymapping \
--region "{region}"
--cluster "{cluster}" \
--arn "{lambda_role_arn}" \
--username "{k8s_role_prefix}-binding" \
--group "{k8s_role_prefix}-group"
"#
);
Ok(())
}
}

View File

@@ -0,0 +1,27 @@
[package]
name = "pod_info_dumper"
version = "0.0.0"
edition = "2024"
publish = false
[dependencies]
aws_lambda_events = { version = "0.16.0", default-features = false, features = ["eventbridge"] }
aws-config = { workspace = true }
aws-sdk-eks = "1.75.0"
aws-sdk-s3 = { workspace = true }
aws-sdk-sts = "1.65.0"
aws-sigv4 = "1.3.0"
base64 = { version = "0.22.1" }
csv = { version = "1.3.1", default-features = false }
http = { workspace = true }
k8s-openapi = { version = "0.24.0", default-features = false, features = ["v1_31"] }
kube = { version = "0.99.0", default-features = false, features = ["client", "rustls-tls"] }
lambda_runtime = { version = "0.13.0", default-features = false, features = ["tracing"] }
rustls = { version = "0.23.25" }
rustls-pemfile = { workspace = true }
secrecy = "0.10.3"
serde = { workspace = true }
serde_json = { workspace = true }
sha2 = { workspace = true, features = ["asm"] }
tokio = { workspace = true, features = ["macros"] }
tracing = { workspace = true, features = ["max_level_debug", "release_max_level_info"] }

View File

@@ -0,0 +1,8 @@
# pod_info_dumper
An event-triggered AWS lambda function that writes the list of all pods with
node information to a CSV file in S3.
```shell
cargo lambda build -p pod_info_dumper --output-format Zip --x86-64 --profile release-lambda-function
```

View File

@@ -0,0 +1,420 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use std::{env, io};
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_config::retry::RetryConfig;
use aws_lambda_events::event::eventbridge::EventBridgeEvent;
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
use aws_sdk_s3::types::ChecksumAlgorithm;
use aws_sdk_sts::config::ProvideCredentials;
use aws_sigv4::http_request::{
SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
};
use aws_sigv4::sign::v4;
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD;
use base64::prelude::*;
use k8s_openapi::api::core::v1::{Node, Pod};
use k8s_openapi::chrono::SecondsFormat;
use kube::api::{Api, ListParams, ResourceExt};
use lambda_runtime::{Error, LambdaEvent, run, service_fn, tracing};
use secrecy::SecretString;
use serde::ser::SerializeMap;
use sha2::{Digest as _, Sha256};
const AZ_LABEL: &str = "topology.kubernetes.io/zone";
#[derive(Debug)]
struct Config {
aws_account_id: String,
s3_bucket: S3BucketConfig,
eks_cluster: EksClusterConfig,
}
#[derive(Debug)]
struct S3BucketConfig {
region: String,
name: String,
key: String,
}
impl S3BucketConfig {
#[tracing::instrument(skip_all, err)]
async fn create_sdk_config(&self) -> Result<aws_config::SdkConfig, Error> {
let region = aws_config::Region::new(self.region.clone());
let credentials_provider = DefaultCredentialsChain::builder()
.region(region.clone())
.build()
.await;
Ok(aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(region)
.credentials_provider(credentials_provider)
.load()
.await)
}
}
#[derive(Debug)]
struct EksClusterConfig {
region: String,
name: String,
}
impl EksClusterConfig {
#[tracing::instrument(skip_all, err)]
async fn create_sdk_config(&self) -> Result<aws_config::SdkConfig, Error> {
let region = aws_config::Region::new(self.region.clone());
let credentials_provider = DefaultCredentialsChain::builder()
.region(region.clone())
.build()
.await;
Ok(aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(region)
.credentials_provider(credentials_provider)
.load()
.await)
}
}
#[tokio::main]
pub async fn start() -> Result<(), Error> {
tracing::init_default_subscriber();
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.unwrap();
tracing::info!("function handler started");
let config = Config {
aws_account_id: env::var("NEON_ACCOUNT_ID")?,
s3_bucket: S3BucketConfig {
region: env::var("NEON_REGION")?,
name: env::var("NEON_S3_BUCKET_NAME")?,
key: env::var("NEON_S3_BUCKET_KEY")?,
},
eks_cluster: EksClusterConfig {
region: env::var("NEON_REGION")?,
name: env::var("NEON_CLUSTER")?,
},
};
run(service_fn(async |event: LambdaEvent<EventBridgeEvent<serde_json::Value>>| -> Result<StatusResponse, Error> {
function_handler(event, &config).await
}))
.await
}
#[derive(Debug, PartialEq)]
struct StatusResponse {
status_code: http::StatusCode,
body: Cow<'static, str>,
}
impl StatusResponse {
fn ok() -> Self {
StatusResponse {
status_code: http::StatusCode::OK,
body: "OK".into(),
}
}
}
impl serde::Serialize for StatusResponse {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let mut serializer = serializer.serialize_map(None)?;
serializer.serialize_entry("statusCode", &self.status_code.as_u16())?;
serializer.serialize_entry("body", &self.body)?;
serializer.end()
}
}
#[tracing::instrument(skip_all, fields(?event), err)]
async fn function_handler(
event: LambdaEvent<EventBridgeEvent<serde_json::Value>>,
config: &Config,
) -> Result<StatusResponse, Error> {
tracing::info!("function handler called");
let kube_client = connect_to_cluster(config).await?;
let s3_client = connect_to_s3(config).await?;
let nodes_azs = get_nodes_azs(kube_client.clone()).await?;
let mut pods_info = get_current_pods(kube_client.clone(), &nodes_azs).await?;
pods_info.sort_unstable();
let mut csv = Vec::with_capacity(64 * 1024);
write_csv(&pods_info, &mut csv)?;
tracing::info!(
"csv is {} bytes, containing {} pods",
csv.len(),
pods_info.len()
);
upload_csv(config, &s3_client, &csv).await?;
tracing::info!("pod info successfully stored");
Ok(StatusResponse::ok())
}
#[derive(Debug, serde::Serialize, PartialEq, Eq, PartialOrd, Ord)]
struct PodInfo<'a> {
namespace: String,
name: String,
ip: String,
creation_time: String,
node: String,
az: Option<&'a str>,
}
#[tracing::instrument(skip_all, err)]
async fn connect_to_cluster(config: &Config) -> Result<kube::Client, Error> {
let sdk_config = config.eks_cluster.create_sdk_config().await?;
let eks_client = aws_sdk_eks::Client::new(&sdk_config);
let resp = eks_client
.describe_cluster()
.name(&config.eks_cluster.name)
.send()
.await?;
let cluster = resp
.cluster()
.ok_or_else(|| format!("cluster not found: {}", config.eks_cluster.name))?;
let endpoint = cluster.endpoint().ok_or("cluster endpoint not found")?;
let ca_data = cluster
.certificate_authority()
.and_then(|ca| ca.data())
.ok_or("cluster certificate data not found")?;
let mut k8s_config = kube::Config::new(endpoint.parse()?);
let cert_bytes = STANDARD.decode(ca_data)?;
let certs = rustls_pemfile::certs(&mut cert_bytes.as_slice())
.map(|c| c.map(|c| c.to_vec()))
.collect::<Result<_, _>>()?;
k8s_config.root_cert = Some(certs);
k8s_config.auth_info.token = Some(
create_kube_auth_token(
&sdk_config,
&config.eks_cluster.name,
Duration::from_secs(10 * 60),
)
.await?,
);
tracing::info!("cluster description completed");
Ok(kube::Client::try_from(k8s_config)?)
}
#[tracing::instrument(skip_all, err)]
async fn create_kube_auth_token(
sdk_config: &aws_config::SdkConfig,
cluster_name: &str,
expires_in: Duration,
) -> Result<SecretString, Error> {
let identity = sdk_config
.credentials_provider()
.unwrap()
.provide_credentials()
.await?
.into();
let region = sdk_config.region().expect("region").as_ref();
let host = format!("sts.{region}.amazonaws.com");
let get_caller_id_url = format!("https://{host}/?Action=GetCallerIdentity&Version=2011-06-15");
let mut signing_settings = SigningSettings::default();
signing_settings.signature_location = SignatureLocation::QueryParams;
signing_settings.expires_in = Some(expires_in);
let signing_params = v4::SigningParams::builder()
.identity(&identity)
.region(region)
.name("sts")
.time(SystemTime::now())
.settings(signing_settings)
.build()?
.into();
let signable_request = SignableRequest::new(
"GET",
&get_caller_id_url,
[("host", host.as_str()), ("x-k8s-aws-id", cluster_name)].into_iter(),
SignableBody::Bytes(&[]),
)?;
let (signing_instructions, _signature) = sign(signable_request, &signing_params)?.into_parts();
let mut token_request = http::Request::get(get_caller_id_url).body(()).unwrap();
signing_instructions.apply_to_request_http1x(&mut token_request);
let token = format!(
"k8s-aws-v1.{}",
BASE64_STANDARD_NO_PAD.encode(token_request.uri().to_string())
)
.into();
Ok(token)
}
#[tracing::instrument(skip_all, err)]
async fn connect_to_s3(config: &Config) -> Result<aws_sdk_s3::Client, Error> {
let sdk_config = config.s3_bucket.create_sdk_config().await?;
let s3_client = aws_sdk_s3::Client::from_conf(
aws_sdk_s3::config::Builder::from(&sdk_config)
.retry_config(RetryConfig::standard())
.build(),
);
Ok(s3_client)
}
#[tracing::instrument(skip_all, err)]
async fn get_nodes_azs(client: kube::Client) -> Result<HashMap<String, String>, Error> {
let nodes = Api::<Node>::all(client);
let list_params = ListParams::default().timeout(10);
let mut nodes_azs = HashMap::default();
for node in nodes.list(&list_params).await? {
let Some(name) = node.metadata.name else {
tracing::warn!("pod without name");
continue;
};
let Some(mut labels) = node.metadata.labels else {
tracing::warn!(name, "pod without labels");
continue;
};
let Some(az) = labels.remove(AZ_LABEL) else {
tracing::warn!(name, "pod without AZ label");
continue;
};
tracing::debug!(name, az, "adding node");
nodes_azs.insert(name, az);
}
Ok(nodes_azs)
}
#[tracing::instrument(skip_all, err)]
async fn get_current_pods(
client: kube::Client,
node_az: &HashMap<String, String>,
) -> Result<Vec<PodInfo<'_>>, Error> {
let pods = Api::<Pod>::all(client);
let mut pods_info = vec![];
let mut continuation_token = Some(String::new());
while let Some(token) = continuation_token {
let list_params = ListParams::default()
.timeout(10)
.limit(500)
.continue_token(&token);
let list = pods.list(&list_params).await?;
continuation_token = list.metadata.continue_;
tracing::info!("received list of {} pods", list.items.len());
for pod in list.items {
let name = pod.name_any();
let Some(namespace) = pod.namespace() else {
tracing::warn!(name, "pod without namespace");
continue;
};
let Some(status) = pod.status else {
tracing::warn!(namespace, name, "pod without status");
continue;
};
let Some(conditions) = status.conditions else {
tracing::warn!(namespace, name, "pod without conditions");
continue;
};
let Some(ready_condition) = conditions.iter().find(|cond| cond.type_ == "Ready") else {
tracing::debug!(namespace, name, "pod not ready");
continue;
};
let Some(ref ready_time) = ready_condition.last_transition_time else {
tracing::warn!(
namespace,
name,
"pod ready condition without transition time"
);
continue;
};
let Some(spec) = pod.spec else {
tracing::warn!(namespace, name, "pod without spec");
continue;
};
let Some(node) = spec.node_name else {
tracing::warn!(namespace, name, "pod without node");
continue;
};
let Some(ip) = status.pod_ip else {
tracing::warn!(namespace, name, "pod without IP");
continue;
};
let az = node_az.get(&node).map(String::as_str);
let creation_time = ready_time.0.to_rfc3339_opts(SecondsFormat::Secs, true);
let pod_info = PodInfo {
namespace,
name,
ip,
creation_time,
node,
az,
};
tracing::debug!(?pod_info, "adding pod");
pods_info.push(pod_info);
}
}
Ok(pods_info)
}
#[tracing::instrument(skip_all, err)]
fn write_csv<W: io::Write>(pods_info: &Vec<PodInfo>, writer: W) -> Result<(), Error> {
let mut w = csv::Writer::from_writer(writer);
for pod in pods_info {
w.serialize(pod)?;
}
w.flush()?;
Ok(())
}
#[tracing::instrument(skip_all, err)]
async fn upload_csv(
config: &Config,
s3_client: &aws_sdk_s3::Client,
csv: &[u8],
) -> Result<aws_sdk_s3::operation::put_object::PutObjectOutput, Error> {
let mut hasher = Sha256::new();
hasher.update(csv);
let csum = hasher.finalize();
let resp = s3_client
.put_object()
.bucket(&config.s3_bucket.name)
.key(&config.s3_bucket.key)
.content_type("text/csv")
.checksum_algorithm(ChecksumAlgorithm::Sha256)
.checksum_sha256(STANDARD.encode(csum))
.body(ByteStream::from(SdkBody::from(csv)))
.expected_bucket_owner(&config.aws_account_id)
.send()
.await?;
Ok(resp)
}

View File

@@ -0,0 +1,3 @@
fn main() -> Result<(), lambda_runtime::Error> {
pod_info_dumper::start()
}

View File

@@ -5,14 +5,6 @@ use crate::privilege::Privilege;
use crate::responses::ComputeCtlConfig;
use crate::spec::{ComputeSpec, ExtVersion, PgIdent};
/// When making requests to the `compute_ctl` external HTTP server, the client
/// must specify a set of claims in `Authorization` header JWTs such that
/// `compute_ctl` can authorize the request.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ComputeClaims {
pub compute_id: String,
}
/// Request of the /configure API
///
/// We now pass only `spec` in the configuration request, but later we can
@@ -38,3 +30,9 @@ pub struct SetRoleGrantsRequest {
pub privileges: Vec<Privilege>,
pub role: PgIdent,
}
/// Request of the /configure_telemetry API
#[derive(Debug, Deserialize, Serialize)]
pub struct ConfigureTelemetryRequest {
pub logs_export_host: Option<String>,
}

View File

@@ -168,10 +168,6 @@ pub struct ComputeSpec {
/// Extensions should be present in shared_preload_libraries
#[serde(default)]
pub audit_log_level: ComputeAudit,
/// Hostname and the port of the otel collector. Leave empty to disable Postgres logs forwarding.
/// Example: config-shy-breeze-123-collector-monitoring.neon-telemetry.svc.cluster.local:10514
pub logs_export_host: Option<String>,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
@@ -183,6 +179,9 @@ pub enum ComputeFeature {
/// track short-lived connections as user activity.
ActivityMonitorExperimental,
/// Allow to configure rsyslog for Postgres logs export
PostgresLogsExport,
/// This is a special feature flag that is used to represent unknown feature flags.
/// Basically all unknown to enum flags are represented as this one. See unit test
/// `parse_unknown_features()` for more details.

View File

@@ -51,54 +51,9 @@ pub struct NodeMetadata {
/// If there cannot be a static default value because we need to make runtime
/// checks to determine the default, make it an `Option` (which defaults to None).
/// The runtime check should be done in the consuming crate, i.e., `pageserver`.
///
/// Unknown fields are silently ignored during deserialization.
/// The alternative, which we used in the past, was to set `deny_unknown_fields`,
/// which fails deserialization, and hence pageserver startup, if there is an unknown field.
/// The reason we don't do that anymore is that it complicates
/// usage of config fields for feature flagging, which we commonly do for
/// region-by-region rollouts.
/// The complications mainly arise because the `pageserver.toml` contents on a
/// prod server have a separate lifecycle from the pageserver binary.
/// For instance, `pageserver.toml` contents today are defined in the internal
/// infra repo, and thus introducing a new config field to pageserver and
/// rolling it out to prod servers are separate commits in separate repos
/// that can't be made or rolled back atomically.
/// Rollbacks in particular pose a risk with deny_unknown_fields because
/// the old pageserver binary may reject a new config field, resulting in
/// an outage unless the person doing the pageserver rollback remembers
/// to also revert the commit that added the config field in to the
/// `pageserver.toml` templates in the internal infra repo.
/// (A pre-deploy config check would eliminate this risk during rollbacks,
/// cf [here](https://github.com/neondatabase/cloud/issues/24349).)
/// In addition to this compatibility problem during emergency rollbacks,
/// deny_unknown_fields adds further complications when decomissioning a feature
/// flag: with deny_unknown_fields, we can't remove a flag from the [`ConfigToml`]
/// until all prod servers' `pageserver.toml` files have been updated to a version
/// that doesn't specify the flag. Otherwise new software would fail to start up.
/// This adds the requirement for an intermediate step where the new config field
/// is accepted but ignored, prolonging the decomissioning process by an entire
/// release cycle.
/// By contrast with unknown fields silently ignored, decomissioning a feature
/// flag is a one-step process: we can skip the intermediate step and straight
/// remove the field from the [`ConfigToml`]. We leave the field in the
/// `pageserver.toml` files on prod servers until we reach certainty that we
/// will not roll back to old software whose behavior was dependent on config.
/// Then we can remove the field from the templates in the internal infra repo.
/// This process is [documented internally](
/// https://docs.neon.build/storage/pageserver_configuration.html).
///
/// Note that above relaxed compatbility for the config format does NOT APPLY
/// TO THE STORAGE FORMAT. As general guidance, when introducing storage format
/// changes, ensure that the potential rollback target version will be compatible
/// with the new format. This must hold regardless of what flags are set in in the `pageserver.toml`:
/// any format version that exists in an environment must be compatible with the software that runs there.
/// Use a pageserver.toml flag only to gate whether software _writes_ the new format.
/// For more compatibility considerations, refer to [internal docs](
/// https://docs.neon.build/storage/compat.html?highlight=compat#format-versions--compatibility)
#[serde_as]
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct ConfigToml {
// types mapped 1:1 into the runtime PageServerConfig type
pub listen_pg_addr: String,
@@ -183,6 +138,7 @@ pub struct ConfigToml {
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct DiskUsageEvictionTaskConfig {
pub max_usage_pct: utils::serde_percent::Percent,
pub min_avail_bytes: u64,
@@ -197,11 +153,13 @@ pub struct DiskUsageEvictionTaskConfig {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub enum PageServicePipeliningConfig {
Serial,
Pipelined(PageServicePipeliningConfigPipelined),
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PageServicePipeliningConfigPipelined {
/// Causes runtime errors if larger than max get_vectored batch size.
pub max_batch_size: NonZeroUsize,
@@ -217,6 +175,7 @@ pub enum PageServiceProtocolPipelinedExecutionStrategy {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub enum GetVectoredConcurrentIo {
/// The read path is fully sequential: layers are visited
/// one after the other and IOs are issued and waited upon
@@ -335,7 +294,7 @@ pub struct MaxVectoredReadBytes(pub NonZeroUsize);
/// Tenant-level configuration values, used for various purposes.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(default)]
#[serde(deny_unknown_fields, default)]
pub struct TenantConfigToml {
// Flush out an inmemory layer, if it's holding WAL older than this
// This puts a backstop on how much WAL needs to be re-digested if the

View File

@@ -613,7 +613,8 @@ mod tests {
use rand::{RngCore, SeedableRng};
use super::*;
use crate::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber, ShardStripeSize};
use crate::models::ShardParameters;
use crate::shard::{ShardCount, ShardNumber};
// Helper function to create a key range.
//
@@ -963,8 +964,12 @@ mod tests {
}
#[test]
fn sharded_range_relation_gap() {
let shard_identity =
ShardIdentity::new(ShardNumber(0), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
let range = ShardedRange::new(
Range {
@@ -980,8 +985,12 @@ mod tests {
#[test]
fn shard_identity_keyspaces_single_key() {
let shard_identity =
ShardIdentity::new(ShardNumber(1), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
let shard_identity = ShardIdentity::new(
ShardNumber(1),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
let range = ShardedRange::new(
Range {
@@ -1025,8 +1034,12 @@ mod tests {
#[test]
fn shard_identity_keyspaces_forkno_gap() {
let shard_identity =
ShardIdentity::new(ShardNumber(1), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
let shard_identity = ShardIdentity::new(
ShardNumber(1),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
let range = ShardedRange::new(
Range {
@@ -1048,7 +1061,7 @@ mod tests {
let shard_identity = ShardIdentity::new(
ShardNumber(shard_number),
ShardCount::new(4),
DEFAULT_STRIPE_SIZE,
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
@@ -1131,44 +1144,37 @@ mod tests {
/// for a single tenant.
#[test]
fn sharded_range_fragment_simple() {
const SHARD_COUNT: u8 = 4;
const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(SHARD_COUNT),
ShardStripeSize(STRIPE_SIZE),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
// A range which we happen to know covers exactly one stripe which belongs to this shard
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
let mut input_end = input_start;
input_end.field6 += STRIPE_SIZE; // field6 is block number
let input_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap();
// Ask for stripe_size blocks, we get the whole stripe
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, STRIPE_SIZE),
(STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
do_fragment(input_start, input_end, &shard_identity, 32768),
(32768, vec![(32768, input_start..input_end)])
);
// Ask for more, we still get the whole stripe
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 10 * STRIPE_SIZE),
(STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
do_fragment(input_start, input_end, &shard_identity, 10000000),
(32768, vec![(32768, input_start..input_end)])
);
// Ask for target_nblocks of half the stripe size, we get two halves
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, STRIPE_SIZE / 2),
do_fragment(input_start, input_end, &shard_identity, 16384),
(
STRIPE_SIZE,
32768,
vec![
(
STRIPE_SIZE / 2,
input_start..input_start.add(STRIPE_SIZE / 2)
),
(STRIPE_SIZE / 2, input_start.add(STRIPE_SIZE / 2)..input_end)
(16384, input_start..input_start.add(16384)),
(16384, input_start.add(16384)..input_end)
]
)
);
@@ -1176,53 +1182,40 @@ mod tests {
#[test]
fn sharded_range_fragment_multi_stripe() {
const SHARD_COUNT: u8 = 4;
const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
const RANGE_SIZE: u32 = SHARD_COUNT as u32 * STRIPE_SIZE;
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(SHARD_COUNT),
ShardStripeSize(STRIPE_SIZE),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
// A range which covers multiple stripes, exactly one of which belongs to the current shard.
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
let mut input_end = input_start;
input_end.field6 += RANGE_SIZE; // field6 is block number
let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
// Ask for all the blocks, get a fragment that covers the whole range but reports
// its size to be just the blocks belonging to our shard.
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, RANGE_SIZE),
(STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
do_fragment(input_start, input_end, &shard_identity, 131072),
(32768, vec![(32768, input_start..input_end)])
);
// Ask for a sub-stripe quantity that results in 3 fragments.
let limit = STRIPE_SIZE / 3 + 1;
// Ask for a sub-stripe quantity
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, limit),
do_fragment(input_start, input_end, &shard_identity, 16000),
(
STRIPE_SIZE,
32768,
vec![
(limit, input_start..input_start.add(limit)),
(limit, input_start.add(limit)..input_start.add(2 * limit)),
(
STRIPE_SIZE - 2 * limit,
input_start.add(2 * limit)..input_end
),
(16000, input_start..input_start.add(16000)),
(16000, input_start.add(16000)..input_start.add(32000)),
(768, input_start.add(32000)..input_end),
]
)
);
// Try on a range that starts slightly after our owned stripe
assert_eq!(
do_fragment(input_start.add(1), input_end, &shard_identity, RANGE_SIZE),
(
STRIPE_SIZE - 1,
vec![(STRIPE_SIZE - 1, input_start.add(1)..input_end)]
)
do_fragment(input_start.add(1), input_end, &shard_identity, 131072),
(32767, vec![(32767, input_start.add(1)..input_end)])
);
}
@@ -1230,40 +1223,32 @@ mod tests {
/// a previous relation.
#[test]
fn sharded_range_fragment_starting_from_logical_size() {
const SHARD_COUNT: u8 = 4;
const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
const RANGE_SIZE: u32 = SHARD_COUNT as u32 * STRIPE_SIZE;
let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
let mut input_end = Key::from_hex("000000067f00000001000000ae0100000000").unwrap();
input_end.field6 += RANGE_SIZE; // field6 is block number
let input_end = Key::from_hex("000000067f00000001000000ae0100008000").unwrap();
// Shard 0 owns the first stripe in the relation, and the preceding logical size is shard local too
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(SHARD_COUNT),
ShardStripeSize(STRIPE_SIZE),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 2 * STRIPE_SIZE),
(
STRIPE_SIZE + 1,
vec![(STRIPE_SIZE + 1, input_start..input_end)]
)
do_fragment(input_start, input_end, &shard_identity, 0x10000),
(0x8001, vec![(0x8001, input_start..input_end)])
);
// Shard 1 does not own the first stripe in the relation, but it does own the logical size (all shards
// store all logical sizes)
let shard_identity = ShardIdentity::new(
ShardNumber(1),
ShardCount::new(SHARD_COUNT),
ShardStripeSize(STRIPE_SIZE),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 2 * STRIPE_SIZE),
(1, vec![(1, input_start..input_end)])
do_fragment(input_start, input_end, &shard_identity, 0x10000),
(0x1, vec![(0x1, input_start..input_end)])
);
}
@@ -1299,8 +1284,12 @@ mod tests {
);
// Same, but using a sharded identity
let shard_identity =
ShardIdentity::new(ShardNumber(0), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 0x8000),
(u32::MAX, vec![(u32::MAX, input_start..input_end),])
@@ -1342,7 +1331,7 @@ mod tests {
ShardIdentity::new(
ShardNumber((prng.next_u32() % shard_count) as u8),
ShardCount::new(shard_count as u8),
DEFAULT_STRIPE_SIZE,
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap()
};

View File

@@ -26,7 +26,7 @@ use utils::{completion, serde_system_time};
use crate::config::Ratio;
use crate::key::{CompactKey, Key};
use crate::reltag::RelTag;
use crate::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
use crate::shard::{ShardCount, ShardStripeSize, TenantShardId};
/// The state of a tenant in this pageserver.
///
@@ -80,22 +80,10 @@ pub enum TenantState {
///
/// Transitions out of this state are possible through `set_broken()`.
Stopping {
/// The barrier can be used to wait for shutdown to complete. The first caller to set
/// Some(Barrier) is responsible for driving shutdown to completion. Subsequent callers
/// will wait for the first caller's existing barrier.
///
/// None is set when an attach is cancelled, to signal to shutdown that the attach has in
/// fact cancelled:
///
/// 1. `shutdown` sees `TenantState::Attaching`, and cancels the tenant.
/// 2. `attach` sets `TenantState::Stopping(None)` and exits.
/// 3. `set_stopping` waits for `TenantState::Stopping(None)` and sets
/// `TenantState::Stopping(Some)` to claim the barrier as the shutdown owner.
//
// Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
// otherwise it will not be skipped during deserialization
#[serde(skip)]
progress: Option<completion::Barrier>,
progress: completion::Barrier,
},
/// The tenant is recognized by the pageserver, but can no longer be used for
/// any operations.
@@ -438,6 +426,8 @@ pub struct ShardParameters {
}
impl ShardParameters {
pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
pub fn is_unsharded(&self) -> bool {
self.count.is_unsharded()
}
@@ -447,7 +437,7 @@ impl Default for ShardParameters {
fn default() -> Self {
Self {
count: ShardCount::new(0),
stripe_size: DEFAULT_STRIPE_SIZE,
stripe_size: Self::DEFAULT_STRIPE_SIZE,
}
}
}
@@ -1114,7 +1104,7 @@ pub struct CompactionAlgorithmSettings {
}
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum L0FlushConfig {
#[serde(rename_all = "snake_case")]
Direct { max_concurrency: NonZeroUsize },
@@ -1678,7 +1668,6 @@ pub struct SecondaryProgress {
pub struct TenantScanRemoteStorageShard {
pub tenant_shard_id: TenantShardId,
pub generation: Option<u32>,
pub stripe_size: Option<ShardStripeSize>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
@@ -2730,15 +2719,10 @@ mod tests {
"Activating",
),
(line!(), TenantState::Active, "Active"),
(
line!(),
TenantState::Stopping { progress: None },
"Stopping",
),
(
line!(),
TenantState::Stopping {
progress: Some(completion::Barrier::default()),
progress: utils::completion::Barrier::default(),
},
"Stopping",
),

View File

@@ -58,8 +58,6 @@ pub enum NeonWalRecord {
/// to true. This record does not need the history WALs to reconstruct. See [`NeonWalRecord::will_init`] and
/// its references in `timeline.rs`.
will_init: bool,
/// Only append the record if the current image is the same as the one specified in this field.
only_if: Option<String>,
},
}
@@ -83,17 +81,6 @@ impl NeonWalRecord {
append: s.as_ref().to_string(),
clear: false,
will_init: false,
only_if: None,
}
}
#[cfg(feature = "testing")]
pub fn wal_append_conditional(s: impl AsRef<str>, only_if: impl AsRef<str>) -> Self {
Self::Test {
append: s.as_ref().to_string(),
clear: false,
will_init: false,
only_if: Some(only_if.as_ref().to_string()),
}
}
@@ -103,7 +90,6 @@ impl NeonWalRecord {
append: s.as_ref().to_string(),
clear: true,
will_init: false,
only_if: None,
}
}
@@ -113,7 +99,6 @@ impl NeonWalRecord {
append: s.as_ref().to_string(),
clear: true,
will_init: true,
only_if: None,
}
}
}

View File

@@ -78,12 +78,6 @@ impl Default for ShardStripeSize {
}
}
impl std::fmt::Display for ShardStripeSize {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
/// Layout version: for future upgrades where we might change how the key->shard mapping works
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
pub struct ShardLayout(u8);
@@ -92,11 +86,8 @@ const LAYOUT_V1: ShardLayout = ShardLayout(1);
/// ShardIdentity uses a magic layout value to indicate if it is unusable
const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);
/// The default stripe size in pages. 16 MiB divided by 8 kiB page size.
///
/// A lower stripe size distributes ingest load better across shards, but reduces IO amortization.
/// 16 MiB appears to be a reasonable balance: <https://github.com/neondatabase/neon/pull/10510>.
pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(16 * 1024 / 8);
/// Default stripe size in pages: 256MiB divided by 8kiB page size.
const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum ShardConfigError {
@@ -546,7 +537,7 @@ mod tests {
field6: 0x7d06,
};
let shard = key_to_shard_number(ShardCount(10), ShardStripeSize(32768), &key);
let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
assert_eq!(shard, ShardNumber(8));
}

View File

@@ -5,6 +5,7 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use std::future::Future;
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::os::fd::{AsRawFd, RawFd};
use std::pin::Pin;
@@ -226,7 +227,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> MaybeWriteOnly<IO> {
match self {
MaybeWriteOnly::Full(framed) => framed.read_startup_message().await,
MaybeWriteOnly::WriteOnly(_) => {
Err(io::Error::other("reading from write only half").into())
Err(io::Error::new(ErrorKind::Other, "reading from write only half").into())
}
MaybeWriteOnly::Broken => panic!("IO on invalid MaybeWriteOnly"),
}
@@ -236,7 +237,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> MaybeWriteOnly<IO> {
match self {
MaybeWriteOnly::Full(framed) => framed.read_message().await,
MaybeWriteOnly::WriteOnly(_) => {
Err(io::Error::other("reading from write only half").into())
Err(io::Error::new(ErrorKind::Other, "reading from write only half").into())
}
MaybeWriteOnly::Broken => panic!("IO on invalid MaybeWriteOnly"),
}
@@ -974,7 +975,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> AsyncWrite for CopyDataWriter<'_, IO> {
.write_message_noflush(&BeMessage::CopyData(buf))
// write_message only writes to the buffer, so it can fail iff the
// message is invaid, but CopyData can't be invalid.
.map_err(|_| io::Error::other("failed to serialize CopyData"))?;
.map_err(|_| io::Error::new(ErrorKind::Other, "failed to serialize CopyData"))?;
Poll::Ready(Ok(buf.len()))
}

View File

@@ -85,8 +85,8 @@ static KEY: Lazy<rustls::pki_types::PrivateKeyDer<'static>> = Lazy::new(|| {
static CERT: Lazy<rustls::pki_types::CertificateDer<'static>> = Lazy::new(|| {
let mut cursor = Cursor::new(include_bytes!("cert.pem"));
rustls_pemfile::certs(&mut cursor).next().unwrap().unwrap()
let cert = rustls_pemfile::certs(&mut cursor).next().unwrap().unwrap();
cert
});
// test that basic select with ssl works

View File

@@ -35,7 +35,7 @@ impl ConnectionError {
pub fn into_io_error(self) -> io::Error {
match self {
ConnectionError::Io(io) => io,
ConnectionError::Protocol(pe) => io::Error::other(pe.to_string()),
ConnectionError::Protocol(pe) => io::Error::new(io::ErrorKind::Other, pe.to_string()),
}
}
}

View File

@@ -257,7 +257,7 @@ pub enum ProtocolError {
impl ProtocolError {
/// Proxy stream.rs uses only io::Error; provide it.
pub fn into_io_error(self) -> io::Error {
io::Error::other(self.to_string())
io::Error::new(io::ErrorKind::Other, self.to_string())
}
}

View File

@@ -212,7 +212,7 @@ impl ScramSha256 {
password,
channel_binding,
} => (nonce, password, channel_binding),
_ => return Err(io::Error::other("invalid SCRAM state")),
_ => return Err(io::Error::new(io::ErrorKind::Other, "invalid SCRAM state")),
};
let message =
@@ -291,7 +291,7 @@ impl ScramSha256 {
server_key,
auth_message,
} => (server_key, auth_message),
_ => return Err(io::Error::other("invalid SCRAM state")),
_ => return Err(io::Error::new(io::ErrorKind::Other, "invalid SCRAM state")),
};
let message =
@@ -301,7 +301,10 @@ impl ScramSha256 {
let verifier = match parsed {
ServerFinalMessage::Error(e) => {
return Err(io::Error::other(format!("SCRAM error: {}", e)));
return Err(io::Error::new(
io::ErrorKind::Other,
format!("SCRAM error: {}", e),
));
}
ServerFinalMessage::Verifier(verifier) => verifier,
};

View File

@@ -28,7 +28,7 @@ toml_edit.workspace = true
tracing.workspace = true
scopeguard.workspace = true
metrics.workspace = true
utils = { path = "../utils", default-features = false }
utils.workspace = true
pin-project-lite.workspace = true
azure_core.workspace = true

View File

@@ -801,7 +801,8 @@ where
// that support needs to be hacked in.
//
// including {self:?} into the message would be useful, but unsure how to unproject.
_ => std::task::Poll::Ready(Err(std::io::Error::other(
_ => std::task::Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"cloned or initial values cannot be read",
))),
}
@@ -854,7 +855,7 @@ where
};
Err(azure_core::error::Error::new(
azure_core::error::ErrorKind::Io,
std::io::Error::other(msg),
std::io::Error::new(std::io::ErrorKind::Other, msg),
))
}

View File

@@ -5,8 +5,7 @@ edition.workspace = true
license.workspace = true
[features]
default = ["rename_noreplace"]
rename_noreplace = []
default = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
@@ -36,7 +35,7 @@ serde_with.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["signal"] }
tokio.workspace = true
tokio-tar.workspace = true
tokio-util.workspace = true
toml_edit = { workspace = true, features = ["serde"] }

View File

@@ -173,7 +173,7 @@ impl std::fmt::Debug for JwtAuth {
}
// this function is used only for testing purposes in CLI e g generate tokens during init
pub fn encode_from_key_file<S: Serialize>(claims: &S, key_data: &[u8]) -> Result<String> {
pub fn encode_from_key_file(claims: &Claims, key_data: &[u8]) -> Result<String> {
let key = EncodingKey::from_ed_pem(key_data)?;
Ok(encode(&Header::new(STORAGE_TOKEN_ALGORITHM), claims, &key)?)
}

View File

@@ -81,9 +81,12 @@ pub fn path_with_suffix_extension(
}
pub fn fsync_file_and_parent(file_path: &Utf8Path) -> io::Result<()> {
let parent = file_path
.parent()
.ok_or_else(|| io::Error::other(format!("File {file_path:?} has no parent")))?;
let parent = file_path.parent().ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
format!("File {file_path:?} has no parent"),
)
})?;
fsync(file_path)?;
fsync(parent)?;

View File

@@ -1,26 +0,0 @@
use std::time::{Duration, Instant};
#[derive(Default)]
pub struct ElapsedAccum {
accum: Duration,
}
impl ElapsedAccum {
pub fn get(&self) -> Duration {
self.accum
}
pub fn guard(&mut self) -> impl Drop + '_ {
let start = Instant::now();
scopeguard::guard(start, |last_wait_at| {
self.accum += Instant::now() - last_wait_at;
})
}
pub async fn measure<Fut, O>(&mut self, fut: Fut) -> O
where
Fut: Future<Output = O>,
{
let _guard = self.guard();
fut.await
}
}

View File

@@ -3,9 +3,7 @@ use std::{fs, io, path::Path};
use anyhow::Context;
#[cfg(feature = "rename_noreplace")]
mod rename_noreplace;
#[cfg(feature = "rename_noreplace")]
pub use rename_noreplace::rename_noreplace;
pub trait PathExt {

View File

@@ -8,7 +8,7 @@ pub fn rename_noreplace<P1: ?Sized + NixPath, P2: ?Sized + NixPath>(
dst: &P2,
) -> nix::Result<()> {
{
#[cfg(all(target_os = "linux", target_env = "gnu"))]
#[cfg(target_os = "linux")]
{
nix::fcntl::renameat2(
None,
@@ -29,7 +29,7 @@ pub fn rename_noreplace<P1: ?Sized + NixPath, P2: ?Sized + NixPath>(
})??;
nix::errno::Errno::result(res).map(drop)
}
#[cfg(not(any(all(target_os = "linux", target_env = "gnu"), target_os = "macos")))]
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
std::compile_error!("OS does not support no-replace renames");
}

View File

@@ -93,8 +93,6 @@ pub mod try_rcu;
pub mod guard_arc_swap;
pub mod elapsed_accum;
#[cfg(target_os = "linux")]
pub mod linux_socket_ioctl;

View File

@@ -1,8 +1,6 @@
pub use signal_hook::consts::TERM_SIGNALS;
pub use signal_hook::consts::signal::*;
use signal_hook::iterator::Signals;
use tokio::signal::unix::{SignalKind, signal};
use tracing::info;
pub enum Signal {
Quit,
@@ -38,30 +36,3 @@ impl ShutdownSignals {
Ok(())
}
}
/// Runs in a loop since we want to be responsive to multiple signals
/// even after triggering shutdown (e.g. a SIGQUIT after a slow SIGTERM shutdown)
/// <https://github.com/neondatabase/neon/issues/9740>
pub async fn signal_handler(token: tokio_util::sync::CancellationToken) {
let mut sigint = signal(SignalKind::interrupt()).unwrap();
let mut sigterm = signal(SignalKind::terminate()).unwrap();
let mut sigquit = signal(SignalKind::quit()).unwrap();
loop {
let signal = tokio::select! {
_ = sigquit.recv() => {
info!("Got signal SIGQUIT. Terminating in immediate shutdown mode.");
std::process::exit(111);
}
_ = sigint.recv() => "SIGINT",
_ = sigterm.recv() => "SIGTERM",
};
if !token.is_cancelled() {
info!("Got signal {signal}. Terminating gracefully in fast shutdown mode.");
token.cancel();
} else {
info!("Got signal {signal}. Already shutting down.");
}
}
}

View File

@@ -111,17 +111,9 @@ impl<T> OnceCell<T> {
}
}
/// Like [`Self::get_or_init_detached_measured`], but without out parameter for time spent waiting.
pub async fn get_or_init_detached(&self) -> Result<Guard<'_, T>, InitPermit> {
self.get_or_init_detached_measured(None).await
}
/// Returns a guard to an existing initialized value, or returns an unique initialization
/// permit which can be used to initialize this `OnceCell` using `OnceCell::set`.
pub async fn get_or_init_detached_measured(
&self,
mut wait_time: Option<&mut crate::elapsed_accum::ElapsedAccum>,
) -> Result<Guard<'_, T>, InitPermit> {
pub async fn get_or_init_detached(&self) -> Result<Guard<'_, T>, InitPermit> {
// It looks like OnceCell::get_or_init could be implemented using this method instead of
// duplication. However, that makes the future be !Send due to possibly holding on to the
// MutexGuard over an await point.
@@ -133,16 +125,12 @@ impl<T> OnceCell<T> {
}
guard.init_semaphore.clone()
};
{
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
let fut = sem.acquire();
if let Some(wait_time) = wait_time.as_mut() {
wait_time.measure(fut).await
} else {
fut.await
}
sem.acquire().await
};
let Ok(permit) = permit else {

View File

@@ -1,28 +0,0 @@
[package]
name = "object_storage"
version = "0.0.1"
edition.workspace = true
license.workspace = true
[dependencies]
anyhow.workspace = true
axum-extra.workspace = true
axum.workspace = true
camino.workspace = true
futures.workspace = true
jsonwebtoken.workspace = true
prometheus.workspace = true
remote_storage.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tracing.workspace = true
utils = { path = "../libs/utils", default-features = false }
workspace_hack.workspace = true
[dev-dependencies]
camino-tempfile.workspace = true
http-body-util.workspace = true
itertools.workspace = true
rand.workspace = true
test-log.workspace = true
tower.workspace = true

View File

@@ -1,561 +0,0 @@
use anyhow::anyhow;
use axum::body::{Body, Bytes};
use axum::response::{IntoResponse, Response};
use axum::{Router, http::StatusCode};
use object_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
use remote_storage::TimeoutOrCancel;
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath};
use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use utils::backoff::retry;
pub fn app(state: Arc<Storage>) -> Router<()> {
use axum::routing::{delete as _delete, get as _get};
let delete_prefix = _delete(delete_prefix);
Router::new()
.route(
"/{tenant_id}/{timeline_id}/{endpoint_id}/{*path}",
_get(get).put(set).delete(delete),
)
.route(
"/{tenant_id}/{timeline_id}/{endpoint_id}",
delete_prefix.clone(),
)
.route("/{tenant_id}/{timeline_id}", delete_prefix.clone())
.route("/{tenant_id}", delete_prefix)
.route("/metrics", _get(metrics))
.route("/status", _get(async || StatusCode::OK.into_response()))
.with_state(state)
}
type Result = anyhow::Result<Response, Response>;
type State = axum::extract::State<Arc<Storage>>;
const CONTENT_TYPE: &str = "content-type";
const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
const WARN_THRESHOLD: u32 = 3;
const MAX_RETRIES: u32 = 10;
async fn metrics() -> Result {
prometheus::TextEncoder::new()
.encode_to_string(&prometheus::gather())
.map(|s| s.into_response())
.map_err(|e| internal_error(e, "/metrics", "collecting metrics"))
}
async fn get(S3Path { path }: S3Path, state: State) -> Result {
info!(%path, "downloading");
let download_err = |e| {
if let DownloadError::NotFound = e {
info!(%path, %e, "downloading"); // 404 is not an issue of _this_ service
return not_found(&path);
}
internal_error(e, &path, "downloading")
};
let cancel = state.cancel.clone();
let opts = &DownloadOpts::default();
let stream = retry(
async || state.storage.download(&path, opts, &cancel).await,
DownloadError::is_permanent,
WARN_THRESHOLD,
MAX_RETRIES,
"downloading",
&cancel,
)
.await
.unwrap_or(Err(DownloadError::Cancelled))
.map_err(download_err)?
.download_stream;
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
.body(Body::from_stream(stream))
.map_err(|e| internal_error(e, path, "reading response"))
}
// Best solution for files is multipart upload, but remote_storage doesn't support it,
// so we can either read Bytes in memory and push at once or forward BodyDataStream to
// remote_storage. The latter may seem more peformant, but BodyDataStream doesn't have a
// guaranteed size() which may produce issues while uploading to s3.
// So, currently we're going with an in-memory copy plus a boundary to prevent uploading
// very large files.
async fn set(S3Path { path }: S3Path, state: State, bytes: Bytes) -> Result {
info!(%path, "uploading");
let request_len = bytes.len();
let max_len = state.max_upload_file_limit;
if request_len > max_len {
return Err(bad_request(
anyhow!("File size {request_len} exceeds max {max_len}"),
"uploading",
));
}
let cancel = state.cancel.clone();
let fun = async || {
let stream = bytes_to_stream(bytes.clone());
state
.storage
.upload(stream, request_len, &path, None, &cancel)
.await
};
retry(
fun,
TimeoutOrCancel::caused_by_cancel,
WARN_THRESHOLD,
MAX_RETRIES,
"uploading",
&cancel,
)
.await
.unwrap_or(Err(anyhow!("uploading cancelled")))
.map_err(|e| internal_error(e, path, "reading response"))?;
Ok(ok())
}
async fn delete(S3Path { path }: S3Path, state: State) -> Result {
info!(%path, "deleting");
let cancel = state.cancel.clone();
retry(
async || state.storage.delete(&path, &cancel).await,
TimeoutOrCancel::caused_by_cancel,
WARN_THRESHOLD,
MAX_RETRIES,
"deleting",
&cancel,
)
.await
.unwrap_or(Err(anyhow!("deleting cancelled")))
.map_err(|e| internal_error(e, path, "deleting"))?;
Ok(ok())
}
async fn delete_prefix(PrefixS3Path { path }: PrefixS3Path, state: State) -> Result {
info!(%path, "deleting prefix");
let cancel = state.cancel.clone();
retry(
async || state.storage.delete_prefix(&path, &cancel).await,
TimeoutOrCancel::caused_by_cancel,
WARN_THRESHOLD,
MAX_RETRIES,
"deleting prefix",
&cancel,
)
.await
.unwrap_or(Err(anyhow!("deleting prefix cancelled")))
.map_err(|e| internal_error(e, path, "deleting prefix"))?;
Ok(ok())
}
pub async fn check_storage_permissions(
client: &GenericRemoteStorage,
cancel: CancellationToken,
) -> anyhow::Result<()> {
info!("storage permissions check");
// as_nanos() as multiple instances proxying same bucket may be started at once
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)?
.as_nanos()
.to_string();
let path = RemotePath::from_string(&format!("write_access_{now}"))?;
info!(%path, "uploading");
let body = now.to_string();
let stream = bytes_to_stream(Bytes::from(body.clone()));
client
.upload(stream, body.len(), &path, None, &cancel)
.await?;
use tokio::io::AsyncReadExt;
info!(%path, "downloading");
let download_opts = DownloadOpts {
kind: remote_storage::DownloadKind::Small,
..Default::default()
};
let mut body_read_buf = Vec::new();
let stream = client
.download(&path, &download_opts, &cancel)
.await?
.download_stream;
tokio_util::io::StreamReader::new(stream)
.read_to_end(&mut body_read_buf)
.await?;
let body_read = String::from_utf8(body_read_buf)?;
if body != body_read {
error!(%body, %body_read, "File contents do not match");
anyhow::bail!("Read back file doesn't match original")
}
info!(%path, "removing");
client.delete(&path, &cancel).await
}
fn bytes_to_stream(bytes: Bytes) -> impl futures::Stream<Item = std::io::Result<Bytes>> {
futures::stream::once(futures::future::ready(Ok(bytes)))
}
#[cfg(test)]
mod tests {
use super::*;
use axum::{body::Body, extract::Request, response::Response};
use http_body_util::BodyExt;
use itertools::iproduct;
use std::env::var;
use std::sync::Arc;
use std::time::Duration;
use test_log::test as testlog;
use tower::{Service, util::ServiceExt};
use utils::id::{TenantId, TimelineId};
// see libs/remote_storage/tests/test_real_s3.rs
const REAL_S3_ENV: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
const REAL_S3_BUCKET: &str = "REMOTE_STORAGE_S3_BUCKET";
const REAL_S3_REGION: &str = "REMOTE_STORAGE_S3_REGION";
async fn proxy() -> (Storage, Option<camino_tempfile::Utf8TempDir>) {
let cancel = CancellationToken::new();
let (dir, storage) = if var(REAL_S3_ENV).is_err() {
// tests execute in parallel and we need a new directory for each of them
let dir = camino_tempfile::tempdir().unwrap();
let fs =
remote_storage::LocalFs::new(dir.path().into(), Duration::from_secs(5)).unwrap();
(Some(dir), GenericRemoteStorage::LocalFs(fs))
} else {
// test_real_s3::create_s3_client is hard to reference, reimplementing here
let millis = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
use rand::Rng;
let random = rand::thread_rng().r#gen::<u32>();
let s3_config = remote_storage::S3Config {
bucket_name: var(REAL_S3_BUCKET).unwrap(),
bucket_region: var(REAL_S3_REGION).unwrap(),
prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
endpoint: None,
concurrency_limit: std::num::NonZeroUsize::new(100).unwrap(),
max_keys_per_list_response: None,
upload_storage_class: None,
};
let bucket = remote_storage::S3Bucket::new(&s3_config, Duration::from_secs(1))
.await
.unwrap();
(None, GenericRemoteStorage::AwsS3(Arc::new(bucket)))
};
let proxy = Storage {
auth: object_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
storage,
cancel: cancel.clone(),
max_upload_file_limit: usize::MAX,
};
check_storage_permissions(&proxy.storage, cancel)
.await
.unwrap();
(proxy, dir)
}
// see libs/utils/src/auth.rs
const TEST_PUB_KEY_ED25519: &[u8] = b"
-----BEGIN PUBLIC KEY-----
MCowBQYDK2VwAyEARYwaNBayR+eGI0iXB4s3QxE3Nl2g1iWbr6KtLWeVD/w=
-----END PUBLIC KEY-----
";
const TEST_PRIV_KEY_ED25519: &[u8] = br#"
-----BEGIN PRIVATE KEY-----
MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
-----END PRIVATE KEY-----
"#;
async fn request(req: Request<Body>) -> Response<Body> {
let (proxy, _) = proxy().await;
app(Arc::new(proxy))
.into_service()
.oneshot(req)
.await
.unwrap()
}
#[testlog(tokio::test)]
async fn status() {
let res = Request::builder()
.uri("/status")
.body(Body::empty())
.map(request)
.unwrap()
.await;
assert_eq!(res.status(), StatusCode::OK);
}
fn routes() -> impl Iterator<Item = (&'static str, &'static str)> {
iproduct!(
vec!["/1", "/1/2", "/1/2/3", "/1/2/3/4"],
vec!["GET", "PUT", "DELETE"]
)
}
#[testlog(tokio::test)]
async fn no_token() {
for (uri, method) in routes() {
info!(%uri, %method);
let res = Request::builder()
.uri(uri)
.method(method)
.body(Body::empty())
.map(request)
.unwrap()
.await;
assert!(matches!(
res.status(),
StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
));
}
}
#[testlog(tokio::test)]
async fn invalid_token() {
for (uri, method) in routes() {
info!(%uri, %method);
let status = Request::builder()
.uri(uri)
.header("Authorization", "Bearer 123")
.method(method)
.body(Body::empty())
.map(request)
.unwrap()
.await;
assert!(matches!(
status.status(),
StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
));
}
}
const TENANT_ID: TenantId =
TenantId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]);
const TIMELINE_ID: TimelineId =
TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
fn token() -> String {
let claims = object_storage::Claims {
tenant_id: TENANT_ID,
timeline_id: TIMELINE_ID,
endpoint_id: ENDPOINT_ID.into(),
exp: u64::MAX,
};
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
jsonwebtoken::encode(&header, &claims, &key).unwrap()
}
#[testlog(tokio::test)]
async fn unauthorized() {
let (proxy, _) = proxy().await;
let mut app = app(Arc::new(proxy)).into_service();
let token = token();
let args = itertools::iproduct!(
vec![TENANT_ID.to_string(), TenantId::generate().to_string()],
vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()],
vec![ENDPOINT_ID, "ep-ololo"]
)
.skip(1);
for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) {
info!(%uri, %method, %tenant, %timeline, %endpoint);
let request = Request::builder()
.uri(format!("/{tenant}/{timeline}/{endpoint}/sub/path/key"))
.method(method)
.header("Authorization", format!("Bearer {}", token))
.body(Body::empty())
.unwrap();
let status = ServiceExt::ready(&mut app)
.await
.unwrap()
.call(request)
.await
.unwrap()
.status();
assert_eq!(status, StatusCode::UNAUTHORIZED);
}
}
#[testlog(tokio::test)]
async fn method_not_allowed() {
let token = token();
let iter = iproduct!(vec!["", "/.."], vec!["GET", "PUT"]);
for (key, method) in iter {
let status = Request::builder()
.uri(format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}{key}"))
.method(method)
.header("Authorization", format!("Bearer {token}"))
.body(Body::empty())
.map(request)
.unwrap()
.await
.status();
assert!(matches!(
status,
StatusCode::BAD_REQUEST | StatusCode::METHOD_NOT_ALLOWED
));
}
}
async fn requests_chain(
chain: impl Iterator<Item = (String, &str, &'static str, StatusCode, bool)>,
token: impl Fn(&str) -> String,
) {
let (proxy, _) = proxy().await;
let mut app = app(Arc::new(proxy)).into_service();
for (uri, method, body, expected_status, compare_body) in chain {
info!(%uri, %method, %body, %expected_status);
let bearer = format!("Bearer {}", token(&uri));
let request = Request::builder()
.uri(uri)
.method(method)
.header("Authorization", &bearer)
.body(Body::from(body))
.unwrap();
let response = ServiceExt::ready(&mut app)
.await
.unwrap()
.call(request)
.await
.unwrap();
assert_eq!(response.status(), expected_status);
if !compare_body {
continue;
}
let read_body = response.into_body().collect().await.unwrap().to_bytes();
assert_eq!(body, read_body);
}
}
#[testlog(tokio::test)]
async fn metrics() {
let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
let req = vec![
(uri.clone(), "PUT", "body", StatusCode::OK, false),
(uri.clone(), "DELETE", "", StatusCode::OK, false),
];
requests_chain(req.into_iter(), |_| token()).await;
let res = Request::builder()
.uri("/metrics")
.body(Body::empty())
.map(request)
.unwrap()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = res.into_body().collect().await.unwrap().to_bytes();
let body = String::from_utf8_lossy(&body);
tracing::debug!(%body);
// Storage metrics are not gathered for LocalFs
if var(REAL_S3_ENV).is_ok() {
assert!(body.contains("remote_storage_s3_deleted_objects_total"));
}
assert!(body.contains("process_threads"));
}
#[testlog(tokio::test)]
async fn insert_retrieve_remove() {
let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
let chain = vec![
(uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
(uri.clone(), "PUT", "пыщьпыщь", StatusCode::OK, false),
(uri.clone(), "GET", "пыщьпыщь", StatusCode::OK, true),
(uri.clone(), "DELETE", "", StatusCode::OK, false),
(uri, "GET", "", StatusCode::NOT_FOUND, false),
];
requests_chain(chain.into_iter(), |_| token()).await;
}
fn delete_prefix_token(uri: &str) -> String {
use serde::Serialize;
let parts = uri.split("/").collect::<Vec<&str>>();
#[derive(Serialize)]
struct PrefixClaims {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
endpoint_id: Option<object_storage::EndpointId>,
exp: u64,
}
let claims = PrefixClaims {
tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(),
timeline_id: parts.get(2).map(|c| c.parse().unwrap()),
endpoint_id: parts.get(3).map(ToString::to_string),
exp: u64::MAX,
};
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
jsonwebtoken::encode(&header, &claims, &key).unwrap()
}
// Can't use single digit numbers as they won't be validated as TimelineId and EndpointId
#[testlog(tokio::test)]
async fn delete_prefix() {
let tenant_id =
TenantId::from_array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).to_string();
let t2 = TimelineId::from_array([2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
let t3 = TimelineId::from_array([3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
let t4 = TimelineId::from_array([4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
let f = |timeline, path| format!("/{tenant_id}/{timeline}{path}");
// Why extra slash in string literals? Axum is weird with URIs:
// /1/2 and 1/2/ match different routes, thus first yields OK and second NOT_FOUND
// as it matches /tenant/timeline/endpoint, see https://stackoverflow.com/a/75355932
// The cost of removing trailing slash is suprisingly hard:
// * Add tower dependency with NormalizePath layer
// * wrap Router<()> in this layer https://github.com/tokio-rs/axum/discussions/2377
// * Rewrite make_service() -> into_make_service()
// * Rewrite oneshot() (not available for NormalizePath)
// I didn't manage to get it working correctly
let chain = vec![
// create 1/2/3/4, 1/2/3/5, delete prefix 1/2/3 -> empty
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false), // we can override file contents
(f(t2, "/3/5"), "PUT", "", StatusCode::OK, false),
(f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/3/5"), "GET", "", StatusCode::NOT_FOUND, false),
// create 1/2/3/4, 1/2/5/6, delete prefix 1/2/3 -> 1/2/5/6
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
(f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
(f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
// create 1/2/3/4, 1/2/7/8, delete prefix 1/2 -> empty
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
(f(t2, "/7/8"), "PUT", "", StatusCode::OK, false),
(f(t2, ""), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/7/8"), "GET", "", StatusCode::NOT_FOUND, false),
// create 1/2/3/4, 1/2/5/6, 1/3/8/9, delete prefix 1/2/3 -> 1/2/5/6, 1/3/8/9
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
(f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
(f(t3, "/8/9"), "PUT", "", StatusCode::OK, false),
(f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
(f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
// create 1/4/5/6, delete prefix 1/2 -> 1/3/8/9, 1/4/5/6
(f(t4, "/5/6"), "PUT", "", StatusCode::OK, false),
(f(t2, ""), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
(f(t4, "/5/6"), "GET", "", StatusCode::OK, false),
// delete prefix 1 -> empty
(format!("/{tenant_id}"), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t3, "/8/9"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t4, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
];
requests_chain(chain.into_iter(), delete_prefix_token).await;
}
}

View File

@@ -1,344 +0,0 @@
use anyhow::Result;
use axum::extract::{FromRequestParts, Path};
use axum::response::{IntoResponse, Response};
use axum::{RequestPartsExt, http::StatusCode, http::request::Parts};
use axum_extra::TypedHeader;
use axum_extra::headers::{Authorization, authorization::Bearer};
use camino::Utf8PathBuf;
use jsonwebtoken::{DecodingKey, Validation};
use remote_storage::{GenericRemoteStorage, RemotePath};
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::result::Result as StdResult;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use utils::id::{TenantId, TimelineId};
// simplified version of utils::auth::JwtAuth
pub struct JwtAuth {
decoding_key: DecodingKey,
validation: Validation,
}
pub const VALIDATION_ALGO: jsonwebtoken::Algorithm = jsonwebtoken::Algorithm::EdDSA;
impl JwtAuth {
pub fn new(key: &[u8]) -> Result<Self> {
Ok(Self {
decoding_key: DecodingKey::from_ed_pem(key)?,
validation: Validation::new(VALIDATION_ALGO),
})
}
pub fn decode<T: serde::de::DeserializeOwned>(&self, token: &str) -> Result<T> {
Ok(jsonwebtoken::decode(token, &self.decoding_key, &self.validation).map(|t| t.claims)?)
}
}
fn normalize_key(key: &str) -> StdResult<Utf8PathBuf, String> {
let key = clean_utf8(&Utf8PathBuf::from(key));
if key.starts_with("..") || key == "." || key == "/" {
return Err(format!("invalid key {key}"));
}
match key.strip_prefix("/").map(Utf8PathBuf::from) {
Ok(p) => Ok(p),
_ => Ok(key),
}
}
// Copied from path_clean crate with PathBuf->Utf8PathBuf
fn clean_utf8(path: &camino::Utf8Path) -> Utf8PathBuf {
use camino::Utf8Component as Comp;
let mut out = Vec::new();
for comp in path.components() {
match comp {
Comp::CurDir => (),
Comp::ParentDir => match out.last() {
Some(Comp::RootDir) => (),
Some(Comp::Normal(_)) => {
out.pop();
}
None | Some(Comp::CurDir) | Some(Comp::ParentDir) | Some(Comp::Prefix(_)) => {
out.push(comp)
}
},
comp => out.push(comp),
}
}
if !out.is_empty() {
out.iter().collect()
} else {
Utf8PathBuf::from(".")
}
}
pub struct Storage {
pub auth: JwtAuth,
pub storage: GenericRemoteStorage,
pub cancel: CancellationToken,
pub max_upload_file_limit: usize,
}
pub type EndpointId = String; // If needed, reuse small string from proxy/src/types.rc
#[derive(Deserialize, Serialize, PartialEq)]
pub struct Claims {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub endpoint_id: EndpointId,
pub exp: u64,
}
impl Display for Claims {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Claims(tenant_id {} timeline_id {} endpoint_id {} exp {})",
self.tenant_id, self.timeline_id, self.endpoint_id, self.exp
)
}
}
#[derive(Deserialize, Serialize)]
struct KeyRequest {
tenant_id: TenantId,
timeline_id: TimelineId,
endpoint_id: EndpointId,
path: String,
}
#[derive(Debug, PartialEq)]
pub struct S3Path {
pub path: RemotePath,
}
impl TryFrom<&KeyRequest> for S3Path {
type Error = String;
fn try_from(req: &KeyRequest) -> StdResult<Self, Self::Error> {
let KeyRequest {
tenant_id,
timeline_id,
endpoint_id,
path,
} = &req;
let prefix = format!("{tenant_id}/{timeline_id}/{endpoint_id}",);
let path = Utf8PathBuf::from(prefix).join(normalize_key(path)?);
let path = RemotePath::new(&path).unwrap(); // unwrap() because the path is already relative
Ok(S3Path { path })
}
}
fn unauthorized(route: impl Display, claims: impl Display) -> Response {
debug!(%route, %claims, "route doesn't match claims");
StatusCode::UNAUTHORIZED.into_response()
}
pub fn bad_request(err: impl Display, desc: &'static str) -> Response {
debug!(%err, desc);
(StatusCode::BAD_REQUEST, err.to_string()).into_response()
}
pub fn ok() -> Response {
StatusCode::OK.into_response()
}
pub fn internal_error(err: impl Display, path: impl Display, desc: &'static str) -> Response {
error!(%err, %path, desc);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
pub fn not_found(key: impl ToString) -> Response {
(StatusCode::NOT_FOUND, key.to_string()).into_response()
}
impl FromRequestParts<Arc<Storage>> for S3Path {
type Rejection = Response;
async fn from_request_parts(
parts: &mut Parts,
state: &Arc<Storage>,
) -> Result<Self, Self::Rejection> {
let Path(path): Path<KeyRequest> = parts
.extract()
.await
.map_err(|e| bad_request(e, "invalid route"))?;
let TypedHeader(Authorization(bearer)) = parts
.extract::<TypedHeader<Authorization<Bearer>>>()
.await
.map_err(|e| bad_request(e, "invalid token"))?;
let claims: Claims = state
.auth
.decode(bearer.token())
.map_err(|e| bad_request(e, "decoding token"))?;
let route = Claims {
tenant_id: path.tenant_id,
timeline_id: path.timeline_id,
endpoint_id: path.endpoint_id.clone(),
exp: claims.exp,
};
if route != claims {
return Err(unauthorized(route, claims));
}
(&path)
.try_into()
.map_err(|e| bad_request(e, "invalid route"))
}
}
#[derive(Deserialize, Serialize, PartialEq)]
pub struct PrefixKeyPath {
pub tenant_id: TenantId,
pub timeline_id: Option<TimelineId>,
pub endpoint_id: Option<EndpointId>,
}
impl Display for PrefixKeyPath {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"PrefixKeyPath(tenant_id {} timeline_id {} endpoint_id {})",
self.tenant_id,
self.timeline_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string()),
self.endpoint_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string())
)
}
}
#[derive(Debug, PartialEq)]
pub struct PrefixS3Path {
pub path: RemotePath,
}
impl From<&PrefixKeyPath> for PrefixS3Path {
fn from(path: &PrefixKeyPath) -> Self {
let timeline_id = path
.timeline_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string());
let endpoint_id = path
.endpoint_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string());
let path = Utf8PathBuf::from(path.tenant_id.to_string())
.join(timeline_id)
.join(endpoint_id);
let path = RemotePath::new(&path).unwrap(); // unwrap() because the path is already relative
PrefixS3Path { path }
}
}
impl FromRequestParts<Arc<Storage>> for PrefixS3Path {
type Rejection = Response;
async fn from_request_parts(
parts: &mut Parts,
state: &Arc<Storage>,
) -> Result<Self, Self::Rejection> {
let Path(path) = parts
.extract::<Path<PrefixKeyPath>>()
.await
.map_err(|e| bad_request(e, "invalid route"))?;
let TypedHeader(Authorization(bearer)) = parts
.extract::<TypedHeader<Authorization<Bearer>>>()
.await
.map_err(|e| bad_request(e, "invalid token"))?;
let claims: PrefixKeyPath = state
.auth
.decode(bearer.token())
.map_err(|e| bad_request(e, "invalid token"))?;
if path != claims {
return Err(unauthorized(path, claims));
}
Ok((&path).into())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn normalize_key() {
let f = super::normalize_key;
assert_eq!(f("hello/world/..").unwrap(), Utf8PathBuf::from("hello"));
assert_eq!(
f("ololo/1/../../not_ololo").unwrap(),
Utf8PathBuf::from("not_ololo")
);
assert!(f("ololo/1/../../../").is_err());
assert!(f(".").is_err());
assert!(f("../").is_err());
assert!(f("").is_err());
assert_eq!(f("/1/2/3").unwrap(), Utf8PathBuf::from("1/2/3"));
assert!(f("/1/2/3/../../../").is_err());
assert!(f("/1/2/3/../../../../").is_err());
}
const TENANT_ID: TenantId =
TenantId::from_array([1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]);
const TIMELINE_ID: TimelineId =
TimelineId::from_array([1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
#[test]
fn s3_path() {
let auth = Claims {
tenant_id: TENANT_ID,
timeline_id: TIMELINE_ID,
endpoint_id: ENDPOINT_ID.into(),
exp: u64::MAX,
};
let s3_path = |key| {
let path = &format!("{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/{key}");
let path = RemotePath::from_string(path).unwrap();
S3Path { path }
};
let path = "cache_key".to_string();
let mut key_path = KeyRequest {
path,
tenant_id: auth.tenant_id,
timeline_id: auth.timeline_id,
endpoint_id: auth.endpoint_id,
};
assert_eq!(S3Path::try_from(&key_path).unwrap(), s3_path(key_path.path));
key_path.path = "we/can/have/nested/paths".to_string();
assert_eq!(S3Path::try_from(&key_path).unwrap(), s3_path(key_path.path));
key_path.path = "../error/hello/../".to_string();
assert!(S3Path::try_from(&key_path).is_err());
}
#[test]
fn prefix_s3_path() {
let mut path = PrefixKeyPath {
tenant_id: TENANT_ID,
timeline_id: None,
endpoint_id: None,
};
let prefix_path = |s: String| RemotePath::from_string(&s).unwrap();
assert_eq!(
PrefixS3Path::from(&path).path,
prefix_path(format!("{TENANT_ID}"))
);
path.timeline_id = Some(TIMELINE_ID);
assert_eq!(
PrefixS3Path::from(&path).path,
prefix_path(format!("{TENANT_ID}/{TIMELINE_ID}"))
);
path.endpoint_id = Some(ENDPOINT_ID.into());
assert_eq!(
PrefixS3Path::from(&path).path,
prefix_path(format!("{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}"))
);
}
}

View File

@@ -1,65 +0,0 @@
//! `object_storage` is a service which provides API for uploading and downloading
//! files. It is used by compute and control plane for accessing LFC prewarm data.
//! This service is deployed either as a separate component or as part of compute image
//! for large computes.
mod app;
use anyhow::Context;
use tracing::info;
use utils::logging;
//see set()
const fn max_upload_file_limit() -> usize {
100 * 1024 * 1024
}
#[derive(serde::Deserialize)]
#[serde(tag = "type")]
struct Config {
listen: std::net::SocketAddr,
pemfile: camino::Utf8PathBuf,
#[serde(flatten)]
storage_config: remote_storage::RemoteStorageConfig,
#[serde(default = "max_upload_file_limit")]
max_upload_file_limit: usize,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
logging::init(
logging::LogFormat::Plain,
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
logging::Output::Stdout,
)?;
let config: String = std::env::args().skip(1).take(1).collect();
if config.is_empty() {
anyhow::bail!("Usage: object_storage config.json")
}
info!("Reading config from {config}");
let config = std::fs::read_to_string(config.clone())?;
let config: Config = serde_json::from_str(&config).context("parsing config")?;
info!("Reading pemfile from {}", config.pemfile.clone());
let pemfile = std::fs::read(config.pemfile.clone())?;
info!("Loading public key from {}", config.pemfile.clone());
let auth = object_storage::JwtAuth::new(&pemfile)?;
let listener = tokio::net::TcpListener::bind(config.listen).await.unwrap();
info!("listening on {}", listener.local_addr().unwrap());
let storage = remote_storage::GenericRemoteStorage::from_config(&config.storage_config).await?;
let cancel = tokio_util::sync::CancellationToken::new();
app::check_storage_permissions(&storage, cancel.clone()).await?;
let proxy = std::sync::Arc::new(object_storage::Storage {
auth,
storage,
cancel: cancel.clone(),
max_upload_file_limit: config.max_upload_file_limit,
});
tokio::spawn(utils::signals::signal_handler(cancel.clone()));
axum::serve(listener, app::app(proxy))
.with_graceful_shutdown(async move { cancel.cancelled().await })
.await?;
Ok(())
}

View File

@@ -65,7 +65,7 @@ use bytes::{Buf, Bytes};
use criterion::{BenchmarkId, Criterion};
use once_cell::sync::Lazy;
use pageserver::config::PageServerConf;
use pageserver::walredo::{PostgresRedoManager, RedoAttemptType};
use pageserver::walredo::PostgresRedoManager;
use pageserver_api::key::Key;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::shard::TenantShardId;
@@ -223,14 +223,7 @@ impl Request {
// TODO: avoid these clones
manager
.request_redo(
*key,
*lsn,
base_img.clone(),
records.clone(),
*pg_version,
RedoAttemptType::ReadPage,
)
.request_redo(*key, *lsn, base_img.clone(), records.clone(), *pg_version)
.await
.context("request_redo")
}

View File

@@ -9,14 +9,14 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, anyhow, bail};
use anyhow::{Context, anyhow};
use camino::Utf8Path;
use clap::{Arg, ArgAction, Command};
use http_utils::tls_certs::ReloadingCertificateResolver;
use metrics::launch_timestamp::{LaunchTimestamp, set_launch_timestamp_metric};
use metrics::set_build_info_metric;
use nix::sys::socket::{setsockopt, sockopt};
use pageserver::config::{PageServerConf, PageserverIdentity, ignored_fields};
use pageserver::config::{PageServerConf, PageserverIdentity};
use pageserver::controller_upcall_client::StorageControllerUpcallClient;
use pageserver::deletion_queue::DeletionQueue;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
@@ -31,6 +31,7 @@ use pageserver::{
};
use postgres_backend::AuthType;
use remote_storage::GenericRemoteStorage;
use tokio::signal::unix::SignalKind;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -79,8 +80,6 @@ fn main() -> anyhow::Result<()> {
return Ok(());
}
let dev_mode = arg_matches.get_flag("dev");
// Initialize up failpoints support
let scenario = failpoint_support::init();
@@ -99,21 +98,7 @@ fn main() -> anyhow::Result<()> {
env::set_current_dir(&workdir)
.with_context(|| format!("Failed to set application's current dir to '{workdir}'"))?;
let (conf, ignored) = initialize_config(&identity_file_path, &cfg_file_path, &workdir)?;
if !dev_mode {
if matches!(conf.http_auth_type, AuthType::Trust)
|| matches!(conf.pg_auth_type, AuthType::Trust)
{
bail!(
"Pageserver refuses to start with HTTP or PostgreSQL API authentication disabled.\n\
Run with --dev to allow running without authentication.\n\
This is insecure and should only be used in development environments."
);
}
} else {
warn!("Starting in dev mode: this may be an insecure configuration.");
}
let conf = initialize_config(&identity_file_path, &cfg_file_path, &workdir)?;
// Initialize logging.
//
@@ -159,17 +144,7 @@ fn main() -> anyhow::Result<()> {
&[("node_id", &conf.id.to_string())],
);
// Warn about ignored config items; see pageserver_api::config::ConfigToml
// doc comment for rationale why we prefer this over serde(deny_unknown_fields).
{
let ignored_fields::Paths { paths } = &ignored;
for path in paths {
warn!(?path, "ignoring unknown configuration item");
}
}
// Log configuration items for feature-flag-like config
// (maybe we should automate this with a visitor?).
// after setting up logging, log the effective IO engine choice and read path implementations
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
@@ -232,7 +207,7 @@ fn main() -> anyhow::Result<()> {
tracing::info!("Initializing page_cache...");
page_cache::init(conf.page_cache_size);
start_pageserver(launch_ts, conf, ignored, otel_guard).context("Failed to start pageserver")?;
start_pageserver(launch_ts, conf, otel_guard).context("Failed to start pageserver")?;
scenario.teardown();
Ok(())
@@ -242,7 +217,7 @@ fn initialize_config(
identity_file_path: &Utf8Path,
cfg_file_path: &Utf8Path,
workdir: &Utf8Path,
) -> anyhow::Result<(&'static PageServerConf, ignored_fields::Paths)> {
) -> anyhow::Result<&'static PageServerConf> {
// The deployment orchestrator writes out an indentity file containing the node id
// for all pageservers. This file is the source of truth for the node id. In order
// to allow for rolling back pageserver releases, the node id is also included in
@@ -271,36 +246,16 @@ fn initialize_config(
let config_file_contents =
std::fs::read_to_string(cfg_file_path).context("read config file from filesystem")?;
let config_toml = serde_path_to_error::deserialize(
toml_edit::de::Deserializer::from_str(&config_file_contents)
.context("build toml deserializer")?,
)
.context("deserialize config toml")?;
// Deserialize the config file contents into a ConfigToml.
let config_toml: pageserver_api::config::ConfigToml = {
let deserializer = toml_edit::de::Deserializer::from_str(&config_file_contents)
.context("build toml deserializer")?;
let mut path_to_error_track = serde_path_to_error::Track::new();
let deserializer =
serde_path_to_error::Deserializer::new(deserializer, &mut path_to_error_track);
serde::Deserialize::deserialize(deserializer).context("deserialize config toml")?
};
// Find unknown fields by re-serializing the parsed ConfigToml and comparing it to the on-disk file.
// Any fields that are only in the on-disk version are unknown.
// (The assumption here is that the ConfigToml doesn't to skip_serializing_if.)
// (Make sure to read the ConfigToml doc comment on why we only want to warn about, but not fail startup, on unknown fields).
let ignored = {
let ondisk_toml = config_file_contents
.parse::<toml_edit::DocumentMut>()
.context("parse original config as toml document")?;
let parsed_toml = toml_edit::ser::to_document(&config_toml)
.context("re-serialize config to toml document")?;
pageserver::config::ignored_fields::find(ondisk_toml, parsed_toml)
};
// Construct the runtime god object (it's called PageServerConf but actually is just global shared state).
let conf = PageServerConf::parse_and_validate(identity.id, config_toml, workdir)
.context("runtime-validation of config toml")?;
let conf = Box::leak(Box::new(conf));
Ok((conf, ignored))
Ok(Box::leak(Box::new(conf)))
}
struct WaitForPhaseResult<F: std::future::Future + Unpin> {
@@ -351,7 +306,6 @@ fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) {
fn start_pageserver(
launch_ts: &'static LaunchTimestamp,
conf: &'static PageServerConf,
ignored: ignored_fields::Paths,
otel_guard: Option<OtelGuard>,
) -> anyhow::Result<()> {
// Monotonic time for later calculating startup duration
@@ -375,7 +329,7 @@ fn start_pageserver(
pageserver::metrics::tokio_epoll_uring::Collector::new(),
))
.unwrap();
pageserver::preinitialize_metrics(conf, ignored);
pageserver::preinitialize_metrics(conf);
// If any failpoints were set from FAILPOINTS environment variable,
// print them to the log for debugging purposes
@@ -759,7 +713,32 @@ fn start_pageserver(
let signal_token = CancellationToken::new();
let signal_cancel = signal_token.child_token();
tokio::spawn(utils::signals::signal_handler(signal_token));
// Spawn signal handlers. Runs in a loop since we want to be responsive to multiple signals
// even after triggering shutdown (e.g. a SIGQUIT after a slow SIGTERM shutdown). See:
// https://github.com/neondatabase/neon/issues/9740.
tokio::spawn(async move {
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap();
let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap();
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit()).unwrap();
loop {
let signal = tokio::select! {
_ = sigquit.recv() => {
info!("Got signal SIGQUIT. Terminating in immediate shutdown mode.");
std::process::exit(111);
}
_ = sigint.recv() => "SIGINT",
_ = sigterm.recv() => "SIGTERM",
};
if !signal_token.is_cancelled() {
info!("Got signal {signal}. Terminating gracefully in fast shutdown mode.");
signal_token.cancel();
} else {
info!("Got signal {signal}. Already shutting down.");
}
}
});
// Wait for cancellation signal and shut down the pageserver.
//
@@ -832,12 +811,6 @@ fn cli() -> Command {
.action(ArgAction::SetTrue)
.help("Show enabled compile time features"),
)
.arg(
Arg::new("dev")
.long("dev")
.action(ArgAction::SetTrue)
.help("Run in development mode (disables security checks)"),
)
}
#[test]

View File

@@ -4,8 +4,6 @@
//! file, or on the command line.
//! See also `settings.md` for better description on every parameter.
pub mod ignored_fields;
use std::env;
use std::num::NonZeroUsize;
use std::sync::Arc;
@@ -562,6 +560,7 @@ impl PageServerConf {
}
#[derive(serde::Deserialize, serde::Serialize)]
#[serde(deny_unknown_fields)]
pub struct PageserverIdentity {
pub id: NodeId,
}
@@ -633,4 +632,82 @@ mod tests {
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
.expect("parse_and_validate");
}
/// If there's a typo in the pageserver config, we'd rather catch that typo
/// and fail pageserver startup than silently ignoring the typo, leaving whoever
/// made it in the believe that their config change is effective.
///
/// The default in serde is to allow unknown fields, so, we rely
/// on developer+review discipline to add `deny_unknown_fields` when adding
/// new structs to the config, and these tests here as a regression test.
///
/// The alternative to all of this would be to allow unknown fields in the config.
/// To catch them, we could have a config check tool or mgmt API endpoint that
/// compares the effective config with the TOML on disk and makes sure that
/// the on-disk TOML is a strict subset of the effective config.
mod unknown_fields_handling {
macro_rules! test {
($short_name:ident, $input:expr) => {
#[test]
fn $short_name() {
let input = $input;
let err = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(&input)
.expect_err("some_invalid_field is an invalid field");
dbg!(&err);
assert!(err.to_string().contains("some_invalid_field"));
}
};
}
use indoc::indoc;
test!(
toplevel,
indoc! {r#"
some_invalid_field = 23
"#}
);
test!(
toplevel_nested,
indoc! {r#"
[some_invalid_field]
foo = 23
"#}
);
test!(
disk_usage_based_eviction,
indoc! {r#"
[disk_usage_based_eviction]
some_invalid_field = 23
"#}
);
test!(
tenant_config,
indoc! {r#"
[tenant_config]
some_invalid_field = 23
"#}
);
test!(
l0_flush,
indoc! {r#"
[l0_flush]
mode = "direct"
some_invalid_field = 23
"#}
);
// TODO: fix this => https://github.com/neondatabase/neon/issues/8915
// test!(
// remote_storage_config,
// indoc! {r#"
// [remote_storage_config]
// local_path = "/nonexistent"
// some_invalid_field = 23
// "#}
// );
}
}

View File

@@ -1,179 +0,0 @@
//! Check for fields in the on-disk config file that were ignored when
//! deserializing [`pageserver_api::config::ConfigToml`].
//!
//! This could have been part of the [`pageserver_api::config`] module,
//! but the way we identify unused fields in this module
//! is specific to the format (TOML) and the implementation of the
//! deserialization for that format ([`toml_edit`]).
use std::collections::HashSet;
use itertools::Itertools;
/// Pass in the user-specified config and the re-serialized [`pageserver_api::config::ConfigToml`].
/// The returned [`Paths`] contains the paths to the fields that were ignored by deserialization
/// of the [`pageserver_api::config::ConfigToml`].
pub fn find(user_specified: toml_edit::DocumentMut, reserialized: toml_edit::DocumentMut) -> Paths {
let user_specified = paths(user_specified);
let reserialized = paths(reserialized);
fn paths(doc: toml_edit::DocumentMut) -> HashSet<String> {
let mut out = Vec::new();
let mut visitor = PathsVisitor::new(&mut out);
visitor.visit_table_like(doc.as_table());
HashSet::from_iter(out)
}
let mut ignored = HashSet::new();
// O(n) because of HashSet
for path in user_specified {
if !reserialized.contains(&path) {
ignored.insert(path);
}
}
Paths {
paths: ignored
.into_iter()
// sort lexicographically for deterministic output
.sorted()
.collect(),
}
}
pub struct Paths {
pub paths: Vec<String>,
}
struct PathsVisitor<'a> {
stack: Vec<String>,
out: &'a mut Vec<String>,
}
impl<'a> PathsVisitor<'a> {
fn new(out: &'a mut Vec<String>) -> Self {
Self {
stack: Vec::new(),
out,
}
}
fn visit_table_like(&mut self, table_like: &dyn toml_edit::TableLike) {
for (entry, item) in table_like.iter() {
self.stack.push(entry.to_string());
self.visit_item(item);
self.stack.pop();
}
}
fn visit_item(&mut self, item: &toml_edit::Item) {
match item {
toml_edit::Item::None => (),
toml_edit::Item::Value(value) => self.visit_value(value),
toml_edit::Item::Table(table) => {
self.visit_table_like(table);
}
toml_edit::Item::ArrayOfTables(array_of_tables) => {
for (i, table) in array_of_tables.iter().enumerate() {
self.stack.push(format!("[{i}]"));
self.visit_table_like(table);
self.stack.pop();
}
}
}
}
fn visit_value(&mut self, value: &toml_edit::Value) {
match value {
toml_edit::Value::String(_)
| toml_edit::Value::Integer(_)
| toml_edit::Value::Float(_)
| toml_edit::Value::Boolean(_)
| toml_edit::Value::Datetime(_) => self.out.push(self.stack.join(".")),
toml_edit::Value::Array(array) => {
for (i, value) in array.iter().enumerate() {
self.stack.push(format!("[{i}]"));
self.visit_value(value);
self.stack.pop();
}
}
toml_edit::Value::InlineTable(inline_table) => self.visit_table_like(inline_table),
}
}
}
#[cfg(test)]
pub(crate) mod tests {
fn test_impl(original: &str, parsed: &str, expect: [&str; 1]) {
let original: toml_edit::DocumentMut = original.parse().expect("parse original config");
let parsed: toml_edit::DocumentMut = parsed.parse().expect("parse re-serialized config");
let super::Paths { paths: actual } = super::find(original, parsed);
assert_eq!(actual, &expect);
}
#[test]
fn top_level() {
test_impl(
r#"
[a]
b = 1
c = 2
d = 3
"#,
r#"
[a]
b = 1
c = 2
"#,
["a.d"],
);
}
#[test]
fn nested() {
test_impl(
r#"
[a.b.c]
d = 23
"#,
r#"
[a]
e = 42
"#,
["a.b.c.d"],
);
}
#[test]
fn array_of_tables() {
test_impl(
r#"
[[a]]
b = 1
c = 2
d = 3
"#,
r#"
[[a]]
b = 1
c = 2
"#,
["a.[0].d"],
);
}
#[test]
fn array() {
test_impl(
r#"
foo = [ {bar = 23} ]
"#,
r#"
foo = [ { blup = 42 }]
"#,
["foo.[0].bar"],
);
}
}

View File

@@ -89,7 +89,7 @@
//! [`RequestContext`] argument. Functions in the middle of the call chain
//! only need to pass it on.
use std::{sync::Arc, time::Duration};
use std::sync::Arc;
use once_cell::sync::Lazy;
use tracing::warn;
@@ -566,34 +566,6 @@ impl RequestContext {
}
}
pub(crate) fn ondemand_download_wait_observe(&self, duration: Duration) {
if duration == Duration::ZERO {
return;
}
match &self.scope {
Scope::Timeline { arc_arc } => arc_arc
.wait_ondemand_download_time
.observe(self.task_kind, duration),
_ => {
use once_cell::sync::Lazy;
use std::sync::Mutex;
use std::time::Duration;
use utils::rate_limit::RateLimit;
static LIMIT: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
let mut guard = LIMIT.lock().unwrap();
guard.call2(|rate_limit_stats| {
warn!(
%rate_limit_stats,
backtrace=%std::backtrace::Backtrace::force_capture(),
"ondemand downloads should always happen within timeline scope",
);
});
}
}
}
pub(crate) fn perf_follows_from(&self, from: &RequestContext) {
if let (Some(span), Some(from_span)) = (&self.perf_span, &from.perf_span) {
span.inner().follows_from(from_span.inner());

View File

@@ -212,12 +212,6 @@ paths:
schema:
type: string
format: date-time
"412":
description: No timestamp is found for given LSN, e.g. if there had been no commits till LSN
content:
application/json:
schema:
$ref: "#/components/schemas/PreconditionFailedError"
/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp:
parameters:

View File

@@ -67,7 +67,7 @@ use crate::tenant::mgr::{
};
use crate::tenant::remote_timeline_client::index::GcCompactionState;
use crate::tenant::remote_timeline_client::{
download_index_part, download_tenant_manifest, list_remote_tenant_shards, list_remote_timelines,
download_index_part, list_remote_tenant_shards, list_remote_timelines,
};
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
@@ -989,7 +989,7 @@ async fn get_lsn_by_timestamp_handler(
if !tenant_shard_id.is_shard_zero() {
// Requires SLRU contents, which are only stored on shard zero
return Err(ApiError::BadRequest(anyhow!(
"Lsn calculations by timestamp are only available on shard zero"
"Size calculations are only available on shard zero"
)));
}
@@ -1064,7 +1064,7 @@ async fn get_timestamp_of_lsn_handler(
if !tenant_shard_id.is_shard_zero() {
// Requires SLRU contents, which are only stored on shard zero
return Err(ApiError::BadRequest(anyhow!(
"Timestamp calculations by lsn are only available on shard zero"
"Size calculations are only available on shard zero"
)));
}
@@ -1090,8 +1090,8 @@ async fn get_timestamp_of_lsn_handler(
.to_string();
json_response(StatusCode::OK, time)
}
None => Err(ApiError::PreconditionFailed(
format!("Timestamp for lsn {} not found", lsn).into(),
None => Err(ApiError::NotFound(
anyhow::anyhow!("Timestamp for lsn {} not found", lsn).into(),
)),
}
}
@@ -2274,7 +2274,6 @@ async fn timeline_compact_handler(
if Some(true) == parse_query_param::<_, bool>(&request, "dry_run")? {
flags |= CompactFlags::DryRun;
}
// Manual compaction does not yield for L0.
let wait_until_uploaded =
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
@@ -2912,22 +2911,9 @@ async fn tenant_scan_remote_handler(
};
}
let result =
download_tenant_manifest(&state.remote_storage, &tenant_shard_id, generation, &cancel)
.instrument(info_span!("download_tenant_manifest",
tenant_id=%tenant_shard_id.tenant_id,
shard_id=%tenant_shard_id.shard_slug()))
.await;
let stripe_size = match result {
Ok((manifest, _, _)) => manifest.stripe_size,
Err(DownloadError::NotFound) => None,
Err(err) => return Err(ApiError::InternalServerError(anyhow!(err))),
};
response.shards.push(TenantScanRemoteStorageShard {
tenant_shard_id,
generation: generation.into(),
stripe_size,
});
}
@@ -3382,11 +3368,11 @@ async fn put_tenant_timeline_import_basebackup(
let broker_client = state.broker_client.clone();
let mut body = StreamReader::new(
request
.into_body()
.map(|res| res.map_err(|error| std::io::Error::other(anyhow::anyhow!(error)))),
);
let mut body = StreamReader::new(request.into_body().map(|res| {
res.map_err(|error| {
std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!(error))
})
}));
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
@@ -3460,7 +3446,7 @@ async fn put_tenant_timeline_import_wal(
let mut body = StreamReader::new(request.into_body().map(|res| {
res.map_err(|error| {
std::io::Error::other( anyhow::anyhow!(error))
std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!(error))
})
}));

View File

@@ -1,8 +1,10 @@
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::os::fd::RawFd;
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use enum_map::{Enum as _, EnumMap};
@@ -21,13 +23,13 @@ use pageserver_api::config::{
};
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use pin_project_lite::pin_project;
use postgres_backend::{QueryError, is_expected_io_error};
use pq_proto::framed::ConnectionError;
use strum::{EnumCount, IntoEnumIterator as _, VariantNames};
use strum_macros::{IntoStaticStr, VariantNames};
use utils::id::TimelineId;
use crate::config;
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext};
use crate::pgdatadir_mapping::DatadirModificationStats;
@@ -497,100 +499,6 @@ pub(crate) static WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS: Lazy<IntCounter> = Lazy::n
.expect("failed to define a metric")
});
pub(crate) mod wait_ondemand_download_time {
use super::*;
const WAIT_ONDEMAND_DOWNLOAD_TIME_BUCKETS: &[f64] = &[
0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, // 10 ms - 100ms
0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, // 100ms to 1s
1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, // 1s to 10s
10.0, 20.0, 30.0, 40.0, 50.0, 60.0, // 10s to 1m
];
/// The task kinds for which we want to track wait times for on-demand downloads.
/// Other task kinds' wait times are accumulated in label value `unknown`.
pub(crate) const WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS: [TaskKind; 2] = [
TaskKind::PageRequestHandler,
TaskKind::WalReceiverConnectionHandler,
];
pub(crate) static WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL: Lazy<Vec<Histogram>> = Lazy::new(|| {
let histo = register_histogram_vec!(
"pageserver_wait_ondemand_download_seconds_global",
"Observations are individual tasks' wait times for on-demand downloads. \
If N tasks coalesce on an on-demand download, and it takes 10s, than we observe N * 10s.",
&["task_kind"],
WAIT_ONDEMAND_DOWNLOAD_TIME_BUCKETS.into(),
)
.expect("failed to define a metric");
WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
.iter()
.map(|task_kind| histo.with_label_values(&[task_kind.into()]))
.collect::<Vec<_>>()
});
pub(crate) static WAIT_ONDEMAND_DOWNLOAD_TIME_SUM: Lazy<CounterVec> = Lazy::new(|| {
register_counter_vec!(
// use a name that _could_ be evolved into a per-timeline histogram later
"pageserver_wait_ondemand_download_seconds_sum",
"Like `pageserver_wait_ondemand_download_seconds_global` but per timeline",
&["tenant_id", "shard_id", "timeline_id", "task_kind"],
)
.unwrap()
});
pub struct WaitOndemandDownloadTimeSum {
counters: [Counter; WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS.len()],
}
impl WaitOndemandDownloadTimeSum {
pub(crate) fn new(tenant_id: &str, shard_id: &str, timeline_id: &str) -> Self {
let counters = WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
.iter()
.map(|task_kind| {
WAIT_ONDEMAND_DOWNLOAD_TIME_SUM
.get_metric_with_label_values(&[
tenant_id,
shard_id,
timeline_id,
task_kind.into(),
])
.unwrap()
})
.collect::<Vec<_>>();
Self {
counters: counters.try_into().unwrap(),
}
}
pub(crate) fn observe(&self, task_kind: TaskKind, duration: Duration) {
let maybe = WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
.iter()
.enumerate()
.find(|(_, kind)| **kind == task_kind);
let Some((idx, _)) = maybe else {
return;
};
WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL[idx].observe(duration.as_secs_f64());
let counter = &self.counters[idx];
counter.inc_by(duration.as_secs_f64());
}
}
pub(crate) fn shutdown_timeline(tenant_id: &str, shard_id: &str, timeline_id: &str) {
for task_kind in WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS {
let _ = WAIT_ONDEMAND_DOWNLOAD_TIME_SUM.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
task_kind.into(),
]);
}
}
pub(crate) fn preinitialize_global_metrics() {
Lazy::force(&WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL);
}
}
static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_last_record_lsn",
@@ -2406,18 +2314,13 @@ impl RemoteOpFileKind {
}
}
pub(crate) static REMOTE_TIMELINE_CLIENT_COMPLETION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
pub(crate) static REMOTE_OPERATION_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_remote_timeline_client_seconds_global",
"Time spent on remote timeline client operations. \
Grouped by task_kind, file_kind, operation_kind and status. \
The task_kind is \
- for layer downloads, populated from RequestContext (primary objective of having the label) \
- for index downloads, set to 'unknown' \
- for any upload operation, set to 'RemoteUploadTask' \
This keeps dimensionality at bay. \
"pageserver_remote_operation_seconds",
"Time spent on remote storage operations. \
Grouped by tenant, timeline, operation_kind and status. \
Does not account for time spent waiting in remote timeline client's queues.",
&["task_kind", "file_kind", "op_kind", "status"]
&["file_kind", "op_kind", "status"]
)
.expect("failed to define a metric")
});
@@ -2979,7 +2882,6 @@ pub(crate) struct TimelineMetrics {
pub storage_io_size: StorageIoSizeMetrics,
pub wait_lsn_in_progress_micros: GlobalAndPerTenantIntCounter,
pub wait_lsn_start_finish_counterpair: IntCounterPair,
pub wait_ondemand_download_time: wait_ondemand_download_time::WaitOndemandDownloadTimeSum,
shutdown: std::sync::atomic::AtomicBool,
}
@@ -3125,13 +3027,6 @@ impl TimelineMetrics {
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let wait_ondemand_download_time =
wait_ondemand_download_time::WaitOndemandDownloadTimeSum::new(
&tenant_id,
&shard_id,
&timeline_id,
);
TimelineMetrics {
tenant_id,
shard_id,
@@ -3165,7 +3060,6 @@ impl TimelineMetrics {
wal_records_received,
wait_lsn_in_progress_micros,
wait_lsn_start_finish_counterpair,
wait_ondemand_download_time,
shutdown: std::sync::atomic::AtomicBool::default(),
}
}
@@ -3358,8 +3252,6 @@ impl TimelineMetrics {
.remove_label_values(&mut res, &[tenant_id, shard_id, timeline_id]);
}
wait_ondemand_download_time::shutdown_timeline(tenant_id, shard_id, timeline_id);
let _ = SMGR_QUERY_STARTED_PER_TENANT_TIMELINE.remove_label_values(&[
SmgrQueryType::GetPageAtLsn.into(),
tenant_id,
@@ -3481,18 +3373,13 @@ impl RemoteTimelineClientMetrics {
pub fn remote_operation_time(
&self,
task_kind: Option<TaskKind>,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
status: &'static str,
) -> Histogram {
REMOTE_TIMELINE_CLIENT_COMPLETION_LATENCY
.get_metric_with_label_values(&[
task_kind.as_ref().map(|tk| tk.into()).unwrap_or("unknown"),
file_kind.as_str(),
op_kind.as_str(),
status,
])
let key = (file_kind.as_str(), op_kind.as_str(), status);
REMOTE_OPERATION_TIME
.get_metric_with_label_values(&[key.0, key.1, key.2])
.unwrap()
}
@@ -3737,26 +3624,54 @@ impl Drop for RemoteTimelineClientMetrics {
/// Wrapper future that measures the time spent by a remote storage operation,
/// and records the time and success/failure as a prometheus metric.
pub(crate) trait MeasureRemoteOp<O, E>: Sized + Future<Output = Result<O, E>> {
async fn measure_remote_op(
pub(crate) trait MeasureRemoteOp: Sized {
fn measure_remote_op(
self,
task_kind: Option<TaskKind>, // not all caller contexts have a RequestContext / TaskKind handy
file_kind: RemoteOpFileKind,
op: RemoteOpKind,
metrics: Arc<RemoteTimelineClientMetrics>,
) -> Result<O, E> {
) -> MeasuredRemoteOp<Self> {
let start = Instant::now();
let res = self.await;
let duration = start.elapsed();
let status = if res.is_ok() { &"success" } else { &"failure" };
metrics
.remote_operation_time(task_kind, &file_kind, &op, status)
.observe(duration.as_secs_f64());
res
MeasuredRemoteOp {
inner: self,
file_kind,
op,
start,
metrics,
}
}
}
impl<Fut, O, E> MeasureRemoteOp<O, E> for Fut where Fut: Sized + Future<Output = Result<O, E>> {}
impl<T: Sized> MeasureRemoteOp for T {}
pin_project! {
pub(crate) struct MeasuredRemoteOp<F>
{
#[pin]
inner: F,
file_kind: RemoteOpFileKind,
op: RemoteOpKind,
start: Instant,
metrics: Arc<RemoteTimelineClientMetrics>,
}
}
impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
type Output = Result<O, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let poll_result = this.inner.poll(cx);
if let Poll::Ready(ref res) = poll_result {
let duration = this.start.elapsed();
let status = if res.is_ok() { &"success" } else { &"failure" };
this.metrics
.remote_operation_time(this.file_kind, this.op, status)
.observe(duration.as_secs_f64());
}
poll_result
}
}
pub mod tokio_epoll_uring {
use std::collections::HashMap;
@@ -4192,33 +4107,9 @@ pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
.set(u64::try_from(num_threads.get()).unwrap());
}
static PAGESERVER_CONFIG_IGNORED_ITEMS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_config_ignored_items",
"TOML items present in the on-disk configuration file but ignored by the pageserver config parser.\
The `item` label is the dot-separated path of the ignored item in the on-disk configuration file.\
The value for an unknown config item is always 1.\
There is a special label value \"\", which is 0, so that there is always a metric exposed (simplifies dashboards).",
&["item"]
)
.unwrap()
});
pub fn preinitialize_metrics(
conf: &'static PageServerConf,
ignored: config::ignored_fields::Paths,
) {
pub fn preinitialize_metrics(conf: &'static PageServerConf) {
set_page_service_config_max_batch_size(&conf.page_service_pipelining);
PAGESERVER_CONFIG_IGNORED_ITEMS
.with_label_values(&[""])
.set(0);
for path in &ignored.paths {
PAGESERVER_CONFIG_IGNORED_ITEMS
.with_label_values(&[path])
.set(1);
}
// Python tests need these and on some we do alerting.
//
// FIXME(4813): make it so that we have no top level metrics as this fn will easily fall out of
@@ -4304,5 +4195,4 @@ pub fn preinitialize_metrics(
Lazy::force(&tokio_epoll_uring::THREAD_LOCAL_METRICS_STORAGE);
tenant_throttling::preinitialize_global_metrics();
wait_ondemand_download_time::preinitialize_global_metrics();
}

View File

@@ -247,15 +247,6 @@ pub async fn libpq_listener_main(
type ConnectionHandlerResult = anyhow::Result<()>;
/// Perf root spans start at the per-request level, after shard routing.
/// This struct carries connection-level information to the root perf span definition.
#[derive(Clone)]
struct ConnectionPerfSpanFields {
peer_addr: String,
application_name: Option<String>,
compute_mode: Option<String>,
}
#[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
#[allow(clippy::too_many_arguments)]
async fn page_service_conn_main(
@@ -280,12 +271,6 @@ async fn page_service_conn_main(
let socket_fd = socket.as_raw_fd();
let peer_addr = socket.peer_addr().context("get peer address")?;
let perf_span_fields = ConnectionPerfSpanFields {
peer_addr: peer_addr.to_string(),
application_name: None, // filled in later
compute_mode: None, // filled in later
};
tracing::Span::current().record("peer_addr", field::display(peer_addr));
// setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements:
@@ -329,7 +314,6 @@ async fn page_service_conn_main(
tenant_manager,
auth,
pipelining_config,
perf_span_fields,
connection_ctx,
cancel.clone(),
gate_guard,
@@ -374,8 +358,6 @@ struct PageServerHandler {
/// `process_query` creates a child context from this one.
connection_ctx: RequestContext,
perf_span_fields: ConnectionPerfSpanFields,
cancel: CancellationToken,
/// None only while pagestream protocol is being processed.
@@ -721,13 +703,11 @@ impl BatchedFeMessage {
}
impl PageServerHandler {
#[allow(clippy::too_many_arguments)]
pub fn new(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
pipelining_config: PageServicePipeliningConfig,
perf_span_fields: ConnectionPerfSpanFields,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate_guard: GateGuard,
@@ -737,7 +717,6 @@ impl PageServerHandler {
auth,
claims: None,
connection_ctx,
perf_span_fields,
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
cancel,
pipelining_config,
@@ -775,7 +754,6 @@ impl PageServerHandler {
tenant_id: TenantId,
timeline_id: TimelineId,
timeline_handles: &mut TimelineHandles,
conn_perf_span_fields: &ConnectionPerfSpanFields,
cancel: &CancellationToken,
ctx: &RequestContext,
protocol_version: PagestreamProtocolVersion,
@@ -974,9 +952,6 @@ impl PageServerHandler {
info_span!(
target: PERF_TRACE_TARGET,
"GET_PAGE",
peer_addr = conn_perf_span_fields.peer_addr,
application_name = conn_perf_span_fields.application_name,
compute_mode = conn_perf_span_fields.compute_mode,
tenant_id = %tenant_id,
shard_id = %shard.get_shard_identity().shard_slug(),
timeline_id = %timeline_id,
@@ -1606,7 +1581,6 @@ impl PageServerHandler {
tenant_id,
timeline_id,
&mut timeline_handles,
&self.perf_span_fields,
&cancel,
ctx,
protocol_version,
@@ -1740,8 +1714,6 @@ impl PageServerHandler {
// Batcher
//
let perf_span_fields = self.perf_span_fields.clone();
let cancel_batcher = self.cancel.child_token();
let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
@@ -1755,7 +1727,6 @@ impl PageServerHandler {
tenant_id,
timeline_id,
&mut timeline_handles,
&perf_span_fields,
&cancel_batcher,
&ctx,
protocol_version,
@@ -2698,14 +2669,12 @@ where
if let FeStartupPacket::StartupMessage { params, .. } = sm {
if let Some(app_name) = params.get("application_name") {
self.perf_span_fields.application_name = Some(app_name.to_string());
Span::current().record("application_name", field::display(app_name));
}
if let Some(options) = params.get("options") {
let (config, _) = parse_options(options);
for (key, value) in config {
if key == "neon.compute_mode" {
self.perf_span_fields.compute_mode = Some(value.clone());
Span::current().record("compute_mode", field::display(value));
}
}

View File

@@ -691,7 +691,7 @@ impl Timeline {
Ok(buf.get_u32_le())
}
/// Does the slru segment exist?
/// Get size of an SLRU segment
pub(crate) async fn get_slru_segment_exists(
&self,
kind: SlruKind,
@@ -844,9 +844,9 @@ impl Timeline {
.await
}
/// Obtain the timestamp for the given lsn.
/// Obtain the possible timestamp range for the given lsn.
///
/// If the lsn has no timestamps (e.g. no commits), returns None.
/// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
pub(crate) async fn get_timestamp_for_lsn(
&self,
probe_lsn: Lsn,

View File

@@ -45,7 +45,6 @@ use remote_timeline_client::manifest::{
};
use remote_timeline_client::{
FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD, UploadQueueNotReadyError,
download_tenant_manifest,
};
use secondary::heatmap::{HeatMapTenant, HeatMapTimeline};
use storage_broker::BrokerClientChannel;
@@ -100,7 +99,7 @@ use crate::tenant::timeline::delete::DeleteTimelineFlow;
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
use crate::virtual_file::VirtualFile;
use crate::walingest::WalLagCooldown;
use crate::walredo::{PostgresRedoManager, RedoAttemptType};
use crate::walredo::PostgresRedoManager;
use crate::{InitializationOrder, TEMP_FILE_SUFFIX, import_datadir, span, task_mgr, walredo};
static INIT_DB_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(8));
@@ -227,8 +226,7 @@ struct TimelinePreload {
}
pub(crate) struct TenantPreload {
/// The tenant manifest from remote storage, or None if no manifest was found.
tenant_manifest: Option<TenantManifest>,
tenant_manifest: TenantManifest,
/// Map from timeline ID to a possible timeline preload. It is None iff the timeline is offloaded according to the manifest.
timelines: HashMap<TimelineId, Option<TimelinePreload>>,
}
@@ -284,15 +282,12 @@ pub struct Tenant {
/// **Lock order**: if acquiring all (or a subset), acquire them in order `timelines`, `timelines_offloaded`, `timelines_creating`
timelines_offloaded: Mutex<HashMap<TimelineId, Arc<OffloadedTimeline>>>,
/// The last tenant manifest known to be in remote storage. None if the manifest has not yet
/// been either downloaded or uploaded. Always Some after tenant attach.
/// Serialize writes of the tenant manifest to remote storage. If there are concurrent operations
/// affecting the manifest, such as timeline deletion and timeline offload, they must wait for
/// each other (this could be optimized to coalesce writes if necessary).
///
/// Initially populated during tenant attach, updated via `maybe_upload_tenant_manifest`.
///
/// Do not modify this directly. It is used to check whether a new manifest needs to be
/// uploaded. The manifest is constructed in `build_tenant_manifest`, and uploaded via
/// `maybe_upload_tenant_manifest`.
remote_tenant_manifest: tokio::sync::Mutex<Option<TenantManifest>>,
/// The contents of the Mutex are the last manifest we successfully uploaded
tenant_manifest_upload: tokio::sync::Mutex<Option<TenantManifest>>,
// This mutex prevents creation of new timelines during GC.
// Adding yet another mutex (in addition to `timelines`) is needed because holding
@@ -473,16 +468,15 @@ impl WalRedoManager {
base_img: Option<(Lsn, bytes::Bytes)>,
records: Vec<(Lsn, pageserver_api::record::NeonWalRecord)>,
pg_version: u32,
redo_attempt_type: RedoAttemptType,
) -> Result<bytes::Bytes, walredo::Error> {
match self {
Self::Prod(_, mgr) => {
mgr.request_redo(key, lsn, base_img, records, pg_version, redo_attempt_type)
mgr.request_redo(key, lsn, base_img, records, pg_version)
.await
}
#[cfg(test)]
Self::Test(mgr) => {
mgr.request_redo(key, lsn, base_img, records, pg_version, redo_attempt_type)
mgr.request_redo(key, lsn, base_img, records, pg_version)
.await
}
}
@@ -921,7 +915,6 @@ enum StartCreatingTimelineResult {
Idempotent(Arc<Timeline>),
}
#[allow(clippy::large_enum_variant, reason = "TODO")]
enum TimelineInitAndSyncResult {
ReadyToActivate(Arc<Timeline>),
NeedsSpawnImportPgdata(TimelineInitAndSyncNeedsSpawnImportPgdata),
@@ -1008,7 +1001,6 @@ enum CreateTimelineCause {
Delete,
}
#[allow(clippy::large_enum_variant, reason = "TODO")]
enum LoadTimelineCause {
Attach,
Unoffload,
@@ -1362,41 +1354,36 @@ impl Tenant {
}
}
fn make_broken_or_stopping(t: &Tenant, err: anyhow::Error) {
t.state.send_modify(|state| match state {
// TODO: the old code alluded to DeleteTenantFlow sometimes setting
// TenantState::Stopping before we get here, but this may be outdated.
// Let's find out with a testing assertion. If this doesn't fire, and the
// logs don't show this happening in production, remove the Stopping cases.
TenantState::Stopping{..} if cfg!(any(test, feature = "testing")) => {
panic!("unexpected TenantState::Stopping during attach")
}
// If the tenant is cancelled, assume the error was caused by cancellation.
TenantState::Attaching if t.cancel.is_cancelled() => {
info!("attach cancelled, setting tenant state to Stopping: {err}");
// NB: progress None tells `set_stopping` that attach has cancelled.
*state = TenantState::Stopping { progress: None };
}
// According to the old code, DeleteTenantFlow may already have set this to
// Stopping. Retain its progress.
// TODO: there is no DeleteTenantFlow. Is this still needed? See above.
TenantState::Stopping { progress } if t.cancel.is_cancelled() => {
assert!(progress.is_some(), "concurrent attach cancellation");
info!("attach cancelled, already Stopping: {err}");
}
// Mark the tenant as broken.
TenantState::Attaching | TenantState::Stopping { .. } => {
error!("attach failed, setting tenant state to Broken (was {state}): {err:?}");
*state = TenantState::broken_from_reason(err.to_string())
}
// The attach task owns the tenant state until activated.
state => panic!("invalid tenant state {state} during attach: {err:?}"),
});
// Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
enum BrokenVerbosity {
Error,
Info
}
let make_broken =
|t: &Tenant, err: anyhow::Error, verbosity: BrokenVerbosity| {
match verbosity {
BrokenVerbosity::Info => {
info!("attach cancelled, setting tenant state to Broken: {err}");
},
BrokenVerbosity::Error => {
error!("attach failed, setting tenant state to Broken: {err:?}");
}
}
t.state.send_modify(|state| {
// The Stopping case is for when we have passed control on to DeleteTenantFlow:
// if it errors, we will call make_broken when tenant is already in Stopping.
assert!(
matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
"the attach task owns the tenant state until activation is complete"
);
*state = TenantState::broken_from_reason(err.to_string());
});
};
// TODO: should also be rejecting tenant conf changes that violate this check.
if let Err(e) = crate::tenant::storage_layer::inmemory_layer::IndexEntry::validate_checkpoint_distance(tenant_clone.get_checkpoint_distance()) {
make_broken_or_stopping(&tenant_clone, anyhow::anyhow!(e));
make_broken(&tenant_clone, anyhow::anyhow!(e), BrokenVerbosity::Error);
return Ok(());
}
@@ -1448,8 +1435,10 @@ impl Tenant {
// stayed in Activating for such a long time that shutdown found it in
// that state.
tracing::info!(state=%tenant_clone.current_state(), "Tenant shut down before activation");
// Set the tenant to Stopping to signal `set_stopping` that we're done.
make_broken_or_stopping(&tenant_clone, anyhow::anyhow!("Shut down while Attaching"));
// Make the tenant broken so that set_stopping will not hang waiting for it to leave
// the Attaching state. This is an over-reaction (nothing really broke, the tenant is
// just shutting down), but ensures progress.
make_broken(&tenant_clone, anyhow::anyhow!("Shut down while Attaching"), BrokenVerbosity::Info);
return Ok(());
},
)
@@ -1468,7 +1457,7 @@ impl Tenant {
match res {
Ok(p) => Some(p),
Err(e) => {
make_broken_or_stopping(&tenant_clone, anyhow::anyhow!(e));
make_broken(&tenant_clone, anyhow::anyhow!(e), BrokenVerbosity::Error);
return Ok(());
}
}
@@ -1494,7 +1483,9 @@ impl Tenant {
info!("attach finished, activating");
tenant_clone.activate(broker_client, None, &ctx);
}
Err(e) => make_broken_or_stopping(&tenant_clone, anyhow::anyhow!(e)),
Err(e) => {
make_broken(&tenant_clone, anyhow::anyhow!(e), BrokenVerbosity::Error);
}
}
// If we are doing an opportunistic warmup attachment at startup, initialize
@@ -1534,27 +1525,28 @@ impl Tenant {
cancel.clone(),
)
.await?;
let tenant_manifest = match download_tenant_manifest(
remote_storage,
&self.tenant_shard_id,
self.generation,
&cancel,
)
.await
{
Ok((tenant_manifest, _, _)) => Some(tenant_manifest),
Err(DownloadError::NotFound) => None,
Err(err) => return Err(err.into()),
};
let (offloaded_add, tenant_manifest) =
match remote_timeline_client::download_tenant_manifest(
remote_storage,
&self.tenant_shard_id,
self.generation,
&cancel,
)
.await
{
Ok((tenant_manifest, _generation, _manifest_mtime)) => (
format!("{} offloaded", tenant_manifest.offloaded_timelines.len()),
tenant_manifest,
),
Err(DownloadError::NotFound) => {
("no manifest".to_string(), TenantManifest::empty())
}
Err(e) => Err(e)?,
};
info!(
"found {} timelines ({} offloaded timelines)",
remote_timeline_ids.len(),
tenant_manifest
.as_ref()
.map(|m| m.offloaded_timelines.len())
.unwrap_or(0)
"found {} timelines, and {offloaded_add}",
remote_timeline_ids.len()
);
for k in other_keys {
@@ -1563,13 +1555,11 @@ impl Tenant {
// Avoid downloading IndexPart of offloaded timelines.
let mut offloaded_with_prefix = HashSet::new();
if let Some(tenant_manifest) = &tenant_manifest {
for offloaded in tenant_manifest.offloaded_timelines.iter() {
if remote_timeline_ids.remove(&offloaded.timeline_id) {
offloaded_with_prefix.insert(offloaded.timeline_id);
} else {
// We'll take care later of timelines in the manifest without a prefix
}
for offloaded in tenant_manifest.offloaded_timelines.iter() {
if remote_timeline_ids.remove(&offloaded.timeline_id) {
offloaded_with_prefix.insert(offloaded.timeline_id);
} else {
// We'll take care later of timelines in the manifest without a prefix
}
}
@@ -1643,14 +1633,12 @@ impl Tenant {
let mut offloaded_timeline_ids = HashSet::new();
let mut offloaded_timelines_list = Vec::new();
if let Some(tenant_manifest) = &preload.tenant_manifest {
for timeline_manifest in tenant_manifest.offloaded_timelines.iter() {
let timeline_id = timeline_manifest.timeline_id;
let offloaded_timeline =
OffloadedTimeline::from_manifest(self.tenant_shard_id, timeline_manifest);
offloaded_timelines_list.push((timeline_id, Arc::new(offloaded_timeline)));
offloaded_timeline_ids.insert(timeline_id);
}
for timeline_manifest in preload.tenant_manifest.offloaded_timelines.iter() {
let timeline_id = timeline_manifest.timeline_id;
let offloaded_timeline =
OffloadedTimeline::from_manifest(self.tenant_shard_id, timeline_manifest);
offloaded_timelines_list.push((timeline_id, Arc::new(offloaded_timeline)));
offloaded_timeline_ids.insert(timeline_id);
}
// Complete deletions for offloaded timeline id's from manifest.
// The manifest will be uploaded later in this function.
@@ -1808,21 +1796,15 @@ impl Tenant {
.context("resume_deletion")
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
}
let needs_manifest_upload =
offloaded_timelines_list.len() != preload.tenant_manifest.offloaded_timelines.len();
{
let mut offloaded_timelines_accessor = self.timelines_offloaded.lock().unwrap();
offloaded_timelines_accessor.extend(offloaded_timelines_list.into_iter());
}
// Stash the preloaded tenant manifest, and upload a new manifest if changed.
//
// NB: this must happen after the tenant is fully populated above. In particular the
// offloaded timelines, which are included in the manifest.
{
let mut guard = self.remote_tenant_manifest.lock().await;
assert!(guard.is_none(), "tenant manifest set before preload"); // first populated here
*guard = preload.tenant_manifest;
if needs_manifest_upload {
self.store_tenant_manifest().await?;
}
self.maybe_upload_tenant_manifest().await?;
// The local filesystem contents are a cache of what's in the remote IndexPart;
// IndexPart is the source of truth.
@@ -2236,7 +2218,7 @@ impl Tenant {
};
// Upload new list of offloaded timelines to S3
self.maybe_upload_tenant_manifest().await?;
self.store_tenant_manifest().await?;
// Activate the timeline (if it makes sense)
if !(timeline.is_broken() || timeline.is_stopping()) {
@@ -3447,7 +3429,7 @@ impl Tenant {
shutdown_mode
};
match self.set_stopping(shutdown_progress).await {
match self.set_stopping(shutdown_progress, false, false).await {
Ok(()) => {}
Err(SetStoppingError::Broken) => {
// assume that this is acceptable
@@ -3527,13 +3509,25 @@ impl Tenant {
/// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
///
/// This function is not cancel-safe!
async fn set_stopping(&self, progress: completion::Barrier) -> Result<(), SetStoppingError> {
///
/// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
/// `allow_transition_from_attaching` is needed for the special case of attaching deleted tenant.
async fn set_stopping(
&self,
progress: completion::Barrier,
_allow_transition_from_loading: bool,
allow_transition_from_attaching: bool,
) -> Result<(), SetStoppingError> {
let mut rx = self.state.subscribe();
// cannot stop before we're done activating, so wait out until we're done activating
rx.wait_for(|state| match state {
TenantState::Attaching if allow_transition_from_attaching => true,
TenantState::Activating(_) | TenantState::Attaching => {
info!("waiting for {state} to turn Active|Broken|Stopping");
info!(
"waiting for {} to turn Active|Broken|Stopping",
<&'static str>::from(state)
);
false
}
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
@@ -3544,24 +3538,25 @@ impl Tenant {
// we now know we're done activating, let's see whether this task is the winner to transition into Stopping
let mut err = None;
let stopping = self.state.send_if_modified(|current_state| match current_state {
TenantState::Activating(_) | TenantState::Attaching => {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
TenantState::Activating(_) => {
unreachable!("1we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Attaching => {
if !allow_transition_from_attaching {
unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
};
*current_state = TenantState::Stopping { progress };
true
}
TenantState::Active => {
// FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
// are created after the transition to Stopping. That's harmless, as the Timelines
// won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
*current_state = TenantState::Stopping { progress: Some(progress) };
*current_state = TenantState::Stopping { progress };
// Continue stopping outside the closure. We need to grab timelines.lock()
// and we plan to turn it into a tokio::sync::Mutex in a future patch.
true
}
TenantState::Stopping { progress: None } => {
// An attach was cancelled, and the attach transitioned the tenant from Attaching to
// Stopping(None) to let us know it exited. Register our progress and continue.
*current_state = TenantState::Stopping { progress: Some(progress) };
true
}
TenantState::Broken { reason, .. } => {
info!(
"Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
@@ -3569,7 +3564,7 @@ impl Tenant {
err = Some(SetStoppingError::Broken);
false
}
TenantState::Stopping { progress: Some(progress) } => {
TenantState::Stopping { progress } => {
info!("Tenant is already in Stopping state");
err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
false
@@ -4070,20 +4065,18 @@ impl Tenant {
/// Generate an up-to-date TenantManifest based on the state of this Tenant.
fn build_tenant_manifest(&self) -> TenantManifest {
// Collect the offloaded timelines, and sort them for deterministic output.
let offloaded_timelines = self
.timelines_offloaded
.lock()
.unwrap()
.values()
.map(|tli| tli.manifest())
.sorted_by_key(|m| m.timeline_id)
.collect_vec();
let timelines_offloaded = self.timelines_offloaded.lock().unwrap();
let mut timeline_manifests = timelines_offloaded
.iter()
.map(|(_timeline_id, offloaded)| offloaded.manifest())
.collect::<Vec<_>>();
// Sort the manifests so that our output is deterministic
timeline_manifests.sort_by_key(|timeline_manifest| timeline_manifest.timeline_id);
TenantManifest {
version: LATEST_TENANT_MANIFEST_VERSION,
stripe_size: Some(self.get_shard_stripe_size()),
offloaded_timelines,
offloaded_timelines: timeline_manifests,
}
}
@@ -4306,7 +4299,7 @@ impl Tenant {
timelines: Mutex::new(HashMap::new()),
timelines_creating: Mutex::new(HashSet::new()),
timelines_offloaded: Mutex::new(HashMap::new()),
remote_tenant_manifest: Default::default(),
tenant_manifest_upload: Default::default(),
gc_cs: tokio::sync::Mutex::new(()),
walredo_mgr,
remote_storage,
@@ -4402,7 +4395,10 @@ impl Tenant {
.to_string();
fail::fail_point!("tenant-config-before-write", |_| {
Err(std::io::Error::other("tenant-config-before-write"))
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"tenant-config-before-write",
))
});
// Convert the config to a toml file.
@@ -5536,35 +5532,27 @@ impl Tenant {
.unwrap_or(0)
}
/// Builds a new tenant manifest, and uploads it if it differs from the last-known tenant
/// manifest in `Self::remote_tenant_manifest`.
///
/// TODO: instead of requiring callers to remember to call `maybe_upload_tenant_manifest` after
/// changing any `Tenant` state that's included in the manifest, consider making the manifest
/// the authoritative source of data with an API that automatically uploads on changes. Revisit
/// this when the manifest is more widely used and we have a better idea of the data model.
pub(crate) async fn maybe_upload_tenant_manifest(&self) -> Result<(), TenantManifestError> {
// Multiple tasks may call this function concurrently after mutating the Tenant runtime
// state, affecting the manifest generated by `build_tenant_manifest`. We use an async mutex
// to serialize these callers. `eq_ignoring_version` acts as a slightly inefficient but
// simple coalescing mechanism.
/// Serialize and write the latest TenantManifest to remote storage.
pub(crate) async fn store_tenant_manifest(&self) -> Result<(), TenantManifestError> {
// Only one manifest write may be done at at time, and the contents of the manifest
// must be loaded while holding this lock. This makes it safe to call this function
// from anywhere without worrying about colliding updates.
let mut guard = tokio::select! {
guard = self.remote_tenant_manifest.lock() => guard,
_ = self.cancel.cancelled() => return Err(TenantManifestError::Cancelled),
g = self.tenant_manifest_upload.lock() => {
g
},
_ = self.cancel.cancelled() => {
return Err(TenantManifestError::Cancelled);
}
};
// Build a new manifest.
let manifest = self.build_tenant_manifest();
// Check if the manifest has changed. We ignore the version number here, to avoid
// uploading every manifest on version number bumps.
if let Some(old) = guard.as_ref() {
if manifest.eq_ignoring_version(old) {
return Ok(());
}
if Some(&manifest) == (*guard).as_ref() {
// Optimisation: skip uploads that don't change anything.
return Ok(());
}
// Upload the manifest. Remote storage does no retries internally, so retry here.
// Remote storage does no retries internally, so wrap it
match backoff::retry(
|| async {
upload_tenant_manifest(
@@ -5576,7 +5564,7 @@ impl Tenant {
)
.await
},
|_| self.cancel.is_cancelled(),
|_e| self.cancel.is_cancelled(),
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"uploading tenant manifest",
@@ -5880,7 +5868,6 @@ pub(crate) mod harness {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
_pg_version: u32,
_redo_attempt_type: RedoAttemptType,
) -> Result<Bytes, walredo::Error> {
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
if records_neon {
@@ -8735,21 +8722,6 @@ mod tests {
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_init("i")),
),
(
get_key(4),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append_conditional("j", "i")),
),
(
get_key(5),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_init("1")),
),
(
get_key(5),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append_conditional("j", "2")),
),
];
let image1 = vec![(get_key(1), "0x10".into())];
@@ -8780,18 +8752,8 @@ mod tests {
// Need to remove the limit of "Neon WAL redo requires base image".
assert_eq!(
tline.get(get_key(3), Lsn(0x50), &ctx).await?,
Bytes::from_static(b"c")
);
assert_eq!(
tline.get(get_key(4), Lsn(0x50), &ctx).await?,
Bytes::from_static(b"ij")
);
// Manual testing required: currently, read errors will panic the process in debug mode. So we
// cannot enable this assertion in the unit test.
// assert!(tline.get(get_key(5), Lsn(0x50), &ctx).await.is_err());
// assert_eq!(tline.get(get_key(3), Lsn(0x50), &ctx).await?, Bytes::new());
// assert_eq!(tline.get(get_key(4), Lsn(0x50), &ctx).await?, Bytes::new());
Ok(())
}

View File

@@ -15,7 +15,7 @@
//! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use std::cmp::min;
use std::io::Error;
use std::io::{Error, ErrorKind};
use async_compression::Level;
use bytes::{BufMut, BytesMut};
@@ -331,7 +331,10 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
return (
(
io_buf.slice_len(),
Err(Error::other(format!("blob too large ({len} bytes)"))),
Err(Error::new(
ErrorKind::Other,
format!("blob too large ({len} bytes)"),
)),
),
srcbuf,
);

View File

@@ -216,8 +216,12 @@ impl<'a> FileBlockReader<'a> {
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| std::io::Error::other(format!("Failed to read immutable buf: {e:#}")))?
{
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(write_guard) => {
// Read the page from disk into the buffer

View File

@@ -642,7 +642,6 @@ impl RemoteTimelineClient {
cancel,
)
.measure_remote_op(
Option::<TaskKind>::None,
RemoteOpFileKind::Index,
RemoteOpKind::Download,
Arc::clone(&self.metrics),
@@ -740,7 +739,6 @@ impl RemoteTimelineClient {
ctx,
)
.measure_remote_op(
Some(ctx.task_kind()),
RemoteOpFileKind::Layer,
RemoteOpKind::Download,
Arc::clone(&self.metrics),
@@ -2177,7 +2175,6 @@ impl RemoteTimelineClient {
&self.cancel,
)
.measure_remote_op(
Some(TaskKind::RemoteUploadTask),
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
Arc::clone(&self.metrics),
@@ -2194,7 +2191,6 @@ impl RemoteTimelineClient {
&self.cancel,
)
.measure_remote_op(
Some(TaskKind::RemoteUploadTask),
RemoteOpFileKind::Index,
RemoteOpKind::Upload,
Arc::clone(&self.metrics),

View File

@@ -1,33 +1,21 @@
use chrono::NaiveDateTime;
use pageserver_api::shard::ShardStripeSize;
use serde::{Deserialize, Serialize};
use utils::id::TimelineId;
use utils::lsn::Lsn;
/// Tenant shard manifest, stored in remote storage. Contains offloaded timelines and other tenant
/// shard-wide information that must be persisted in remote storage.
///
/// The manifest is always updated on tenant attach, and as needed.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
/// Tenant-shard scoped manifest
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TenantManifest {
/// The manifest version. Incremented on manifest format changes, even non-breaking ones.
/// Manifests must generally always be backwards and forwards compatible for one release, to
/// allow release rollbacks.
/// Debugging aid describing the version of this manifest.
/// Can also be used for distinguishing breaking changes later on.
pub version: usize,
/// This tenant's stripe size. This is only advisory, and used to recover tenant data from
/// remote storage. The autoritative source is the storage controller. If None, assume the
/// original default value of 32768 blocks (256 MB).
#[serde(skip_serializing_if = "Option::is_none")]
pub stripe_size: Option<ShardStripeSize>,
/// The list of offloaded timelines together with enough information
/// to not have to actually load them.
///
/// Note: the timelines mentioned in this list might be deleted, i.e.
/// we don't hold an invariant that the references aren't dangling.
/// Existence of index-part.json is the actual indicator of timeline existence.
#[serde(default)]
pub offloaded_timelines: Vec<OffloadedTimelineManifest>,
}
@@ -36,7 +24,7 @@ pub struct TenantManifest {
/// Very similar to [`pageserver_api::models::OffloadedTimelineInfo`],
/// but the two datastructures serve different needs, this is for a persistent disk format
/// that must be backwards compatible, while the other is only for informative purposes.
#[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq, Eq)]
#[derive(Clone, Serialize, Deserialize, Copy, PartialEq, Eq)]
pub struct OffloadedTimelineManifest {
pub timeline_id: TimelineId,
/// Whether the timeline has a parent it has been branched off from or not
@@ -47,166 +35,20 @@ pub struct OffloadedTimelineManifest {
pub archived_at: NaiveDateTime,
}
/// The newest manifest version. This should be incremented on changes, even non-breaking ones. We
/// do not use deny_unknown_fields, so new fields are not breaking.
///
/// 1: initial version
/// 2: +stripe_size
///
/// When adding new versions, also add a parse_vX test case below.
pub const LATEST_TENANT_MANIFEST_VERSION: usize = 2;
pub const LATEST_TENANT_MANIFEST_VERSION: usize = 1;
impl TenantManifest {
/// Returns true if the manifests are equal, ignoring the version number. This avoids
/// re-uploading all manifests just because the version number is bumped.
pub fn eq_ignoring_version(&self, other: &Self) -> bool {
// Fast path: if the version is equal, just compare directly.
if self.version == other.version {
return self == other;
pub(crate) fn empty() -> Self {
Self {
version: LATEST_TENANT_MANIFEST_VERSION,
offloaded_timelines: vec![],
}
// We could alternatively just clone and modify the version here.
let Self {
version: _, // ignore version
stripe_size,
offloaded_timelines,
} = self;
stripe_size == &other.stripe_size && offloaded_timelines == &other.offloaded_timelines
}
/// Decodes a manifest from JSON.
pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice(bytes)
serde_json::from_slice::<Self>(bytes)
}
/// Encodes a manifest as JSON.
pub fn to_json_bytes(&self) -> serde_json::Result<Vec<u8>> {
pub(crate) fn to_json_bytes(&self) -> serde_json::Result<Vec<u8>> {
serde_json::to_vec(self)
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use utils::id::TimelineId;
use super::*;
/// Empty manifests should be parsed. Version is required.
#[test]
fn parse_empty() -> anyhow::Result<()> {
let json = r#"{
"version": 0
}"#;
let expected = TenantManifest {
version: 0,
stripe_size: None,
offloaded_timelines: Vec::new(),
};
assert_eq!(expected, TenantManifest::from_json_bytes(json.as_bytes())?);
Ok(())
}
/// Unknown fields should be ignored, for forwards compatibility.
#[test]
fn parse_unknown_fields() -> anyhow::Result<()> {
let json = r#"{
"version": 1,
"foo": "bar"
}"#;
let expected = TenantManifest {
version: 1,
stripe_size: None,
offloaded_timelines: Vec::new(),
};
assert_eq!(expected, TenantManifest::from_json_bytes(json.as_bytes())?);
Ok(())
}
/// v1 manifests should be parsed, for backwards compatibility.
#[test]
fn parse_v1() -> anyhow::Result<()> {
let json = r#"{
"version": 1,
"offloaded_timelines": [
{
"timeline_id": "5c4df612fd159e63c1b7853fe94d97da",
"archived_at": "2025-03-07T11:07:11.373105434"
},
{
"timeline_id": "f3def5823ad7080d2ea538d8e12163fa",
"ancestor_timeline_id": "5c4df612fd159e63c1b7853fe94d97da",
"ancestor_retain_lsn": "0/1F79038",
"archived_at": "2025-03-05T11:10:22.257901390"
}
]
}"#;
let expected = TenantManifest {
version: 1,
stripe_size: None,
offloaded_timelines: vec![
OffloadedTimelineManifest {
timeline_id: TimelineId::from_str("5c4df612fd159e63c1b7853fe94d97da")?,
ancestor_timeline_id: None,
ancestor_retain_lsn: None,
archived_at: NaiveDateTime::from_str("2025-03-07T11:07:11.373105434")?,
},
OffloadedTimelineManifest {
timeline_id: TimelineId::from_str("f3def5823ad7080d2ea538d8e12163fa")?,
ancestor_timeline_id: Some(TimelineId::from_str(
"5c4df612fd159e63c1b7853fe94d97da",
)?),
ancestor_retain_lsn: Some(Lsn::from_str("0/1F79038")?),
archived_at: NaiveDateTime::from_str("2025-03-05T11:10:22.257901390")?,
},
],
};
assert_eq!(expected, TenantManifest::from_json_bytes(json.as_bytes())?);
Ok(())
}
/// v2 manifests should be parsed, for backwards compatibility.
#[test]
fn parse_v2() -> anyhow::Result<()> {
let json = r#"{
"version": 2,
"stripe_size": 32768,
"offloaded_timelines": [
{
"timeline_id": "5c4df612fd159e63c1b7853fe94d97da",
"archived_at": "2025-03-07T11:07:11.373105434"
},
{
"timeline_id": "f3def5823ad7080d2ea538d8e12163fa",
"ancestor_timeline_id": "5c4df612fd159e63c1b7853fe94d97da",
"ancestor_retain_lsn": "0/1F79038",
"archived_at": "2025-03-05T11:10:22.257901390"
}
]
}"#;
let expected = TenantManifest {
version: 2,
stripe_size: Some(ShardStripeSize(32768)),
offloaded_timelines: vec![
OffloadedTimelineManifest {
timeline_id: TimelineId::from_str("5c4df612fd159e63c1b7853fe94d97da")?,
ancestor_timeline_id: None,
ancestor_retain_lsn: None,
archived_at: NaiveDateTime::from_str("2025-03-07T11:07:11.373105434")?,
},
OffloadedTimelineManifest {
timeline_id: TimelineId::from_str("f3def5823ad7080d2ea538d8e12163fa")?,
ancestor_timeline_id: Some(TimelineId::from_str(
"5c4df612fd159e63c1b7853fe94d97da",
)?),
ancestor_retain_lsn: Some(Lsn::from_str("0/1F79038")?),
archived_at: NaiveDateTime::from_str("2025-03-05T11:10:22.257901390")?,
},
],
};
assert_eq!(expected, TenantManifest::from_json_bytes(json.as_bytes())?);
Ok(())
}
}

View File

@@ -61,7 +61,6 @@ pub(crate) async fn upload_index_part(
.await
.with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
}
/// Serializes and uploads the given tenant manifest data to the remote storage.
pub(crate) async fn upload_tenant_manifest(
storage: &GenericRemoteStorage,
@@ -77,14 +76,16 @@ pub(crate) async fn upload_tenant_manifest(
});
pausable_failpoint!("before-upload-manifest-pausable");
let serialized = Bytes::from(tenant_manifest.to_json_bytes()?);
let tenant_manifest_size = serialized.len();
let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
let serialized = tenant_manifest.to_json_bytes()?;
let serialized = Bytes::from(serialized);
let tenant_manifest_site = serialized.len();
let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
storage
.upload_storage_object(
futures::stream::once(futures::future::ready(Ok(serialized))),
tenant_manifest_size,
tenant_manifest_site,
&remote_path,
cancel,
)

View File

@@ -366,7 +366,7 @@ impl SplitDeltaLayerWriter {
)
.await?;
let (start_key, prev_delta_writer) =
self.inner.replace((key, next_delta_writer)).unwrap();
std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
self.batches.add_unfinished_delta_writer(
prev_delta_writer,
start_key..key,

View File

@@ -766,7 +766,7 @@ mod tests {
rand::Rng::fill(&mut rand::thread_rng(), &mut dst_slice[len..]); // to discover bugs
Ok((dst, len))
}
Err(e) => Err(std::io::Error::other(e)),
Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
}
}
}

View File

@@ -975,10 +975,6 @@ impl LayerInner {
allow_download: bool,
ctx: &RequestContext,
) -> Result<Arc<DownloadedLayer>, DownloadError> {
let mut wait_for_download_recorder =
scopeguard::guard(utils::elapsed_accum::ElapsedAccum::default(), |accum| {
ctx.ondemand_download_wait_observe(accum.get());
});
let (weak, permit) = {
// get_or_init_detached can:
// - be fast (mutex lock) OR uncontested semaphore permit acquire
@@ -987,7 +983,7 @@ impl LayerInner {
let locked = self
.inner
.get_or_init_detached_measured(Some(&mut wait_for_download_recorder))
.get_or_init_detached()
.await
.map(|mut guard| guard.get_and_upgrade().ok_or(guard));
@@ -1017,7 +1013,6 @@ impl LayerInner {
Err(permit) => (None, permit),
}
};
let _guard = wait_for_download_recorder.guard();
if let Some(weak) = weak {
// only drop the weak after dropping the heavier_once_cell guard
@@ -1207,7 +1202,6 @@ impl LayerInner {
permit: heavier_once_cell::InitPermit,
ctx: &RequestContext,
) -> Result<Arc<DownloadedLayer>, remote_storage::DownloadError> {
let start = std::time::Instant::now();
let result = timeline
.remote_client
.download_layer_file(
@@ -1219,8 +1213,7 @@ impl LayerInner {
ctx,
)
.await;
let latency = start.elapsed();
let latency_millis = u64::try_from(latency.as_millis()).unwrap();
match result {
Ok(size) => {
assert_eq!(size, self.desc.file_size);
@@ -1236,8 +1229,9 @@ impl LayerInner {
Err(e) => {
panic!("post-condition failed: needs_download errored: {e:?}");
}
};
tracing::info!(size=%self.desc.file_size, %latency_millis, "on-demand download successful");
}
tracing::info!(size=%self.desc.file_size, "on-demand download successful");
timeline
.metrics
.resident_physical_size_add(self.desc.file_size);
@@ -1266,7 +1260,7 @@ impl LayerInner {
return Err(e);
}
tracing::error!(consecutive_failures, %latency_millis, "layer file download failed: {e:#}");
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
let backoff = utils::backoff::exponential_backoff_duration_seconds(
consecutive_failures.min(u32::MAX as usize) as u32,

View File

@@ -59,7 +59,6 @@ impl LayerIterRef<'_> {
/// 1. Unified iterator for image and delta layers.
/// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
/// 3. Lazy creation of the real delta/image iterator.
#[allow(clippy::large_enum_variant, reason = "TODO")]
pub(crate) enum IteratorWrapper<'a> {
NotLoaded {
ctx: &'a RequestContext,

View File

@@ -268,12 +268,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
error_run += 1;
let backoff =
exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
log_compaction_error(
&err,
Some((error_run, backoff)),
cancel.is_cancelled(),
false,
);
log_compaction_error(&err, Some((error_run, backoff)), cancel.is_cancelled());
continue;
}
}
@@ -290,7 +285,6 @@ pub(crate) fn log_compaction_error(
err: &CompactionError,
retry_info: Option<(u32, Duration)>,
task_cancelled: bool,
degrade_to_warning: bool,
) {
use CompactionError::*;
@@ -339,7 +333,6 @@ pub(crate) fn log_compaction_error(
}
} else {
match level {
Level::ERROR if degrade_to_warning => warn!("Compaction failed and discarded: {err:#}"),
Level::ERROR => error!("Compaction failed: {err:#}"),
Level::INFO => info!("Compaction failed: {err:#}"),
level => unimplemented!("unexpected level {level:?}"),

View File

@@ -24,7 +24,6 @@ use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::PERF_TRACE_TARGET;
use crate::walredo::RedoAttemptType;
use anyhow::{Context, Result, anyhow, bail, ensure};
use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
@@ -116,7 +115,7 @@ use crate::pgdatadir_mapping::{
use crate::task_mgr::TaskKind;
use crate::tenant::config::AttachmentMode;
use crate::tenant::gc_result::GcResult;
use crate::tenant::layer_map::LayerMap;
use crate::tenant::layer_map::{LayerMap, SearchResult};
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
@@ -1040,7 +1039,6 @@ pub(crate) enum ShutdownMode {
Hard,
}
#[allow(clippy::large_enum_variant, reason = "TODO")]
enum ImageLayerCreationOutcome {
/// We generated an image layer
Generated {
@@ -1294,12 +1292,6 @@ impl Timeline {
};
reconstruct_state.read_path = read_path;
let redo_attempt_type = if ctx.task_kind() == TaskKind::Compaction {
RedoAttemptType::LegacyCompaction
} else {
RedoAttemptType::ReadPage
};
let traversal_res: Result<(), _> = {
let ctx = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
@@ -1387,7 +1379,7 @@ impl Timeline {
let walredo_deltas = converted.num_deltas();
let walredo_res = walredo_self
.reconstruct_value(key, lsn, converted, redo_attempt_type)
.reconstruct_value(key, lsn, converted)
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
@@ -1948,7 +1940,7 @@ impl Timeline {
)
.await;
if let Err(err) = &res {
log_compaction_error(err, None, cancel.is_cancelled(), false);
log_compaction_error(err, None, cancel.is_cancelled());
}
res
}
@@ -4112,6 +4104,12 @@ impl Timeline {
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<TimelineVisitOutcome, GetVectoredError> {
let mut unmapped_keyspace = keyspace.clone();
let mut fringe = LayerFringe::new();
let mut completed_keyspace = KeySpace::default();
let mut image_covered_keyspace = KeySpaceRandomAccum::new();
// Prevent GC from progressing while visiting the current timeline.
// If we are GC-ing because a new image layer was added while traversing
// the timeline, then it will remove layers that are required for fulfilling
@@ -4122,44 +4120,11 @@ impl Timeline {
// See `compaction::compact_with_gc` for why we need this.
let _guard = timeline.gc_compaction_layer_update_lock.read().await;
// Initialize the fringe
let mut fringe = {
let mut fringe = LayerFringe::new();
let guard = timeline.layers.read().await;
guard.update_search_fringe(&keyspace, cont_lsn, &mut fringe)?;
fringe
};
let mut completed_keyspace = KeySpace::default();
let mut image_covered_keyspace = KeySpaceRandomAccum::new();
while let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() {
loop {
if cancel.is_cancelled() {
return Err(GetVectoredError::Cancelled);
}
if let Some(ref mut read_path) = reconstruct_state.read_path {
read_path.record_layer_visit(&layer_to_read, &keyspace_to_read, &lsn_range);
}
// Visit the layer and plan IOs for it
let next_cont_lsn = lsn_range.start;
layer_to_read
.get_values_reconstruct_data(
keyspace_to_read.clone(),
lsn_range,
reconstruct_state,
ctx,
)
.await?;
let mut unmapped_keyspace = keyspace_to_read;
cont_lsn = next_cont_lsn;
reconstruct_state.on_layer_visited(&layer_to_read);
let (keys_done_last_step, keys_with_image_coverage) =
reconstruct_state.consume_done_keys();
unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
@@ -4170,15 +4135,31 @@ impl Timeline {
image_covered_keyspace.add_range(keys_with_image_coverage);
}
// Query the layer map for the next layers to read.
//
// Do not descent any further if the last layer we visited
// completed all keys in the keyspace it inspected. This is not
// required for correctness, but avoids visiting extra layers
// which turns out to be a perf bottleneck in some cases.
if !unmapped_keyspace.is_empty() {
let guard = timeline.layers.read().await;
guard.update_search_fringe(&unmapped_keyspace, cont_lsn, &mut fringe)?;
let layers = guard.layer_map()?;
for range in unmapped_keyspace.ranges.iter() {
let results = layers.range_search(range.clone(), cont_lsn);
results
.found
.into_iter()
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
(
guard.upgrade(layer),
keyspace_accum.to_keyspace(),
lsn_floor..cont_lsn,
)
})
.for_each(|(layer, keyspace, lsn_range)| {
fringe.update(layer, keyspace, lsn_range)
});
}
// It's safe to drop the layer map lock after planning the next round of reads.
// The fringe keeps readable handles for the layers which are safe to read even
@@ -4192,6 +4173,28 @@ impl Timeline {
// at two different time points.
drop(guard);
}
if let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() {
if let Some(ref mut read_path) = reconstruct_state.read_path {
read_path.record_layer_visit(&layer_to_read, &keyspace_to_read, &lsn_range);
}
let next_cont_lsn = lsn_range.start;
layer_to_read
.get_values_reconstruct_data(
keyspace_to_read.clone(),
lsn_range,
reconstruct_state,
ctx,
)
.await?;
unmapped_keyspace = keyspace_to_read;
cont_lsn = next_cont_lsn;
reconstruct_state.on_layer_visited(&layer_to_read);
} else {
break;
}
}
Ok(TimelineVisitOutcome {
@@ -6354,17 +6357,10 @@ impl Timeline {
key: Key,
request_lsn: Lsn,
mut data: ValueReconstructState,
redo_attempt_type: RedoAttemptType,
) -> Result<Bytes, PageReconstructError> {
// Perform WAL redo if needed
data.records.reverse();
let fire_critical_error = match redo_attempt_type {
RedoAttemptType::ReadPage => true,
RedoAttemptType::LegacyCompaction => true,
RedoAttemptType::GcCompaction => false,
};
// If we have a page image, and no WAL, we're all set
if data.records.is_empty() {
if let Some((img_lsn, img)) = &data.img {
@@ -6411,22 +6407,13 @@ impl Timeline {
.as_ref()
.context("timeline has no walredo manager")
.map_err(PageReconstructError::WalRedo)?
.request_redo(
key,
request_lsn,
data.img,
data.records,
self.pg_version,
redo_attempt_type,
)
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
.await;
let img = match res {
Ok(img) => img,
Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
Err(walredo::Error::Other(err)) => {
if fire_critical_error {
critical!("walredo failure during page reconstruction: {err:?}");
}
critical!("walredo failure during page reconstruction: {err:?}");
return Err(PageReconstructError::WalRedo(
err.context("reconstruct a page image"),
));

View File

@@ -7,7 +7,7 @@
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::ops::{Deref, Range};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;
use super::layer_manager::LayerManager;
use super::{
@@ -16,8 +16,6 @@ use super::{
Timeline,
};
use crate::tenant::timeline::DeltaEntry;
use crate::walredo::RedoAttemptType;
use anyhow::{Context, anyhow};
use bytes::Bytes;
use enumset::EnumSet;
@@ -317,9 +315,6 @@ impl GcCompactionQueue {
flags: {
let mut flags = EnumSet::new();
flags |= CompactFlags::EnhancedGcBottomMostCompaction;
if timeline.get_compaction_l0_first() {
flags |= CompactFlags::YieldForL0;
}
flags
},
sub_compaction: true,
@@ -453,7 +448,7 @@ impl GcCompactionQueue {
) -> Result<CompactionOutcome, CompactionError> {
let res = self.iteration_inner(cancel, ctx, gc_block, timeline).await;
if let Err(err) = &res {
log_compaction_error(err, None, cancel.is_cancelled(), true);
log_compaction_error(err, None, cancel.is_cancelled());
}
match res {
Ok(res) => Ok(res),
@@ -824,16 +819,15 @@ pub struct CompactionStatistics {
time_acquire_lock_secs: f64,
time_analyze_secs: f64,
time_download_layer_secs: f64,
time_to_first_kv_pair_secs: f64,
time_main_loop_secs: f64,
time_final_phase_secs: f64,
time_total_secs: f64,
// Summary
/// Ratio of the key-value size after/before gc-compaction.
uncompressed_retention_ratio: f64,
/// Ratio of the physical size after/before gc-compaction.
compressed_retention_ratio: f64,
/// Ratio of the key-value size before/after gc-compaction.
uncompressed_size_ratio: f64,
/// Ratio of the physical size before/after gc-compaction.
physical_size_ratio: f64,
}
impl CompactionStatistics {
@@ -902,15 +896,15 @@ impl CompactionStatistics {
fn finalize(&mut self) {
let original_key_value_size = self.image_keys_visited.size + self.wal_keys_visited.size;
let produced_key_value_size = self.image_produced.size + self.wal_produced.size;
self.uncompressed_retention_ratio =
produced_key_value_size as f64 / (original_key_value_size as f64 + 1.0); // avoid div by 0
self.uncompressed_size_ratio =
original_key_value_size as f64 / (produced_key_value_size as f64 + 1.0); // avoid div by 0
let original_physical_size = self.image_layer_visited.size + self.delta_layer_visited.size;
let produced_physical_size = self.image_layer_produced.size
+ self.delta_layer_produced.size
+ self.image_layer_discarded.size
+ self.delta_layer_discarded.size; // Also include the discarded layers to make the ratio accurate
self.compressed_retention_ratio =
produced_physical_size as f64 / (original_physical_size as f64 + 1.0); // avoid div by 0
self.physical_size_ratio =
original_physical_size as f64 / (produced_physical_size as f64 + 1.0); // avoid div by 0
}
}
@@ -1250,10 +1244,6 @@ impl Timeline {
let mut replace_image_layers = Vec::new();
for layer in layers_to_rewrite {
if self.cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
}
tracing::info!(layer=%layer, "Rewriting layer after shard split...");
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
@@ -2416,9 +2406,7 @@ impl Timeline {
} else {
lsn_split_points[i]
};
let img = self
.reconstruct_value(key, request_lsn, state, RedoAttemptType::GcCompaction)
.await?;
let img = self.reconstruct_value(key, request_lsn, state).await?;
Some((request_lsn, img))
} else {
None
@@ -3038,7 +3026,7 @@ impl Timeline {
.map_err(CompactionError::Other)?;
let time_download_layer = timer.elapsed();
let mut timer = Instant::now();
let timer = Instant::now();
// Step 2: Produce images+deltas.
let mut accumulated_values = Vec::new();
@@ -3113,7 +3101,8 @@ impl Timeline {
// Actually, we can decide not to write to the image layer at all at this point because
// the key and LSN range are determined. However, to keep things simple here, we still
// create this writer, and discard the writer in the end.
let mut time_to_first_kv_pair = None;
let mut keys_processed = 0;
while let Some(((key, lsn, val), desc)) = merge_iter
.next_with_trace()
@@ -3121,16 +3110,13 @@ impl Timeline {
.context("failed to get next key-value pair")
.map_err(CompactionError::Other)?
{
if time_to_first_kv_pair.is_none() {
time_to_first_kv_pair = Some(timer.elapsed());
timer = Instant::now();
}
if cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
}
keys_processed += 1;
let should_yield = yield_for_l0
&& keys_processed % 1000 == 0
&& self
.l0_compaction_trigger
.notified()
@@ -3461,9 +3447,6 @@ impl Timeline {
let time_final_phase = timer.elapsed();
stat.time_final_phase_secs = time_final_phase.as_secs_f64();
stat.time_to_first_kv_pair_secs = time_to_first_kv_pair
.unwrap_or(Duration::ZERO)
.as_secs_f64();
stat.time_main_loop_secs = time_main_loop.as_secs_f64();
stat.time_acquire_lock_secs = time_acquire_lock.as_secs_f64();
stat.time_download_layer_secs = time_download_layer.as_secs_f64();
@@ -3924,6 +3907,8 @@ impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
}
}
use crate::tenant::timeline::DeltaEntry;
impl CompactionLayer<Key> for ResidentDeltaLayer {
fn key_range(&self) -> &Range<Key> {
&self.0.layer_desc().key_range

View File

@@ -410,13 +410,10 @@ impl DeleteTimelineFlow {
// So indeed, the tenant manifest might refer to an offloaded timeline which has already been deleted.
// However, we handle this case in tenant loading code so the next time we attach, the issue is
// resolved.
tenant
.maybe_upload_tenant_manifest()
.await
.map_err(|err| match err {
TenantManifestError::Cancelled => DeleteTimelineError::Cancelled,
err => DeleteTimelineError::Other(err.into()),
})?;
tenant.store_tenant_manifest().await.map_err(|e| match e {
TenantManifestError::Cancelled => DeleteTimelineError::Cancelled,
_ => DeleteTimelineError::Other(e.into()),
})?;
*guard = Self::Finished;

View File

@@ -3,18 +3,17 @@ use std::sync::Arc;
use anyhow::{Context, bail, ensure};
use itertools::Itertools;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::TenantShardId;
use tokio_util::sync::CancellationToken;
use tracing::trace;
use utils::id::TimelineId;
use utils::lsn::{AtomicLsn, Lsn};
use super::{LayerFringe, ReadableLayer, TimelineWriterState};
use super::{ReadableLayer, TimelineWriterState};
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::metrics::TimelineMetrics;
use crate::tenant::layer_map::{BatchedUpdates, LayerMap, SearchResult};
use crate::tenant::layer_map::{BatchedUpdates, LayerMap};
use crate::tenant::storage_layer::{
AsLayerDesc, InMemoryLayer, Layer, LayerVisibilityHint, PersistentLayerDesc,
PersistentLayerKey, ReadableLayerWeak, ResidentLayer,
@@ -39,7 +38,7 @@ impl Default for LayerManager {
}
impl LayerManager {
fn upgrade(&self, weak: ReadableLayerWeak) -> ReadableLayer {
pub(crate) fn upgrade(&self, weak: ReadableLayerWeak) -> ReadableLayer {
match weak {
ReadableLayerWeak::PersistentLayer(desc) => {
ReadableLayer::PersistentLayer(self.get_from_desc(&desc))
@@ -148,36 +147,6 @@ impl LayerManager {
self.layers().keys().cloned().collect_vec()
}
/// Update the [`LayerFringe`] of a read request
///
/// Take a key space at a given LSN and query the layer map below each range
/// of the key space to find the next layers to visit.
pub(crate) fn update_search_fringe(
&self,
keyspace: &KeySpace,
cont_lsn: Lsn,
fringe: &mut LayerFringe,
) -> Result<(), Shutdown> {
let map = self.layer_map()?;
for range in keyspace.ranges.iter() {
let results = map.range_search(range.clone(), cont_lsn);
results
.found
.into_iter()
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
(
self.upgrade(layer),
keyspace_accum.to_keyspace(),
lsn_floor..cont_lsn,
)
})
.for_each(|(layer, keyspace, lsn_range)| fringe.update(layer, keyspace, lsn_range));
}
Ok(())
}
fn layers(&self) -> &HashMap<PersistentLayerKey, Layer> {
use LayerManager::*;
match self {

View File

@@ -111,7 +111,7 @@ pub(crate) async fn offload_timeline(
// at the next restart attach it again.
// For that to happen, we'd need to make the manifest reflect our *intended* state,
// not our actual state of offloaded timelines.
tenant.maybe_upload_tenant_manifest().await?;
tenant.store_tenant_manifest().await?;
tracing::info!("Timeline offload complete (remaining arc refcount: {remaining_refcount})");

View File

@@ -445,7 +445,7 @@ pub(super) async fn handle_walreceiver_connection(
.inspect_err(|err| {
// TODO: we can't differentiate cancellation errors with
// anyhow::Error, so just ignore it if we're cancelled.
if !cancellation.is_cancelled() && !timeline.is_stopping() {
if !cancellation.is_cancelled() {
critical!("{err:?}")
}
})?;
@@ -577,7 +577,7 @@ pub(super) async fn handle_walreceiver_connection(
.inspect_err(|err| {
// TODO: we can't differentiate cancellation errors with
// anyhow::Error, so just ignore it if we're cancelled.
if !cancellation.is_cancelled() && !timeline.is_stopping() {
if !cancellation.is_cancelled() {
critical!("{err:?}")
}
})?;

View File

@@ -302,7 +302,6 @@ pub struct UploadQueueStoppedDeletable {
pub(super) deleted_at: SetDeletedFlagProgress,
}
#[allow(clippy::large_enum_variant, reason = "TODO")]
pub enum UploadQueueStopped {
Deletable(UploadQueueStoppedDeletable),
Uninitialized,

View File

@@ -136,16 +136,6 @@ macro_rules! bail {
}
}
#[derive(Debug, Clone, Copy)]
pub enum RedoAttemptType {
/// Used for the read path. Will fire critical errors and retry twice if failure.
ReadPage,
// Used for legacy compaction (only used in image compaction). Will fire critical errors and retry once if failure.
LegacyCompaction,
// Used for gc compaction. Will not fire critical errors and not retry.
GcCompaction,
}
///
/// Public interface of WAL redo manager
///
@@ -166,18 +156,11 @@ impl PostgresRedoManager {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
redo_attempt_type: RedoAttemptType,
) -> Result<Bytes, Error> {
if records.is_empty() {
bail!("invalid WAL redo request with no records");
}
let max_retry_attempts = match redo_attempt_type {
RedoAttemptType::ReadPage => 2,
RedoAttemptType::LegacyCompaction => 1,
RedoAttemptType::GcCompaction => 0,
};
let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
let mut img = base_img.map(|p| p.1);
let mut batch_neon = apply_neon::can_apply_in_neon(&records[0].1);
@@ -197,7 +180,6 @@ impl PostgresRedoManager {
&records[batch_start..i],
self.conf.wal_redo_timeout,
pg_version,
max_retry_attempts,
)
.await
};
@@ -219,7 +201,6 @@ impl PostgresRedoManager {
&records[batch_start..],
self.conf.wal_redo_timeout,
pg_version,
max_retry_attempts,
)
.await
}
@@ -443,11 +424,11 @@ impl PostgresRedoManager {
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
pg_version: u32,
max_retry_attempts: u32,
) -> Result<Bytes, Error> {
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
let (rel, blknum) = key.to_rel_block().context("invalid record")?;
const MAX_RETRY_ATTEMPTS: u32 = 1;
let mut n_attempts = 0u32;
loop {
let base_img = &base_img;
@@ -505,7 +486,7 @@ impl PostgresRedoManager {
info!(n_attempts, "retried walredo succeeded");
}
n_attempts += 1;
if n_attempts > max_retry_attempts || result.is_ok() {
if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
return result;
}
}
@@ -579,7 +560,6 @@ mod tests {
use super::PostgresRedoManager;
use crate::config::PageServerConf;
use crate::walredo::RedoAttemptType;
#[tokio::test]
async fn test_ping() {
@@ -613,7 +593,6 @@ mod tests {
None,
short_records(),
14,
RedoAttemptType::ReadPage,
)
.instrument(h.span())
.await
@@ -642,7 +621,6 @@ mod tests {
None,
short_records(),
14,
RedoAttemptType::ReadPage,
)
.instrument(h.span())
.await
@@ -664,7 +642,6 @@ mod tests {
None,
short_records(),
16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
RedoAttemptType::ReadPage,
)
.instrument(h.span())
.await

View File

@@ -276,7 +276,6 @@ pub(crate) fn apply_in_neon(
append,
clear,
will_init,
only_if,
} => {
use bytes::BufMut;
if *will_init {
@@ -289,13 +288,6 @@ pub(crate) fn apply_in_neon(
if *clear {
page.clear();
}
if let Some(only_if) = only_if {
if page != only_if.as_bytes() {
return Err(anyhow::anyhow!(
"the current image does not match the expected image, cannot append"
));
}
}
page.put_slice(append.as_bytes());
}
}

View File

@@ -4,7 +4,6 @@
MODULE_big = neon
OBJS = \
$(WIN32RES) \
communicator.o \
extension_server.o \
file_cache.o \
hll.o \

View File

@@ -9,4 +9,4 @@
#define BITMAP_SET(bm, bit) (bm)[(bit) >> 3] |= (1 << ((bit) & 7))
#define BITMAP_CLR(bm, bit) (bm)[(bit) >> 3] &= ~(1 << ((bit) & 7))
#endif /* NEON_BITMAP_H */
#endif //NEON_BITMAP_H

File diff suppressed because it is too large Load Diff

View File

@@ -1,48 +0,0 @@
/*-------------------------------------------------------------------------
*
* communicator.h
* internal interface for communicating with remote pageservers
*
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#ifndef COMMUNICATOR_h
#define COMMUNICATOR_h
#include "neon_pgversioncompat.h"
#include "storage/buf_internals.h"
#include "pagestore_client.h"
/* initialization at postmaster startup */
extern void pg_init_communicator(void);
/* initialization at backend startup */
extern void communicator_init(void);
extern bool communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum,
neon_request_lsns *request_lsns);
extern BlockNumber communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum,
neon_request_lsns *request_lsns);
extern int64 communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns);
extern void communicator_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber base_blockno, neon_request_lsns *request_lsns,
void **buffers, BlockNumber nblocks, const bits8 *mask);
extern int communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blocknum,
neon_request_lsns *lsns,
BlockNumber nblocks, void **buffers, bits8 *mask);
extern void communicator_prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
BlockNumber nblocks, const bits8 *mask);
extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
neon_request_lsns *request_lsns,
void *buffer);
extern void communicator_reconfigure_timeout_if_needed(void);
extern void communicator_prefetch_pump_state(bool IsHandlingInterrupts);
#endif

View File

@@ -13,6 +13,9 @@
* accumulate changes. On subtransaction commit, the top of the stack
* is merged with the table below it.
*
* IDENTIFICATION
* contrib/neon/control_plane_connector.c
*
*-------------------------------------------------------------------------
*/

View File

@@ -3,6 +3,9 @@
* extension_server.c
* Request compute_ctl to download extension files.
*
* IDENTIFICATION
* contrib/neon/extension_server.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"

View File

@@ -3,6 +3,9 @@
* extension_server.h
* Request compute_ctl to download extension files.
*
* IDENTIFICATION
* contrib/neon/extension_server.h
*
*-------------------------------------------------------------------------
*/

View File

@@ -1,4 +1,4 @@
/*-------------------------------------------------------------------------
/*
*
* file_cache.c
*
@@ -6,6 +6,10 @@
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* pgxn/neon/file_cache.c
*
*-------------------------------------------------------------------------
*/
@@ -21,6 +25,7 @@
#include "access/xlog.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pagestore_client.h"
#include "common/hashfn.h"
#include "pgstat.h"
#include "port/pg_iovec.h"
@@ -42,7 +47,6 @@
#include "hll.h"
#include "bitmap.h"
#include "file_cache.h"
#include "neon.h"
#include "neon_lwlsncache.h"
#include "neon_perf_counters.h"
@@ -1563,12 +1567,8 @@ local_cache_pages(PG_FUNCTION_ARGS)
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
/* Skip hole tags */
if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
n_pages += GET_STATE(entry, i) == AVAILABLE;
}
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
n_pages += GET_STATE(entry, i) == AVAILABLE;
}
}
}
@@ -1596,19 +1596,16 @@ local_cache_pages(PG_FUNCTION_ARGS)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0)
if (GET_STATE(entry, i) == AVAILABLE)
{
if (GET_STATE(entry, i) == AVAILABLE)
{
fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i;
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].forknum = entry->key.forkNum;
fctx->record[n].blocknum = entry->key.blockNum + i;
fctx->record[n].accesscount = entry->access_count;
n += 1;
}
fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i;
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].forknum = entry->key.forkNum;
fctx->record[n].blocknum = entry->key.blockNum + i;
fctx->record[n].accesscount = entry->access_count;
n += 1;
}
}
}

View File

@@ -1,52 +0,0 @@
/*-------------------------------------------------------------------------
*
* file_cache.h
* Local File Cache definitions
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#ifndef FILE_CACHE_h
#define FILE_CACHE_h
#include "neon_pgversioncompat.h"
/* GUCs */
extern bool lfc_store_prefetch_result;
/* functions for local file cache */
extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, const void *const *buffers,
BlockNumber nblocks);
/* returns number of blocks read, with one bit set in *read for each */
extern int lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, void **buffers,
BlockNumber nblocks, bits8 *mask);
extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno);
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_init(void);
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
const void* buffer, XLogRecPtr lsn);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)
{
bits8 rv = 0;
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}
static inline void
lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
const void *buffer)
{
return lfc_writev(rinfo, forkNum, blkno, &buffer, 1);
}
#endif /* FILE_CACHE_H */

Some files were not shown because too many files have changed in this diff Show More