mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 15:41:15 +00:00
Compare commits
1 Commits
vlad/safek
...
erik/wal-f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8dd8369a83 |
27
Cargo.lock
generated
27
Cargo.lock
generated
@@ -4009,7 +4009,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres"
|
||||
version = "0.19.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=vlad/interpreted-wal-record-replication-support#e619cf8c2c572e71cbc97f1c7f4cab8219f07d55"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -4022,7 +4022,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=vlad/interpreted-wal-record-replication-support#e619cf8c2c572e71cbc97f1c7f4cab8219f07d55"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
dependencies = [
|
||||
"base64 0.20.0",
|
||||
"byteorder",
|
||||
@@ -4041,7 +4041,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=vlad/interpreted-wal-record-replication-support#e619cf8c2c572e71cbc97f1c7f4cab8219f07d55"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -5161,7 +5161,6 @@ dependencies = [
|
||||
"itertools 0.10.5",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
"parking_lot 0.12.1",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
@@ -5192,7 +5191,6 @@ dependencies = [
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"utils",
|
||||
"wal_decoder",
|
||||
"walproposer",
|
||||
"workspace_hack",
|
||||
]
|
||||
@@ -5665,9 +5663,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "smallvec"
|
||||
version = "1.13.2"
|
||||
version = "1.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
|
||||
checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7"
|
||||
|
||||
[[package]]
|
||||
name = "smol_str"
|
||||
@@ -6076,9 +6074,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tikv-jemalloc-ctl"
|
||||
version = "0.6.0"
|
||||
version = "0.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f21f216790c8df74ce3ab25b534e0718da5a1916719771d3fec23315c99e468b"
|
||||
checksum = "619bfed27d807b54f7f776b9430d4f8060e66ee138a28632ca898584d462c31c"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"paste",
|
||||
@@ -6087,9 +6085,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tikv-jemalloc-sys"
|
||||
version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7"
|
||||
version = "0.5.4+5.3.0-patched"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cd3c60906412afa9c2b5b5a48ca6a5abe5736aec9eb48ad05037a677e52e4e2d"
|
||||
checksum = "9402443cb8fd499b6f327e40565234ff34dbda27460c5b47db0db77443dd85d1"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
@@ -6097,9 +6095,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tikv-jemallocator"
|
||||
version = "0.6.0"
|
||||
version = "0.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4cec5ff18518d81584f477e9bfdf957f5bb0979b0bac3af4ca30b5b3ae2d2865"
|
||||
checksum = "965fe0c26be5c56c94e38ba547249074803efd52adfb66de62107d95aab3eaca"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"tikv-jemalloc-sys",
|
||||
@@ -6229,7 +6227,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.7"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=vlad/interpreted-wal-record-replication-support#e619cf8c2c572e71cbc97f1c7f4cab8219f07d55"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
@@ -6783,7 +6781,6 @@ dependencies = [
|
||||
"serde_assert",
|
||||
"serde_json",
|
||||
"serde_path_to_error",
|
||||
"serde_with",
|
||||
"signal-hook",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
|
||||
14
Cargo.toml
14
Cargo.toml
@@ -168,8 +168,8 @@ sync_wrapper = "0.1.2"
|
||||
tar = "0.4"
|
||||
test-context = "0.3"
|
||||
thiserror = "1.0"
|
||||
tikv-jemallocator = { version = "0.6", features = ["stats"] }
|
||||
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
|
||||
tikv-jemallocator = "0.5"
|
||||
tikv-jemalloc-ctl = "0.5"
|
||||
tokio = { version = "1.17", features = ["macros"] }
|
||||
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
|
||||
tokio-io-timeout = "1.2.0"
|
||||
@@ -203,10 +203,10 @@ env_logger = "0.10"
|
||||
log = "0.4"
|
||||
|
||||
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support" }
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
|
||||
## Local libraries
|
||||
compute_api = { version = "0.1", path = "./libs/compute_api/" }
|
||||
@@ -244,7 +244,7 @@ tonic-build = "0.12"
|
||||
[patch.crates-io]
|
||||
|
||||
# Needed to get `tokio-postgres-rustls` to depend on our fork.
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
|
||||
################# Binary contents sections
|
||||
|
||||
|
||||
@@ -147,7 +147,7 @@ index 542c2e098c..0062d3024f 100644
|
||||
ALTER TABLE ptnowner1 OWNER TO regress_ptnowner;
|
||||
ALTER TABLE ptnowner OWNER TO regress_ptnowner;
|
||||
diff --git a/src/test/regress/expected/collate.icu.utf8.out b/src/test/regress/expected/collate.icu.utf8.out
|
||||
index 3f9a8f539c..0a51b52940 100644
|
||||
index 97bbe53b64..eac3d42a79 100644
|
||||
--- a/src/test/regress/expected/collate.icu.utf8.out
|
||||
+++ b/src/test/regress/expected/collate.icu.utf8.out
|
||||
@@ -1016,7 +1016,7 @@ select * from collate_test1 where b ilike 'ABC';
|
||||
@@ -309,7 +309,7 @@ index b48365ec98..a6ef910055 100644
|
||||
-- the wrong partition. This test is *not* guaranteed to trigger that bug, but
|
||||
-- does so when shared_buffers is small enough. To test if we encountered the
|
||||
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
|
||||
index 9a74820ee8..22400a5551 100644
|
||||
index faf1a4d1b0..a44c97db52 100644
|
||||
--- a/src/test/regress/expected/copy2.out
|
||||
+++ b/src/test/regress/expected/copy2.out
|
||||
@@ -553,8 +553,8 @@ select * from check_con_tbl;
|
||||
@@ -573,7 +573,7 @@ index 93302a07ef..1a73f083ac 100644
|
||||
-- that does not match with what's expected.
|
||||
-- This checks all the object types that include schema qualifications.
|
||||
diff --git a/src/test/regress/expected/create_view.out b/src/test/regress/expected/create_view.out
|
||||
index f551624afb..57f1e432d4 100644
|
||||
index f3f8c7b5a2..3e3e54ff4c 100644
|
||||
--- a/src/test/regress/expected/create_view.out
|
||||
+++ b/src/test/regress/expected/create_view.out
|
||||
@@ -18,7 +18,8 @@ CREATE TABLE real_city (
|
||||
@@ -700,12 +700,12 @@ index 6ed50fdcfa..caa00a345d 100644
|
||||
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
|
||||
CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
|
||||
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
|
||||
index 6b8c2f2414..8e13b7fa46 100644
|
||||
index 12e523c737..8872e23935 100644
|
||||
--- a/src/test/regress/expected/foreign_key.out
|
||||
+++ b/src/test/regress/expected/foreign_key.out
|
||||
@@ -1985,7 +1985,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
|
||||
ERROR: cannot ALTER TABLE "fk_partitioned_pk_61" because it is being used by active queries in this session
|
||||
DROP TABLE fk_partitioned_pk_6, fk_partitioned_fk_6;
|
||||
@@ -1968,7 +1968,7 @@ ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
|
||||
FOR VALUES IN (1600);
|
||||
-- leave these tables around intentionally
|
||||
-- test the case when the referenced table is owned by a different user
|
||||
-create role regress_other_partitioned_fk_owner;
|
||||
+create role regress_other_partitioned_fk_owner PASSWORD NEON_PASSWORD_PLACEHOLDER;
|
||||
@@ -713,7 +713,7 @@ index 6b8c2f2414..8e13b7fa46 100644
|
||||
set role regress_other_partitioned_fk_owner;
|
||||
create table other_partitioned_fk(a int, b int) partition by list (a);
|
||||
diff --git a/src/test/regress/expected/generated.out b/src/test/regress/expected/generated.out
|
||||
index 5881420388..4ae21aa43c 100644
|
||||
index 0f623f7119..b48588a54e 100644
|
||||
--- a/src/test/regress/expected/generated.out
|
||||
+++ b/src/test/regress/expected/generated.out
|
||||
@@ -534,7 +534,7 @@ CREATE TABLE gtest10a (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STOR
|
||||
@@ -762,7 +762,7 @@ index a2036a1597..805d73b9d2 100644
|
||||
-- fields, leading to long bucket chains and lots of table expansion.
|
||||
-- this is therefore a stress test of the bucket overflow code (unlike
|
||||
diff --git a/src/test/regress/expected/identity.out b/src/test/regress/expected/identity.out
|
||||
index 1b74958de9..078187b542 100644
|
||||
index cc7772349f..98a08eb48d 100644
|
||||
--- a/src/test/regress/expected/identity.out
|
||||
+++ b/src/test/regress/expected/identity.out
|
||||
@@ -520,7 +520,7 @@ ALTER TABLE itest7 ALTER COLUMN a SET GENERATED BY DEFAULT;
|
||||
@@ -775,10 +775,10 @@ index 1b74958de9..078187b542 100644
|
||||
GRANT SELECT, INSERT ON itest8 TO regress_identity_user1;
|
||||
SET ROLE regress_identity_user1;
|
||||
diff --git a/src/test/regress/expected/inherit.out b/src/test/regress/expected/inherit.out
|
||||
index 8f831c95c3..ec681b52af 100644
|
||||
index 4943429e9b..0257f22b15 100644
|
||||
--- a/src/test/regress/expected/inherit.out
|
||||
+++ b/src/test/regress/expected/inherit.out
|
||||
@@ -2636,7 +2636,7 @@ create index on permtest_parent (left(c, 3));
|
||||
@@ -2606,7 +2606,7 @@ create index on permtest_parent (left(c, 3));
|
||||
insert into permtest_parent
|
||||
select 1, 'a', left(fipshash(i::text), 5) from generate_series(0, 100) i;
|
||||
analyze permtest_parent;
|
||||
@@ -1133,7 +1133,7 @@ index 8475231735..1afae5395f 100644
|
||||
SELECT rolname, rolpassword
|
||||
FROM pg_authid
|
||||
diff --git a/src/test/regress/expected/privileges.out b/src/test/regress/expected/privileges.out
|
||||
index 5b9dba7b32..cc408dad42 100644
|
||||
index fbb0489a4f..2905194e2c 100644
|
||||
--- a/src/test/regress/expected/privileges.out
|
||||
+++ b/src/test/regress/expected/privileges.out
|
||||
@@ -20,19 +20,19 @@ SELECT lo_unlink(oid) FROM pg_largeobject_metadata WHERE oid >= 1000 AND oid < 3
|
||||
@@ -1185,7 +1185,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
GRANT pg_read_all_data TO regress_priv_user6;
|
||||
GRANT pg_write_all_data TO regress_priv_user7;
|
||||
GRANT pg_read_all_settings TO regress_priv_user8 WITH ADMIN OPTION;
|
||||
@@ -212,8 +212,8 @@ REVOKE pg_read_all_settings FROM regress_priv_user8;
|
||||
@@ -145,8 +145,8 @@ REVOKE pg_read_all_settings FROM regress_priv_user8;
|
||||
DROP USER regress_priv_user10;
|
||||
DROP USER regress_priv_user9;
|
||||
DROP USER regress_priv_user8;
|
||||
@@ -1196,7 +1196,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
|
||||
GRANT regress_priv_group2 TO regress_priv_user2 GRANTED BY regress_priv_user1;
|
||||
SET SESSION AUTHORIZATION regress_priv_user1;
|
||||
@@ -239,12 +239,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
|
||||
@@ -172,12 +172,16 @@ GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY regre
|
||||
ERROR: permission denied to grant privileges as role "regress_priv_role"
|
||||
DETAIL: The grantor must have the ADMIN option on role "regress_priv_role".
|
||||
GRANT regress_priv_role TO regress_priv_user1 WITH ADMIN OPTION GRANTED BY CURRENT_ROLE;
|
||||
@@ -1213,7 +1213,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
DROP ROLE regress_priv_role;
|
||||
SET SESSION AUTHORIZATION regress_priv_user1;
|
||||
SELECT session_user, current_user;
|
||||
@@ -1776,7 +1780,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
@@ -1709,7 +1713,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
|
||||
-- security-restricted operations
|
||||
\c -
|
||||
@@ -1222,7 +1222,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
-- Check that index expressions and predicates are run as the table's owner
|
||||
-- A dummy index function checking current_user
|
||||
CREATE FUNCTION sro_ifun(int) RETURNS int AS $$
|
||||
@@ -2668,8 +2672,8 @@ drop cascades to function testns.priv_testagg(integer)
|
||||
@@ -2601,8 +2605,8 @@ drop cascades to function testns.priv_testagg(integer)
|
||||
drop cascades to function testns.priv_testproc(integer)
|
||||
-- Change owner of the schema & and rename of new schema owner
|
||||
\c -
|
||||
@@ -1233,7 +1233,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
SET SESSION ROLE regress_schemauser1;
|
||||
CREATE SCHEMA testns;
|
||||
SELECT nspname, rolname FROM pg_namespace, pg_roles WHERE pg_namespace.nspname = 'testns' AND pg_namespace.nspowner = pg_roles.oid;
|
||||
@@ -2792,7 +2796,7 @@ DROP USER regress_priv_user7;
|
||||
@@ -2725,7 +2729,7 @@ DROP USER regress_priv_user7;
|
||||
DROP USER regress_priv_user8; -- does not exist
|
||||
ERROR: role "regress_priv_user8" does not exist
|
||||
-- permissions with LOCK TABLE
|
||||
@@ -1242,7 +1242,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
CREATE TABLE lock_table (a int);
|
||||
-- LOCK TABLE and SELECT permission
|
||||
GRANT SELECT ON lock_table TO regress_locktable_user;
|
||||
@@ -2874,7 +2878,7 @@ DROP USER regress_locktable_user;
|
||||
@@ -2807,7 +2811,7 @@ DROP USER regress_locktable_user;
|
||||
-- pg_backend_memory_contexts.
|
||||
-- switch to superuser
|
||||
\c -
|
||||
@@ -1251,7 +1251,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
|
||||
has_table_privilege
|
||||
---------------------
|
||||
@@ -2918,10 +2922,10 @@ RESET ROLE;
|
||||
@@ -2851,10 +2855,10 @@ RESET ROLE;
|
||||
-- clean up
|
||||
DROP ROLE regress_readallstats;
|
||||
-- test role grantor machinery
|
||||
@@ -1266,7 +1266,7 @@ index 5b9dba7b32..cc408dad42 100644
|
||||
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
|
||||
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
|
||||
SET SESSION AUTHORIZATION regress_group_direct_manager;
|
||||
@@ -2950,9 +2954,9 @@ DROP ROLE regress_group_direct_manager;
|
||||
@@ -2883,9 +2887,9 @@ DROP ROLE regress_group_direct_manager;
|
||||
DROP ROLE regress_group_indirect_manager;
|
||||
DROP ROLE regress_group_member;
|
||||
-- test SET and INHERIT options with object ownership changes
|
||||
@@ -1813,7 +1813,7 @@ index 5e6969b173..2c4d52237f 100644
|
||||
|
||||
-- clean up roles
|
||||
diff --git a/src/test/regress/expected/rowsecurity.out b/src/test/regress/expected/rowsecurity.out
|
||||
index 218c0c2863..f7af0cfb12 100644
|
||||
index 97ca9bf72c..b2a7a6f710 100644
|
||||
--- a/src/test/regress/expected/rowsecurity.out
|
||||
+++ b/src/test/regress/expected/rowsecurity.out
|
||||
@@ -14,13 +14,13 @@ DROP ROLE IF EXISTS regress_rls_group2;
|
||||
@@ -1917,19 +1917,6 @@ index b79fe9a1c0..e29fab88ab 100644
|
||||
ALTER DEFAULT PRIVILEGES FOR ROLE regress_selinto_user
|
||||
REVOKE INSERT ON TABLES FROM regress_selinto_user;
|
||||
GRANT ALL ON SCHEMA selinto_schema TO public;
|
||||
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
|
||||
index afc6ab08c2..dfcd891af3 100644
|
||||
--- a/src/test/regress/expected/select_parallel.out
|
||||
+++ b/src/test/regress/expected/select_parallel.out
|
||||
@@ -1220,7 +1220,7 @@ SELECT 1 FROM tenk1_vw_sec
|
||||
|
||||
rollback;
|
||||
-- test that function option SET ROLE works in parallel workers.
|
||||
-create role regress_parallel_worker;
|
||||
+create role regress_parallel_worker PASSWORD NEON_PASSWORD_PLACEHOLDER;
|
||||
create function set_and_report_role() returns text as
|
||||
$$ select current_setting('role') $$ language sql parallel safe
|
||||
set role = regress_parallel_worker;
|
||||
diff --git a/src/test/regress/expected/select_views.out b/src/test/regress/expected/select_views.out
|
||||
index 1aeed8452b..7d9427d070 100644
|
||||
--- a/src/test/regress/expected/select_views.out
|
||||
@@ -2382,7 +2369,7 @@ index 6cb9c926c0..5e689e4062 100644
|
||||
ALTER TABLE ptnowner1 OWNER TO regress_ptnowner;
|
||||
ALTER TABLE ptnowner OWNER TO regress_ptnowner;
|
||||
diff --git a/src/test/regress/sql/collate.icu.utf8.sql b/src/test/regress/sql/collate.icu.utf8.sql
|
||||
index 8aa902d5ab..24bb823b86 100644
|
||||
index 3db9e25913..c66d5aa2c2 100644
|
||||
--- a/src/test/regress/sql/collate.icu.utf8.sql
|
||||
+++ b/src/test/regress/sql/collate.icu.utf8.sql
|
||||
@@ -353,7 +353,7 @@ reset enable_seqscan;
|
||||
@@ -2545,7 +2532,7 @@ index 43d2e906dd..6c993d70f0 100644
|
||||
-- An earlier bug (see commit b1ecb9b3fcf) could end up using a buffer from
|
||||
-- the wrong partition. This test is *not* guaranteed to trigger that bug, but
|
||||
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
|
||||
index cf3828c16e..cf3ca38175 100644
|
||||
index d759635068..d58e50dcc5 100644
|
||||
--- a/src/test/regress/sql/copy2.sql
|
||||
+++ b/src/test/regress/sql/copy2.sql
|
||||
@@ -365,8 +365,8 @@ copy check_con_tbl from stdin;
|
||||
@@ -2787,7 +2774,7 @@ index 1b7064247a..be5b662ce1 100644
|
||||
-- Cases where schema creation fails as objects are qualified with a schema
|
||||
-- that does not match with what's expected.
|
||||
diff --git a/src/test/regress/sql/create_view.sql b/src/test/regress/sql/create_view.sql
|
||||
index ae6841308b..47bc792e30 100644
|
||||
index 3a78be1b0c..617d2dc8d6 100644
|
||||
--- a/src/test/regress/sql/create_view.sql
|
||||
+++ b/src/test/regress/sql/create_view.sql
|
||||
@@ -23,7 +23,8 @@ CREATE TABLE real_city (
|
||||
@@ -2914,11 +2901,11 @@ index aa147b14a9..370e0dd570 100644
|
||||
CREATE FOREIGN DATA WRAPPER dummy;
|
||||
COMMENT ON FOREIGN DATA WRAPPER dummy IS 'useless';
|
||||
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
|
||||
index 45c7a534cb..32dd26b8cd 100644
|
||||
index 22e177f89b..7138d5e1d4 100644
|
||||
--- a/src/test/regress/sql/foreign_key.sql
|
||||
+++ b/src/test/regress/sql/foreign_key.sql
|
||||
@@ -1435,7 +1435,7 @@ ALTER TABLE fk_partitioned_fk_6 ATTACH PARTITION fk_partitioned_pk_6 FOR VALUES
|
||||
DROP TABLE fk_partitioned_pk_6, fk_partitioned_fk_6;
|
||||
@@ -1418,7 +1418,7 @@ ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
|
||||
-- leave these tables around intentionally
|
||||
|
||||
-- test the case when the referenced table is owned by a different user
|
||||
-create role regress_other_partitioned_fk_owner;
|
||||
@@ -2976,7 +2963,7 @@ index 527024f710..de49c0b85f 100644
|
||||
-- the data in this file has a lot of duplicates in the index key
|
||||
-- fields, leading to long bucket chains and lots of table expansion.
|
||||
diff --git a/src/test/regress/sql/identity.sql b/src/test/regress/sql/identity.sql
|
||||
index 7537258a75..9041e35e34 100644
|
||||
index 91d2e443b4..241c93f373 100644
|
||||
--- a/src/test/regress/sql/identity.sql
|
||||
+++ b/src/test/regress/sql/identity.sql
|
||||
@@ -287,7 +287,7 @@ ALTER TABLE itest7 ALTER COLUMN a RESTART;
|
||||
@@ -2989,10 +2976,10 @@ index 7537258a75..9041e35e34 100644
|
||||
GRANT SELECT, INSERT ON itest8 TO regress_identity_user1;
|
||||
SET ROLE regress_identity_user1;
|
||||
diff --git a/src/test/regress/sql/inherit.sql b/src/test/regress/sql/inherit.sql
|
||||
index b5b554a125..109889ad24 100644
|
||||
index fe699c54d5..bdd5993f45 100644
|
||||
--- a/src/test/regress/sql/inherit.sql
|
||||
+++ b/src/test/regress/sql/inherit.sql
|
||||
@@ -958,7 +958,7 @@ create index on permtest_parent (left(c, 3));
|
||||
@@ -950,7 +950,7 @@ create index on permtest_parent (left(c, 3));
|
||||
insert into permtest_parent
|
||||
select 1, 'a', left(fipshash(i::text), 5) from generate_series(0, 100) i;
|
||||
analyze permtest_parent;
|
||||
@@ -3231,7 +3218,7 @@ index 53e86b0b6c..f07cf1ec54 100644
|
||||
CREATE ROLE regress_passwd5 PASSWORD 'md5e73a4b11df52a6068f8b39f90be36023';
|
||||
|
||||
diff --git a/src/test/regress/sql/privileges.sql b/src/test/regress/sql/privileges.sql
|
||||
index 249df17a58..b258e7f26a 100644
|
||||
index 3f68cafcd1..004b26831d 100644
|
||||
--- a/src/test/regress/sql/privileges.sql
|
||||
+++ b/src/test/regress/sql/privileges.sql
|
||||
@@ -24,18 +24,18 @@ RESET client_min_messages;
|
||||
@@ -3282,7 +3269,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
|
||||
GRANT pg_read_all_data TO regress_priv_user6;
|
||||
GRANT pg_write_all_data TO regress_priv_user7;
|
||||
@@ -163,8 +163,8 @@ DROP USER regress_priv_user10;
|
||||
@@ -130,8 +130,8 @@ DROP USER regress_priv_user10;
|
||||
DROP USER regress_priv_user9;
|
||||
DROP USER regress_priv_user8;
|
||||
|
||||
@@ -3293,7 +3280,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
|
||||
ALTER GROUP regress_priv_group1 ADD USER regress_priv_user4;
|
||||
|
||||
@@ -1157,7 +1157,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
@@ -1124,7 +1124,7 @@ SELECT has_table_privilege('regress_priv_user1', 'atest4', 'SELECT WITH GRANT OP
|
||||
|
||||
-- security-restricted operations
|
||||
\c -
|
||||
@@ -3302,7 +3289,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
|
||||
-- Check that index expressions and predicates are run as the table's owner
|
||||
|
||||
@@ -1653,8 +1653,8 @@ DROP SCHEMA testns CASCADE;
|
||||
@@ -1620,8 +1620,8 @@ DROP SCHEMA testns CASCADE;
|
||||
-- Change owner of the schema & and rename of new schema owner
|
||||
\c -
|
||||
|
||||
@@ -3313,7 +3300,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
|
||||
SET SESSION ROLE regress_schemauser1;
|
||||
CREATE SCHEMA testns;
|
||||
@@ -1748,7 +1748,7 @@ DROP USER regress_priv_user8; -- does not exist
|
||||
@@ -1715,7 +1715,7 @@ DROP USER regress_priv_user8; -- does not exist
|
||||
|
||||
|
||||
-- permissions with LOCK TABLE
|
||||
@@ -3322,7 +3309,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
CREATE TABLE lock_table (a int);
|
||||
|
||||
-- LOCK TABLE and SELECT permission
|
||||
@@ -1836,7 +1836,7 @@ DROP USER regress_locktable_user;
|
||||
@@ -1803,7 +1803,7 @@ DROP USER regress_locktable_user;
|
||||
-- switch to superuser
|
||||
\c -
|
||||
|
||||
@@ -3331,7 +3318,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
|
||||
SELECT has_table_privilege('regress_readallstats','pg_backend_memory_contexts','SELECT'); -- no
|
||||
SELECT has_table_privilege('regress_readallstats','pg_shmem_allocations','SELECT'); -- no
|
||||
@@ -1856,10 +1856,10 @@ RESET ROLE;
|
||||
@@ -1823,10 +1823,10 @@ RESET ROLE;
|
||||
DROP ROLE regress_readallstats;
|
||||
|
||||
-- test role grantor machinery
|
||||
@@ -3346,7 +3333,7 @@ index 249df17a58..b258e7f26a 100644
|
||||
|
||||
GRANT regress_group TO regress_group_direct_manager WITH INHERIT FALSE, ADMIN TRUE;
|
||||
GRANT regress_group_direct_manager TO regress_group_indirect_manager;
|
||||
@@ -1881,9 +1881,9 @@ DROP ROLE regress_group_indirect_manager;
|
||||
@@ -1848,9 +1848,9 @@ DROP ROLE regress_group_indirect_manager;
|
||||
DROP ROLE regress_group_member;
|
||||
|
||||
-- test SET and INHERIT options with object ownership changes
|
||||
@@ -3638,7 +3625,7 @@ index c961b2d730..0859b89c4f 100644
|
||||
-- clean up roles
|
||||
DROP ROLE regress_test_def_superuser;
|
||||
diff --git a/src/test/regress/sql/rowsecurity.sql b/src/test/regress/sql/rowsecurity.sql
|
||||
index d3bfd53e23..919ce1d0c6 100644
|
||||
index dec7340538..cdbc03a5cc 100644
|
||||
--- a/src/test/regress/sql/rowsecurity.sql
|
||||
+++ b/src/test/regress/sql/rowsecurity.sql
|
||||
@@ -20,13 +20,13 @@ DROP SCHEMA IF EXISTS regress_rls_schema CASCADE;
|
||||
@@ -3714,19 +3701,6 @@ index 689c448cc2..223ceb1d75 100644
|
||||
ALTER DEFAULT PRIVILEGES FOR ROLE regress_selinto_user
|
||||
REVOKE INSERT ON TABLES FROM regress_selinto_user;
|
||||
GRANT ALL ON SCHEMA selinto_schema TO public;
|
||||
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
|
||||
index 33d78e16dc..cb193c9b27 100644
|
||||
--- a/src/test/regress/sql/select_parallel.sql
|
||||
+++ b/src/test/regress/sql/select_parallel.sql
|
||||
@@ -464,7 +464,7 @@ SELECT 1 FROM tenk1_vw_sec
|
||||
rollback;
|
||||
|
||||
-- test that function option SET ROLE works in parallel workers.
|
||||
-create role regress_parallel_worker;
|
||||
+create role regress_parallel_worker PASSWORD NEON_PASSWORD_PLACEHOLDER;
|
||||
|
||||
create function set_and_report_role() returns text as
|
||||
$$ select current_setting('role') $$ language sql parallel safe
|
||||
diff --git a/src/test/regress/sql/select_views.sql b/src/test/regress/sql/select_views.sql
|
||||
index e742f13699..7bd0255df8 100644
|
||||
--- a/src/test/regress/sql/select_views.sql
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::{
|
||||
str::FromStr,
|
||||
time::Duration,
|
||||
};
|
||||
use utils::{logging::LogFormat, postgres_client::PostgresClientProtocol};
|
||||
use utils::logging::LogFormat;
|
||||
|
||||
use crate::models::ImageCompressionAlgorithm;
|
||||
use crate::models::LsnLease;
|
||||
@@ -109,7 +109,6 @@ pub struct ConfigToml {
|
||||
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub no_sync: Option<bool>,
|
||||
pub wal_receiver_protocol: PostgresClientProtocol,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -318,9 +317,6 @@ pub mod defaults {
|
||||
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
|
||||
|
||||
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
|
||||
|
||||
pub const DEFAULT_WAL_RECEIVER_PROTOCOL: utils::postgres_client::PostgresClientProtocol =
|
||||
utils::postgres_client::PostgresClientProtocol::Interpreted;
|
||||
}
|
||||
|
||||
impl Default for ConfigToml {
|
||||
@@ -403,7 +399,6 @@ impl Default for ConfigToml {
|
||||
virtual_file_io_mode: None,
|
||||
tenant_config: TenantConfigToml::default(),
|
||||
no_sync: None,
|
||||
wal_receiver_protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -562,9 +562,6 @@ pub enum BeMessage<'a> {
|
||||
options: &'a [&'a str],
|
||||
},
|
||||
KeepAlive(WalSndKeepAlive),
|
||||
/// Batch of interpreted, shard filtered WAL records,
|
||||
/// ready for the pageserver to ingest
|
||||
InterpretedWalRecords(InterpretedWalRecordsBody<'a>),
|
||||
}
|
||||
|
||||
/// Common shorthands.
|
||||
@@ -675,18 +672,6 @@ pub struct WalSndKeepAlive {
|
||||
pub request_reply: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct InterpretedWalRecordsBody<'a> {
|
||||
/// End of raw WAL in [`Self::data`]
|
||||
pub streaming_lsn: u64,
|
||||
/// Current end of WAL on the server
|
||||
pub commit_lsn: u64,
|
||||
/// Start LSN of the next record in PG WAL.
|
||||
/// Is 0 if the portion of PG WAL did not contain any records.
|
||||
pub next_record_lsn: u64,
|
||||
pub data: &'a [u8],
|
||||
}
|
||||
|
||||
pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
|
||||
|
||||
// single text column
|
||||
@@ -1011,20 +996,6 @@ impl BeMessage<'_> {
|
||||
Ok(())
|
||||
})?
|
||||
}
|
||||
|
||||
BeMessage::InterpretedWalRecords(rec) => {
|
||||
// We use the COPY_DATA_TAG for our custom message
|
||||
// since this tag is interpreted as raw bytes.
|
||||
buf.put_u8(b'd');
|
||||
write_body(buf, |buf| {
|
||||
buf.put_u8(b'0'); // matches INTERPRETED_WAL_RECORD_TAG in postgres-protocol
|
||||
// dependency
|
||||
buf.put_u64(rec.streaming_lsn);
|
||||
buf.put_u64(rec.commit_lsn);
|
||||
buf.put_u64(rec.next_record_lsn);
|
||||
buf.put_slice(rec.data);
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -32,7 +32,6 @@ pin-project-lite.workspace = true
|
||||
regex.workspace = true
|
||||
routerify.workspace = true
|
||||
serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
serde_json.workspace = true
|
||||
signal-hook.workspace = true
|
||||
thiserror.workspace = true
|
||||
|
||||
@@ -50,8 +50,8 @@ REDO_POS=0x$("$PG_BIN"/pg_controldata -D "$DATA_DIR" | grep -F "REDO location"|
|
||||
declare -i WAL_SIZE=$REDO_POS+114
|
||||
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" start
|
||||
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate
|
||||
cp "$DATA_DIR"/pg_wal/000000010000000000000001 "$DATA_DIR"
|
||||
cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
|
||||
cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
|
||||
for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
|
||||
dd if="$DATA_DIR"/000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
|
||||
rm -f "$DATA_DIR"/000000010000000000000001
|
||||
dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
|
||||
rm -f 000000010000000000000001
|
||||
|
||||
@@ -14,8 +14,8 @@ REDO_POS=0x$("$PG_BIN"/pg_controldata -D "$DATA_DIR" | grep -F "REDO location"|
|
||||
declare -i WAL_SIZE=$REDO_POS+114
|
||||
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" start
|
||||
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate
|
||||
cp "$DATA_DIR"/pg_wal/000000010000000000000001 "$DATA_DIR"
|
||||
cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
|
||||
cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
|
||||
for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
|
||||
dd if="$DATA_DIR"/000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
|
||||
rm -f "$DATA_DIR"/000000010000000000000001
|
||||
dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
|
||||
rm -f 000000010000000000000001
|
||||
|
||||
@@ -7,94 +7,29 @@ use postgres_connection::{parse_host_port, PgConnectionConfig};
|
||||
|
||||
use crate::id::TenantTimelineId;
|
||||
|
||||
/// Postgres client protocol types
|
||||
#[derive(
|
||||
Copy,
|
||||
Clone,
|
||||
PartialEq,
|
||||
Eq,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
Debug,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
#[repr(u8)]
|
||||
pub enum PostgresClientProtocol {
|
||||
/// Usual Postgres replication protocol
|
||||
Vanilla,
|
||||
/// Custom shard-aware protocol that replicates interpreted records.
|
||||
/// Used to send wal from safekeeper to pageserver.
|
||||
Interpreted,
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for PostgresClientProtocol {
|
||||
type Error = u8;
|
||||
|
||||
fn try_from(value: u8) -> Result<Self, Self::Error> {
|
||||
Ok(match value {
|
||||
v if v == (PostgresClientProtocol::Vanilla as u8) => PostgresClientProtocol::Vanilla,
|
||||
v if v == (PostgresClientProtocol::Interpreted as u8) => {
|
||||
PostgresClientProtocol::Interpreted
|
||||
}
|
||||
x => return Err(x),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConnectionConfigArgs<'a> {
|
||||
pub protocol: PostgresClientProtocol,
|
||||
|
||||
pub ttid: TenantTimelineId,
|
||||
pub shard_number: Option<u8>,
|
||||
pub shard_count: Option<u8>,
|
||||
pub shard_stripe_size: Option<u32>,
|
||||
|
||||
pub listen_pg_addr_str: &'a str,
|
||||
|
||||
pub auth_token: Option<&'a str>,
|
||||
pub availability_zone: Option<&'a str>,
|
||||
}
|
||||
|
||||
impl<'a> ConnectionConfigArgs<'a> {
|
||||
fn options(&'a self) -> Vec<String> {
|
||||
let mut options = vec![
|
||||
"-c".to_owned(),
|
||||
format!("timeline_id={}", self.ttid.timeline_id),
|
||||
format!("tenant_id={}", self.ttid.tenant_id),
|
||||
format!("protocol={}", self.protocol as u8),
|
||||
];
|
||||
|
||||
if self.shard_number.is_some() {
|
||||
assert!(self.shard_count.is_some());
|
||||
assert!(self.shard_stripe_size.is_some());
|
||||
|
||||
options.push(format!("shard_count={}", self.shard_count.unwrap()));
|
||||
options.push(format!("shard_number={}", self.shard_number.unwrap()));
|
||||
options.push(format!(
|
||||
"shard_stripe_size={}",
|
||||
self.shard_stripe_size.unwrap()
|
||||
));
|
||||
}
|
||||
|
||||
options
|
||||
}
|
||||
}
|
||||
|
||||
/// Create client config for fetching WAL from safekeeper on particular timeline.
|
||||
/// listen_pg_addr_str is in form host:\[port\].
|
||||
pub fn wal_stream_connection_config(
|
||||
args: ConnectionConfigArgs,
|
||||
TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
}: TenantTimelineId,
|
||||
listen_pg_addr_str: &str,
|
||||
auth_token: Option<&str>,
|
||||
availability_zone: Option<&str>,
|
||||
) -> anyhow::Result<PgConnectionConfig> {
|
||||
let (host, port) =
|
||||
parse_host_port(args.listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
|
||||
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
|
||||
let port = port.unwrap_or(5432);
|
||||
let mut connstr = PgConnectionConfig::new_host_port(host, port)
|
||||
.extend_options(args.options())
|
||||
.set_password(args.auth_token.map(|s| s.to_owned()));
|
||||
.extend_options([
|
||||
"-c".to_owned(),
|
||||
format!("timeline_id={}", timeline_id),
|
||||
format!("tenant_id={}", tenant_id),
|
||||
])
|
||||
.set_password(auth_token.map(|s| s.to_owned()));
|
||||
|
||||
if let Some(availability_zone) = args.availability_zone {
|
||||
if let Some(availability_zone) = availability_zone {
|
||||
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
|
||||
}
|
||||
|
||||
|
||||
@@ -65,18 +65,6 @@ pub struct InterpretedWalRecord {
|
||||
pub xid: TransactionId,
|
||||
}
|
||||
|
||||
impl InterpretedWalRecord {
|
||||
/// Checks if the WAL record is empty
|
||||
///
|
||||
/// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
|
||||
/// pageserver.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.batch.is_empty()
|
||||
&& self.metadata_record.is_none()
|
||||
&& matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
|
||||
}
|
||||
}
|
||||
|
||||
/// The interpreted part of the Postgres WAL record which requires metadata
|
||||
/// writes to the underlying storage engine.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
||||
@@ -496,16 +496,11 @@ impl SerializedValueBatch {
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if the batch contains any serialized or observed values
|
||||
pub fn is_empty(&self) -> bool {
|
||||
!self.has_data() && self.metadata.is_empty()
|
||||
}
|
||||
|
||||
/// Checks if the batch contains data
|
||||
/// Checks if the batch is empty
|
||||
///
|
||||
/// Note that if this returns false, it may still contain observed values or
|
||||
/// a metadata record.
|
||||
pub fn has_data(&self) -> bool {
|
||||
/// A batch is empty when it contains no serialized values.
|
||||
/// Note that it may still contain observed values.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
let empty = self.raw.is_empty();
|
||||
|
||||
if cfg!(debug_assertions) && empty {
|
||||
@@ -515,7 +510,7 @@ impl SerializedValueBatch {
|
||||
.all(|meta| matches!(meta, ValueMeta::Observed(_))));
|
||||
}
|
||||
|
||||
!empty
|
||||
empty
|
||||
}
|
||||
|
||||
/// Returns the number of values serialized in the batch
|
||||
|
||||
@@ -167,7 +167,6 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||
16384,
|
||||
virtual_file::io_engine_for_bench(),
|
||||
conf.virtual_file_io_mode,
|
||||
virtual_file::SyncMode::Sync,
|
||||
);
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
|
||||
@@ -138,7 +138,6 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
10,
|
||||
virtual_file::api::IoEngineKind::StdFs,
|
||||
IoMode::preferred(),
|
||||
virtual_file::SyncMode::Sync,
|
||||
);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
|
||||
@@ -51,7 +51,6 @@ async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result
|
||||
10,
|
||||
virtual_file::api::IoEngineKind::StdFs,
|
||||
IoMode::preferred(),
|
||||
virtual_file::SyncMode::Sync,
|
||||
);
|
||||
page_cache::init(100);
|
||||
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
|
||||
@@ -66,7 +65,6 @@ async fn read_image_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result
|
||||
10,
|
||||
virtual_file::api::IoEngineKind::StdFs,
|
||||
IoMode::preferred(),
|
||||
virtual_file::SyncMode::Sync,
|
||||
);
|
||||
page_cache::init(100);
|
||||
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
|
||||
@@ -173,7 +171,6 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
10,
|
||||
virtual_file::api::IoEngineKind::StdFs,
|
||||
IoMode::preferred(),
|
||||
virtual_file::SyncMode::Sync,
|
||||
);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
|
||||
@@ -209,7 +209,6 @@ async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
|
||||
10,
|
||||
virtual_file::api::IoEngineKind::StdFs,
|
||||
IoMode::preferred(),
|
||||
virtual_file::SyncMode::Sync,
|
||||
);
|
||||
page_cache::init(100);
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
@@ -126,7 +126,6 @@ fn main() -> anyhow::Result<()> {
|
||||
// after setting up logging, log the effective IO engine choice and read path implementations
|
||||
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
|
||||
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
|
||||
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
|
||||
|
||||
// The tenants directory contains all the pageserver local disk state.
|
||||
// Create if not exists and make sure all the contents are durable before proceeding.
|
||||
@@ -172,18 +171,11 @@ fn main() -> anyhow::Result<()> {
|
||||
let scenario = failpoint_support::init();
|
||||
|
||||
// Basic initialization of things that don't change after startup
|
||||
tracing::info!("Initializing virtual_file...");
|
||||
virtual_file::init(
|
||||
conf.max_file_descriptors,
|
||||
conf.virtual_file_io_engine,
|
||||
conf.virtual_file_io_mode,
|
||||
if conf.no_sync {
|
||||
virtual_file::SyncMode::UnsafeNoSync
|
||||
} else {
|
||||
virtual_file::SyncMode::Sync
|
||||
},
|
||||
);
|
||||
tracing::info!("Initializing page_cache...");
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
|
||||
|
||||
@@ -14,7 +14,6 @@ use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use std::env;
|
||||
use storage_broker::Uri;
|
||||
use utils::logging::SecretString;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
use reqwest::Url;
|
||||
@@ -183,8 +182,6 @@ pub struct PageServerConf {
|
||||
|
||||
/// Optionally disable disk syncs (unsafe!)
|
||||
pub no_sync: bool,
|
||||
|
||||
pub wal_receiver_protocol: PostgresClientProtocol,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -341,7 +338,6 @@ impl PageServerConf {
|
||||
virtual_file_io_engine,
|
||||
tenant_config,
|
||||
no_sync,
|
||||
wal_receiver_protocol,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -381,7 +377,6 @@ impl PageServerConf {
|
||||
image_compression,
|
||||
timeline_offloading,
|
||||
ephemeral_bytes_per_memory_kb,
|
||||
wal_receiver_protocol,
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// fields that require additional validation or custom handling
|
||||
|
||||
@@ -1055,9 +1055,10 @@ impl<'a> DatadirModification<'a> {
|
||||
}
|
||||
|
||||
pub(crate) fn has_dirty_data(&self) -> bool {
|
||||
self.pending_data_batch
|
||||
!self
|
||||
.pending_data_batch
|
||||
.as_ref()
|
||||
.map_or(false, |b| b.has_data())
|
||||
.map_or(true, |b| b.is_empty())
|
||||
}
|
||||
|
||||
/// Set the current lsn
|
||||
@@ -1233,7 +1234,7 @@ impl<'a> DatadirModification<'a> {
|
||||
Some(pending_batch) => {
|
||||
pending_batch.extend(batch);
|
||||
}
|
||||
None if batch.has_data() => {
|
||||
None if !batch.is_empty() => {
|
||||
self.pending_data_batch = Some(batch);
|
||||
}
|
||||
None => {
|
||||
|
||||
@@ -2416,7 +2416,6 @@ impl Timeline {
|
||||
*guard = Some(WalReceiver::start(
|
||||
Arc::clone(self),
|
||||
WalReceiverConf {
|
||||
protocol: self.conf.wal_receiver_protocol,
|
||||
wal_connect_timeout,
|
||||
lagging_wal_timeout,
|
||||
max_lsn_wal_lag,
|
||||
@@ -5789,7 +5788,7 @@ impl<'a> TimelineWriter<'a> {
|
||||
batch: SerializedValueBatch,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
if !batch.has_data() {
|
||||
if batch.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
|
||||
@@ -38,7 +38,6 @@ use storage_broker::BrokerClientChannel;
|
||||
use tokio::sync::watch;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
|
||||
use self::connection_manager::ConnectionManagerStatus;
|
||||
|
||||
@@ -46,7 +45,6 @@ use super::Timeline;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct WalReceiverConf {
|
||||
pub protocol: PostgresClientProtocol,
|
||||
/// The timeout on the connection to safekeeper for WAL streaming.
|
||||
pub wal_connect_timeout: Duration,
|
||||
/// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
|
||||
|
||||
@@ -36,9 +36,7 @@ use postgres_connection::PgConnectionConfig;
|
||||
use utils::backoff::{
|
||||
exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
};
|
||||
use utils::postgres_client::{
|
||||
wal_stream_connection_config, ConnectionConfigArgs, PostgresClientProtocol,
|
||||
};
|
||||
use utils::postgres_client::wal_stream_connection_config;
|
||||
use utils::{
|
||||
id::{NodeId, TenantTimelineId},
|
||||
lsn::Lsn,
|
||||
@@ -986,33 +984,15 @@ impl ConnectionManagerState {
|
||||
if info.safekeeper_connstr.is_empty() {
|
||||
return None; // no connection string, ignore sk
|
||||
}
|
||||
|
||||
let (shard_number, shard_count, shard_stripe_size) = match self.conf.protocol {
|
||||
PostgresClientProtocol::Vanilla => {
|
||||
(None, None, None)
|
||||
match wal_stream_connection_config(
|
||||
self.id,
|
||||
info.safekeeper_connstr.as_ref(),
|
||||
match &self.conf.auth_token {
|
||||
None => None,
|
||||
Some(x) => Some(x),
|
||||
},
|
||||
PostgresClientProtocol::Interpreted => {
|
||||
let shard_identity = self.timeline.get_shard_identity();
|
||||
(
|
||||
Some(shard_identity.number.0),
|
||||
Some(shard_identity.count.0),
|
||||
Some(shard_identity.stripe_size.0),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let connection_conf_args = ConnectionConfigArgs {
|
||||
protocol: self.conf.protocol,
|
||||
ttid: self.id,
|
||||
shard_number,
|
||||
shard_count,
|
||||
shard_stripe_size,
|
||||
listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
|
||||
auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
|
||||
availability_zone: self.conf.availability_zone.as_deref()
|
||||
};
|
||||
|
||||
match wal_stream_connection_config(connection_conf_args) {
|
||||
self.conf.availability_zone.as_deref(),
|
||||
) {
|
||||
Ok(connstr) => Some((*sk_id, info, connstr)),
|
||||
Err(e) => {
|
||||
error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
|
||||
@@ -1116,7 +1096,6 @@ impl ReconnectReason {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
|
||||
use pageserver_api::config::defaults::DEFAULT_WAL_RECEIVER_PROTOCOL;
|
||||
use url::Host;
|
||||
|
||||
fn dummy_broker_sk_timeline(
|
||||
@@ -1553,7 +1532,6 @@ mod tests {
|
||||
timeline,
|
||||
cancel: CancellationToken::new(),
|
||||
conf: WalReceiverConf {
|
||||
protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
|
||||
wal_connect_timeout: Duration::from_secs(1),
|
||||
lagging_wal_timeout: Duration::from_secs(1),
|
||||
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
|
||||
|
||||
@@ -36,7 +36,7 @@ use crate::{
|
||||
use postgres_backend::is_expected_io_error;
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use utils::{bin_ser::BeSer, id::NodeId, lsn::Lsn};
|
||||
use utils::{id::NodeId, lsn::Lsn};
|
||||
use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};
|
||||
|
||||
/// Status of the connection.
|
||||
@@ -291,15 +291,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
connection_status.latest_connection_update = now;
|
||||
connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
|
||||
}
|
||||
ReplicationMessage::RawInterpretedWalRecords(raw) => {
|
||||
connection_status.latest_connection_update = now;
|
||||
if !raw.data().is_empty() {
|
||||
connection_status.latest_wal_update = now;
|
||||
}
|
||||
|
||||
connection_status.commit_lsn = Some(Lsn::from(raw.commit_lsn()));
|
||||
connection_status.streaming_lsn = Some(Lsn::from(raw.streaming_lsn()));
|
||||
}
|
||||
&_ => {}
|
||||
};
|
||||
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
|
||||
@@ -307,130 +298,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
async fn commit(
|
||||
modification: &mut DatadirModification<'_>,
|
||||
uncommitted: &mut u64,
|
||||
filtered: &mut u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(*uncommitted - *filtered);
|
||||
modification.commit(ctx).await?;
|
||||
*uncommitted = 0;
|
||||
*filtered = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let status_update = match replication_message {
|
||||
ReplicationMessage::RawInterpretedWalRecords(raw) => {
|
||||
WAL_INGEST.bytes_received.inc_by(raw.data().len() as u64);
|
||||
|
||||
let mut uncommitted_records = 0;
|
||||
let mut filtered_records = 0;
|
||||
|
||||
// This is the end LSN of the raw WAL from which the records
|
||||
// were interpreted.
|
||||
let streaming_lsn = Lsn::from(raw.streaming_lsn());
|
||||
tracing::debug!(
|
||||
"Received WAL up to {streaming_lsn} with next_record_lsn={}",
|
||||
Lsn(raw.next_record_lsn().unwrap_or(0))
|
||||
);
|
||||
|
||||
let records = Vec::<InterpretedWalRecord>::des(raw.data()).with_context(|| {
|
||||
anyhow::anyhow!(
|
||||
"Failed to deserialize interpreted records ending at LSN {streaming_lsn}"
|
||||
)
|
||||
})?;
|
||||
|
||||
// We start the modification at 0 because each interpreted record
|
||||
// advances it to its end LSN. 0 is just an initialization placeholder.
|
||||
let mut modification = timeline.begin_modification(Lsn(0));
|
||||
|
||||
for interpreted in records {
|
||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
|
||||
&& uncommitted_records > 0
|
||||
{
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let next_record_lsn = interpreted.next_record_lsn;
|
||||
let ingested = walingest
|
||||
.ingest_record(interpreted, &mut modification, &ctx)
|
||||
.await
|
||||
.with_context(|| format!("could not ingest record at {next_record_lsn}"))?;
|
||||
|
||||
if !ingested {
|
||||
tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
|
||||
WAL_INGEST.records_filtered.inc();
|
||||
filtered_records += 1;
|
||||
}
|
||||
|
||||
uncommitted_records += 1;
|
||||
|
||||
// FIXME: this cannot be made pausable_failpoint without fixing the
|
||||
// failpoint library; in tests, the added amount of debugging will cause us
|
||||
// to timeout the tests.
|
||||
fail_point!("walreceiver-after-ingest");
|
||||
|
||||
// Commit every ingest_batch_size records. Even if we filtered out
|
||||
// all records, we still need to call commit to advance the LSN.
|
||||
if uncommitted_records >= ingest_batch_size
|
||||
|| modification.approx_pending_bytes()
|
||||
> DatadirModification::MAX_PENDING_BYTES
|
||||
{
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Records might have been filtered out on the safekeeper side, but we still
|
||||
// need to advance last record LSN on all shards. If we've not ingested the latest
|
||||
// record, then set the LSN of the modification past it. This way all shards
|
||||
// advance their last record LSN at the same time.
|
||||
let needs_last_record_lsn_advance = match raw.next_record_lsn().map(Lsn::from) {
|
||||
Some(lsn) if lsn > modification.get_lsn() => {
|
||||
modification.set_lsn(lsn).unwrap();
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if uncommitted_records > 0 || needs_last_record_lsn_advance {
|
||||
// Commit any uncommitted records
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if !caught_up && streaming_lsn >= end_of_wal {
|
||||
info!("caught up at LSN {streaming_lsn}");
|
||||
caught_up = true;
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
"Ingested WAL up to {streaming_lsn}. Last record LSN is {}",
|
||||
timeline.get_last_record_lsn()
|
||||
);
|
||||
|
||||
Some(streaming_lsn)
|
||||
}
|
||||
|
||||
ReplicationMessage::XLogData(xlog_data) => {
|
||||
// Pass the WAL data to the decoder, and see if we can decode
|
||||
// more records as a result.
|
||||
@@ -448,6 +316,21 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
let mut uncommitted_records = 0;
|
||||
let mut filtered_records = 0;
|
||||
|
||||
async fn commit(
|
||||
modification: &mut DatadirModification<'_>,
|
||||
uncommitted: &mut u64,
|
||||
filtered: &mut u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(*uncommitted - *filtered);
|
||||
modification.commit(ctx).await?;
|
||||
*uncommitted = 0;
|
||||
*filtered = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||
// aligned and can be several bytes bigger. Without this alignment we are
|
||||
|
||||
@@ -175,16 +175,10 @@ impl VirtualFile {
|
||||
}
|
||||
|
||||
pub async fn sync_all(&self) -> Result<(), Error> {
|
||||
if SYNC_MODE.load(std::sync::atomic::Ordering::Relaxed) == SyncMode::UnsafeNoSync as u8 {
|
||||
return Ok(());
|
||||
}
|
||||
self.inner.sync_all().await
|
||||
}
|
||||
|
||||
pub async fn sync_data(&self) -> Result<(), Error> {
|
||||
if SYNC_MODE.load(std::sync::atomic::Ordering::Relaxed) == SyncMode::UnsafeNoSync as u8 {
|
||||
return Ok(());
|
||||
}
|
||||
self.inner.sync_data().await
|
||||
}
|
||||
|
||||
@@ -239,27 +233,6 @@ impl VirtualFile {
|
||||
}
|
||||
}
|
||||
|
||||
/// Indicates whether to enable fsync, fdatasync, or O_SYNC/O_DSYNC when writing
|
||||
/// files. Switching this off is unsafe and only used for testing on machines
|
||||
/// with slow drives.
|
||||
#[repr(u8)]
|
||||
pub enum SyncMode {
|
||||
Sync,
|
||||
UnsafeNoSync,
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for SyncMode {
|
||||
type Error = u8;
|
||||
|
||||
fn try_from(value: u8) -> Result<Self, Self::Error> {
|
||||
Ok(match value {
|
||||
v if v == (SyncMode::Sync as u8) => SyncMode::Sync,
|
||||
v if v == (SyncMode::UnsafeNoSync as u8) => SyncMode::UnsafeNoSync,
|
||||
x => return Err(x),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
|
||||
/// the underlying file is closed if the system is low on file descriptors,
|
||||
@@ -1359,13 +1332,12 @@ impl OpenFiles {
|
||||
/// server startup.
|
||||
///
|
||||
#[cfg(not(test))]
|
||||
pub fn init(num_slots: usize, engine: IoEngineKind, mode: IoMode, sync_mode: SyncMode) {
|
||||
pub fn init(num_slots: usize, engine: IoEngineKind, mode: IoMode) {
|
||||
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
|
||||
panic!("virtual_file::init called twice");
|
||||
}
|
||||
set_io_mode(mode);
|
||||
io_engine::init(engine);
|
||||
SYNC_MODE.store(sync_mode as u8, std::sync::atomic::Ordering::Relaxed);
|
||||
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
|
||||
}
|
||||
|
||||
@@ -1407,9 +1379,6 @@ pub(crate) fn set_io_mode(mode: IoMode) {
|
||||
pub(crate) fn get_io_mode() -> IoMode {
|
||||
IoMode::try_from(IO_MODE.load(Ordering::Relaxed)).unwrap()
|
||||
}
|
||||
|
||||
static SYNC_MODE: AtomicU8 = AtomicU8::new(SyncMode::Sync as u8);
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::context::DownloadBehavior;
|
||||
|
||||
@@ -28,7 +28,6 @@ hyper0.workspace = true
|
||||
futures.workspace = true
|
||||
once_cell.workspace = true
|
||||
parking_lot.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
postgres.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
rand.workspace = true
|
||||
@@ -58,7 +57,6 @@ sd-notify.workspace = true
|
||||
storage_broker.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
utils.workspace = true
|
||||
wal_decoder.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
|
||||
@@ -2,15 +2,11 @@
|
||||
//! protocol commands.
|
||||
|
||||
use anyhow::Context;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
|
||||
use std::future::Future;
|
||||
use std::str::{self, FromStr};
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{debug, info, info_span, Instrument};
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
use utils::shard::{ShardCount, ShardNumber};
|
||||
|
||||
use crate::auth::check_permission;
|
||||
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
|
||||
@@ -39,8 +35,6 @@ pub struct SafekeeperPostgresHandler {
|
||||
pub tenant_id: Option<TenantId>,
|
||||
pub timeline_id: Option<TimelineId>,
|
||||
pub ttid: TenantTimelineId,
|
||||
pub shard: Option<ShardIdentity>,
|
||||
pub protocol: Option<PostgresClientProtocol>,
|
||||
/// Unique connection id is logged in spans for observability.
|
||||
pub conn_id: ConnectionId,
|
||||
/// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured.
|
||||
@@ -113,28 +107,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
) -> Result<(), QueryError> {
|
||||
if let FeStartupPacket::StartupMessage { params, .. } = sm {
|
||||
if let Some(options) = params.options_raw() {
|
||||
let mut shard_count: Option<u8> = None;
|
||||
let mut shard_number: Option<u8> = None;
|
||||
let mut shard_stripe_size: Option<u32> = None;
|
||||
|
||||
for opt in options {
|
||||
// FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy,
|
||||
// remove these after the PR gets deployed:
|
||||
// https://github.com/neondatabase/neon/pull/2433#discussion_r970005064
|
||||
match opt.split_once('=') {
|
||||
Some(("protocol", value)) => {
|
||||
let raw_value = value
|
||||
.parse::<u8>()
|
||||
.with_context(|| format!("Failed to parse {value} as protocol"))?;
|
||||
|
||||
self.protocol = Some(
|
||||
PostgresClientProtocol::try_from(raw_value).map_err(|_| {
|
||||
QueryError::Other(anyhow::anyhow!(
|
||||
"Unexpected client protocol type: {raw_value}"
|
||||
))
|
||||
})?,
|
||||
);
|
||||
}
|
||||
Some(("ztenantid", value)) | Some(("tenant_id", value)) => {
|
||||
self.tenant_id = Some(value.parse().with_context(|| {
|
||||
format!("Failed to parse {value} as tenant id")
|
||||
@@ -150,54 +127,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
metrics.set_client_az(client_az)
|
||||
}
|
||||
}
|
||||
Some(("shard_count", value)) => {
|
||||
shard_count = Some(value.parse::<u8>().with_context(|| {
|
||||
format!("Failed to parse {value} as shard count")
|
||||
})?);
|
||||
}
|
||||
Some(("shard_number", value)) => {
|
||||
shard_number = Some(value.parse::<u8>().with_context(|| {
|
||||
format!("Failed to parse {value} as shard number")
|
||||
})?);
|
||||
}
|
||||
Some(("shard_stripe_size", value)) => {
|
||||
shard_stripe_size = Some(value.parse::<u32>().with_context(|| {
|
||||
format!("Failed to parse {value} as shard stripe size")
|
||||
})?);
|
||||
}
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
|
||||
match self.protocol() {
|
||||
PostgresClientProtocol::Vanilla => {
|
||||
if shard_count.is_some()
|
||||
|| shard_number.is_some()
|
||||
|| shard_stripe_size.is_some()
|
||||
{
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"Shard params specified for vanilla protocol"
|
||||
)));
|
||||
}
|
||||
}
|
||||
PostgresClientProtocol::Interpreted => {
|
||||
match (shard_count, shard_number, shard_stripe_size) {
|
||||
(Some(count), Some(number), Some(stripe_size)) => {
|
||||
let params = ShardParameters {
|
||||
count: ShardCount(count),
|
||||
stripe_size: ShardStripeSize(stripe_size),
|
||||
};
|
||||
self.shard =
|
||||
Some(ShardIdentity::from_params(ShardNumber(number), ¶ms));
|
||||
}
|
||||
_ => {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"Shard params were not specified"
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(app_name) = params.get("application_name") {
|
||||
@@ -218,11 +150,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
tracing::field::debug(self.appname.clone()),
|
||||
);
|
||||
|
||||
if let Some(shard) = self.shard.as_ref() {
|
||||
tracing::Span::current()
|
||||
.record("shard", tracing::field::display(shard.shard_slug()));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
Err(QueryError::Other(anyhow::anyhow!(
|
||||
@@ -331,8 +258,6 @@ impl SafekeeperPostgresHandler {
|
||||
tenant_id: None,
|
||||
timeline_id: None,
|
||||
ttid: TenantTimelineId::empty(),
|
||||
shard: None,
|
||||
protocol: None,
|
||||
conn_id,
|
||||
claims: None,
|
||||
auth,
|
||||
@@ -340,10 +265,6 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn protocol(&self) -> PostgresClientProtocol {
|
||||
self.protocol.unwrap_or(PostgresClientProtocol::Vanilla)
|
||||
}
|
||||
|
||||
// when accessing management api supply None as an argument
|
||||
// when using to authorize tenant pass corresponding tenant id
|
||||
fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
|
||||
|
||||
@@ -29,7 +29,6 @@ pub mod receive_wal;
|
||||
pub mod recovery;
|
||||
pub mod remove_wal;
|
||||
pub mod safekeeper;
|
||||
pub mod send_interpreted_wal;
|
||||
pub mod send_wal;
|
||||
pub mod state;
|
||||
pub mod timeline;
|
||||
@@ -39,7 +38,6 @@ pub mod timeline_manager;
|
||||
pub mod timelines_set;
|
||||
pub mod wal_backup;
|
||||
pub mod wal_backup_partial;
|
||||
pub mod wal_reader_stream;
|
||||
pub mod wal_service;
|
||||
pub mod wal_storage;
|
||||
|
||||
|
||||
@@ -17,7 +17,6 @@ use tokio::{
|
||||
use tokio_postgres::replication::ReplicationStream;
|
||||
use tokio_postgres::types::PgLsn;
|
||||
use tracing::*;
|
||||
use utils::postgres_client::{ConnectionConfigArgs, PostgresClientProtocol};
|
||||
use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config};
|
||||
|
||||
use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE};
|
||||
@@ -326,17 +325,7 @@ async fn recovery_stream(
|
||||
conf: &SafeKeeperConf,
|
||||
) -> anyhow::Result<String> {
|
||||
// TODO: pass auth token
|
||||
let connection_conf_args = ConnectionConfigArgs {
|
||||
protocol: PostgresClientProtocol::Vanilla,
|
||||
ttid: tli.ttid,
|
||||
shard_number: None,
|
||||
shard_count: None,
|
||||
shard_stripe_size: None,
|
||||
listen_pg_addr_str: &donor.pg_connstr,
|
||||
auth_token: None,
|
||||
availability_zone: None,
|
||||
};
|
||||
let cfg = wal_stream_connection_config(connection_conf_args)?;
|
||||
let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?;
|
||||
let mut cfg = cfg.to_tokio_postgres_config();
|
||||
// It will make safekeeper give out not committed WAL (up to flush_lsn).
|
||||
cfg.application_name(&format!("safekeeper_{}", conf.my_id));
|
||||
|
||||
@@ -603,7 +603,10 @@ where
|
||||
|
||||
/// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
|
||||
pub fn flush_lsn(&self) -> Lsn {
|
||||
max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
|
||||
max(
|
||||
self.wal_store.flush_record_lsn(),
|
||||
self.state.timeline_start_lsn,
|
||||
)
|
||||
}
|
||||
|
||||
/// Process message from proposer and possibly form reply. Concurrent
|
||||
@@ -828,7 +831,7 @@ where
|
||||
//
|
||||
// If we fail before first WAL write flush this action would be
|
||||
// repeated, that's ok because it is idempotent.
|
||||
if self.wal_store.flush_lsn() == Lsn::INVALID {
|
||||
if self.wal_store.flush_record_lsn() == Lsn::INVALID {
|
||||
self.wal_store
|
||||
.initialize_first_segment(msg.start_streaming_at)
|
||||
.await?;
|
||||
@@ -947,7 +950,7 @@ where
|
||||
// while first connection still gets some packets later. It might be
|
||||
// better to not log this as error! above.
|
||||
let write_lsn = self.wal_store.write_lsn();
|
||||
let flush_lsn = self.wal_store.flush_lsn();
|
||||
let flush_lsn = self.wal_store.flush_record_lsn();
|
||||
if write_lsn > msg.h.begin_lsn {
|
||||
bail!(
|
||||
"append request rewrites WAL written before, write_lsn={}, msg lsn={}",
|
||||
@@ -1087,7 +1090,7 @@ mod tests {
|
||||
self.lsn
|
||||
}
|
||||
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
fn flush_record_lsn(&self) -> Lsn {
|
||||
self.lsn
|
||||
}
|
||||
|
||||
|
||||
@@ -1,121 +0,0 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
|
||||
use postgres_ffi::MAX_SEND_SIZE;
|
||||
use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
|
||||
use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::time::MissedTickBehavior;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::lsn::Lsn;
|
||||
use wal_decoder::models::InterpretedWalRecord;
|
||||
|
||||
use crate::send_wal::EndWatchView;
|
||||
use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder};
|
||||
|
||||
/// Shard-aware interpreted record sender.
|
||||
/// This is used for sending WAL to the pageserver. Said WAL
|
||||
/// is pre-interpreted and filtered for the shard.
|
||||
pub(crate) struct InterpretedWalSender<'a, IO> {
|
||||
pub(crate) pgb: &'a mut PostgresBackend<IO>,
|
||||
pub(crate) wal_stream_builder: WalReaderStreamBuilder,
|
||||
pub(crate) end_watch_view: EndWatchView,
|
||||
pub(crate) shard: ShardIdentity,
|
||||
pub(crate) pg_version: u32,
|
||||
pub(crate) appname: Option<String>,
|
||||
}
|
||||
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
|
||||
/// Send interpreted WAL to a receiver.
|
||||
/// Stops when an error occurs or the receiver is caught up and there's no active compute.
|
||||
///
|
||||
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
|
||||
/// convenience.
|
||||
pub(crate) async fn run(self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
let mut wal_position = self.wal_stream_builder.start_pos();
|
||||
let mut wal_decoder =
|
||||
WalStreamDecoder::new(self.wal_stream_builder.start_pos(), self.pg_version);
|
||||
|
||||
let stream = self.wal_stream_builder.build(MAX_SEND_SIZE).await?;
|
||||
let mut stream = std::pin::pin!(stream);
|
||||
|
||||
let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
|
||||
keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||
keepalive_ticker.reset();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Get some WAL from the stream and then: decode, interpret and send it
|
||||
wal = stream.next() => {
|
||||
let WalBytes { wal, wal_start_lsn, wal_end_lsn, commit_lsn } = match wal {
|
||||
Some(some) => some?,
|
||||
None => { break; }
|
||||
};
|
||||
|
||||
wal_position = wal_start_lsn;
|
||||
wal_decoder.feed_bytes(&wal);
|
||||
|
||||
let mut records = Vec::new();
|
||||
let mut max_next_record_lsn = None;
|
||||
while let Some((next_record_lsn, recdata)) = wal_decoder
|
||||
.poll_decode()
|
||||
.with_context(|| "Failed to decode WAL")?
|
||||
{
|
||||
assert!(next_record_lsn.is_aligned());
|
||||
max_next_record_lsn = Some(next_record_lsn);
|
||||
|
||||
// Deserialize and interpret WAL record
|
||||
let interpreted = InterpretedWalRecord::from_bytes_filtered(
|
||||
recdata,
|
||||
&self.shard,
|
||||
next_record_lsn,
|
||||
self.pg_version,
|
||||
)
|
||||
.with_context(|| "Failed to interpret WAL")?;
|
||||
|
||||
if !interpreted.is_empty() {
|
||||
records.push(interpreted);
|
||||
}
|
||||
}
|
||||
|
||||
let mut buf = Vec::new();
|
||||
records
|
||||
.ser_into(&mut buf)
|
||||
.with_context(|| "Failed to serialize interpreted WAL")?;
|
||||
|
||||
// Reset the keep alive ticker since we are sending something
|
||||
// over the wire now.
|
||||
keepalive_ticker.reset();
|
||||
|
||||
self.pgb
|
||||
.write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
|
||||
streaming_lsn: wal_end_lsn.0,
|
||||
commit_lsn: commit_lsn.0,
|
||||
next_record_lsn: max_next_record_lsn.unwrap_or(Lsn::INVALID).0,
|
||||
data: buf.as_slice(),
|
||||
})).await?;
|
||||
}
|
||||
|
||||
// Send a periodic keep alive when the connection has been idle for a while.
|
||||
_ = keepalive_ticker.tick() => {
|
||||
self.pgb
|
||||
.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
|
||||
wal_end: self.end_watch_view.get().0,
|
||||
timestamp: get_current_timestamp(),
|
||||
request_reply: true,
|
||||
}))
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The loop above ends when the receiver is caught up and there's no more WAL to send.
|
||||
Err(CopyStreamHandlerEnd::ServerInitiated(format!(
|
||||
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
|
||||
self.appname, wal_position,
|
||||
)))
|
||||
}
|
||||
}
|
||||
@@ -5,15 +5,12 @@ use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::metrics::RECEIVED_PS_FEEDBACKS;
|
||||
use crate::receive_wal::WalReceivers;
|
||||
use crate::safekeeper::{Term, TermLsn};
|
||||
use crate::send_interpreted_wal::InterpretedWalSender;
|
||||
use crate::timeline::WalResidentTimeline;
|
||||
use crate::wal_reader_stream::WalReaderStreamBuilder;
|
||||
use crate::wal_service::ConnectionId;
|
||||
use crate::wal_storage::WalReader;
|
||||
use crate::GlobalTimelines;
|
||||
use anyhow::{bail, Context as AnyhowContext};
|
||||
use bytes::Bytes;
|
||||
use futures::future::Either;
|
||||
use parking_lot::Mutex;
|
||||
use postgres_backend::PostgresBackend;
|
||||
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
|
||||
@@ -25,7 +22,6 @@ use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use utils::failpoint_support;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
|
||||
use std::cmp::{max, min};
|
||||
use std::net::SocketAddr;
|
||||
@@ -230,7 +226,7 @@ impl WalSenders {
|
||||
|
||||
/// Get remote_consistent_lsn reported by the pageserver. Returns None if
|
||||
/// client is not pageserver.
|
||||
pub fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
|
||||
fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
|
||||
let shared = self.mutex.lock();
|
||||
let slot = shared.get_slot(id);
|
||||
match slot.feedback {
|
||||
@@ -374,16 +370,6 @@ pub struct WalSenderGuard {
|
||||
walsenders: Arc<WalSenders>,
|
||||
}
|
||||
|
||||
impl WalSenderGuard {
|
||||
pub fn id(&self) -> WalSenderId {
|
||||
self.id
|
||||
}
|
||||
|
||||
pub fn walsenders(&self) -> &Arc<WalSenders> {
|
||||
&self.walsenders
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WalSenderGuard {
|
||||
fn drop(&mut self) {
|
||||
self.walsenders.unregister(self.id);
|
||||
@@ -454,12 +440,11 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
|
||||
info!(
|
||||
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={}",
|
||||
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
|
||||
start_pos,
|
||||
end_pos,
|
||||
matches!(end_watch, EndWatch::Flush(_)),
|
||||
appname,
|
||||
self.protocol(),
|
||||
appname
|
||||
);
|
||||
|
||||
// switch to copy
|
||||
@@ -471,49 +456,19 @@ impl SafekeeperPostgresHandler {
|
||||
// not synchronized with sends, so this avoids deadlocks.
|
||||
let reader = pgb.split().context("START_REPLICATION split")?;
|
||||
|
||||
let send_fut = match self.protocol() {
|
||||
PostgresClientProtocol::Vanilla => {
|
||||
let sender = WalSender {
|
||||
pgb,
|
||||
// should succeed since we're already holding another guard
|
||||
tli: tli.wal_residence_guard().await?,
|
||||
appname,
|
||||
start_pos,
|
||||
end_pos,
|
||||
term,
|
||||
end_watch,
|
||||
ws_guard: ws_guard.clone(),
|
||||
wal_reader,
|
||||
send_buf: vec![0u8; MAX_SEND_SIZE],
|
||||
};
|
||||
|
||||
Either::Left(sender.run())
|
||||
}
|
||||
PostgresClientProtocol::Interpreted => {
|
||||
let pg_version = tli.tli.get_state().await.1.server.pg_version / 10000;
|
||||
let end_watch_view = end_watch.view();
|
||||
let wal_stream_builder = WalReaderStreamBuilder {
|
||||
tli: tli.wal_residence_guard().await?,
|
||||
start_pos,
|
||||
end_pos,
|
||||
term,
|
||||
end_watch,
|
||||
wal_sender_guard: ws_guard.clone(),
|
||||
};
|
||||
|
||||
let sender = InterpretedWalSender {
|
||||
pgb,
|
||||
wal_stream_builder,
|
||||
end_watch_view,
|
||||
shard: self.shard.unwrap(),
|
||||
pg_version,
|
||||
appname,
|
||||
};
|
||||
|
||||
Either::Right(sender.run())
|
||||
}
|
||||
let mut sender = WalSender {
|
||||
pgb,
|
||||
// should succeed since we're already holding another guard
|
||||
tli: tli.wal_residence_guard().await?,
|
||||
appname,
|
||||
start_pos,
|
||||
end_pos,
|
||||
term,
|
||||
end_watch,
|
||||
ws_guard: ws_guard.clone(),
|
||||
wal_reader,
|
||||
send_buf: vec![0u8; MAX_SEND_SIZE],
|
||||
};
|
||||
|
||||
let mut reply_reader = ReplyReader {
|
||||
reader,
|
||||
ws_guard: ws_guard.clone(),
|
||||
@@ -522,7 +477,7 @@ impl SafekeeperPostgresHandler {
|
||||
|
||||
let res = tokio::select! {
|
||||
// todo: add read|write .context to these errors
|
||||
r = send_fut => r,
|
||||
r = sender.run() => r,
|
||||
r = reply_reader.run() => r,
|
||||
};
|
||||
|
||||
@@ -544,22 +499,16 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/// TODO(vlad): maybe lift this instead
|
||||
/// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
|
||||
/// given term (recovery by walproposer or peer safekeeper).
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum EndWatch {
|
||||
enum EndWatch {
|
||||
Commit(Receiver<Lsn>),
|
||||
Flush(Receiver<TermLsn>),
|
||||
}
|
||||
|
||||
impl EndWatch {
|
||||
pub(crate) fn view(&self) -> EndWatchView {
|
||||
EndWatchView(self.clone())
|
||||
}
|
||||
|
||||
/// Get current end of WAL.
|
||||
pub(crate) fn get(&self) -> Lsn {
|
||||
fn get(&self) -> Lsn {
|
||||
match self {
|
||||
EndWatch::Commit(r) => *r.borrow(),
|
||||
EndWatch::Flush(r) => r.borrow().lsn,
|
||||
@@ -567,44 +516,15 @@ impl EndWatch {
|
||||
}
|
||||
|
||||
/// Wait for the update.
|
||||
pub(crate) async fn changed(&mut self) -> anyhow::Result<()> {
|
||||
async fn changed(&mut self) -> anyhow::Result<()> {
|
||||
match self {
|
||||
EndWatch::Commit(r) => r.changed().await?,
|
||||
EndWatch::Flush(r) => r.changed().await?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_for_lsn(
|
||||
&mut self,
|
||||
lsn: Lsn,
|
||||
client_term: Option<Term>,
|
||||
) -> anyhow::Result<Lsn> {
|
||||
loop {
|
||||
let end_pos = self.get();
|
||||
if end_pos > lsn {
|
||||
return Ok(end_pos);
|
||||
}
|
||||
if let EndWatch::Flush(rx) = &self {
|
||||
let curr_term = rx.borrow().term;
|
||||
if let Some(client_term) = client_term {
|
||||
if curr_term != client_term {
|
||||
bail!("term changed: requested {}, now {}", client_term, curr_term);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.changed().await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct EndWatchView(EndWatch);
|
||||
|
||||
impl EndWatchView {
|
||||
pub(crate) fn get(&self) -> Lsn {
|
||||
self.0.get()
|
||||
}
|
||||
}
|
||||
/// A half driving sending WAL.
|
||||
struct WalSender<'a, IO> {
|
||||
pgb: &'a mut PostgresBackend<IO>,
|
||||
@@ -640,7 +560,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
///
|
||||
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
|
||||
/// convenience.
|
||||
async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
loop {
|
||||
// Wait for the next portion if it is not there yet, or just
|
||||
// update our end of WAL available for sending value, we
|
||||
|
||||
@@ -176,7 +176,7 @@ pub enum StateSK {
|
||||
impl StateSK {
|
||||
pub fn flush_lsn(&self) -> Lsn {
|
||||
match self {
|
||||
StateSK::Loaded(sk) => sk.wal_store.flush_lsn(),
|
||||
StateSK::Loaded(sk) => sk.wal_store.flush_record_lsn(),
|
||||
StateSK::Offloaded(state) => match state.eviction_state {
|
||||
EvictionState::Offloaded(flush_lsn) => flush_lsn,
|
||||
_ => panic!("StateSK::Offloaded mismatches with eviction_state from control_file"),
|
||||
@@ -1108,11 +1108,11 @@ impl ManagerTimeline {
|
||||
);
|
||||
}
|
||||
|
||||
if wal_store.flush_lsn() != shared.sk.flush_lsn() {
|
||||
if wal_store.flush_record_lsn() != shared.sk.flush_lsn() {
|
||||
bail!(
|
||||
"flush_lsn mismatch in restored WAL, expected {}, got {}",
|
||||
shared.sk.flush_lsn(),
|
||||
wal_store.flush_lsn()
|
||||
wal_store.flush_record_lsn()
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -599,7 +599,7 @@ pub async fn validate_temp_timeline(
|
||||
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
|
||||
|
||||
let commit_lsn = control_store.commit_lsn;
|
||||
let flush_lsn = wal_store.flush_lsn();
|
||||
let flush_lsn = wal_store.flush_record_lsn();
|
||||
|
||||
Ok((commit_lsn, flush_lsn))
|
||||
}
|
||||
|
||||
@@ -1,145 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_stream::try_stream;
|
||||
use bytes::Bytes;
|
||||
use futures::Stream;
|
||||
use postgres_backend::CopyStreamHandlerEnd;
|
||||
use std::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
safekeeper::Term,
|
||||
send_wal::{EndWatch, WalSenderGuard},
|
||||
timeline::WalResidentTimeline,
|
||||
};
|
||||
|
||||
pub(crate) struct WalReaderStreamBuilder {
|
||||
pub(crate) tli: WalResidentTimeline,
|
||||
pub(crate) start_pos: Lsn,
|
||||
pub(crate) end_pos: Lsn,
|
||||
pub(crate) term: Option<Term>,
|
||||
pub(crate) end_watch: EndWatch,
|
||||
pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
|
||||
}
|
||||
|
||||
impl WalReaderStreamBuilder {
|
||||
pub(crate) fn start_pos(&self) -> Lsn {
|
||||
self.start_pos
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct WalBytes {
|
||||
/// Raw PG WAL
|
||||
pub(crate) wal: Bytes,
|
||||
/// Start LSN of [`Self::wal`]
|
||||
pub(crate) wal_start_lsn: Lsn,
|
||||
/// End LSN of [`Self::wal`]
|
||||
pub(crate) wal_end_lsn: Lsn,
|
||||
/// End LSN of WAL available on the safekeeper
|
||||
pub(crate) commit_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl WalReaderStreamBuilder {
|
||||
/// Builds a stream of Postgres WAL starting from [`Self::start_pos`].
|
||||
/// The stream terminates when the receiver (pageserver) is fully caught up
|
||||
/// and there's no active computes.
|
||||
pub(crate) async fn build(
|
||||
self,
|
||||
buffer_size: usize,
|
||||
) -> anyhow::Result<impl Stream<Item = Result<WalBytes, CopyStreamHandlerEnd>>> {
|
||||
// TODO(vlad): The code below duplicates functionality from [`crate::send_wal`].
|
||||
// We can make the raw WAL sender use this stream too and remove the duplication.
|
||||
let Self {
|
||||
tli,
|
||||
mut start_pos,
|
||||
mut end_pos,
|
||||
term,
|
||||
mut end_watch,
|
||||
wal_sender_guard,
|
||||
} = self;
|
||||
let mut wal_reader = tli.get_walreader(start_pos).await?;
|
||||
let mut buffer = vec![0; buffer_size];
|
||||
|
||||
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
|
||||
Ok(try_stream! {
|
||||
loop {
|
||||
let have_something_to_send = end_pos > start_pos;
|
||||
|
||||
if !have_something_to_send {
|
||||
// wait for lsn
|
||||
let res = timeout(POLL_STATE_TIMEOUT, end_watch.wait_for_lsn(start_pos, term)).await;
|
||||
match res {
|
||||
Ok(ok) => {
|
||||
end_pos = ok?;
|
||||
},
|
||||
Err(_) => {
|
||||
if let EndWatch::Commit(_) = end_watch {
|
||||
if let Some(remote_consistent_lsn) = wal_sender_guard
|
||||
.walsenders()
|
||||
.get_ws_remote_consistent_lsn(wal_sender_guard.id())
|
||||
{
|
||||
if tli.should_walsender_stop(remote_consistent_lsn).await {
|
||||
// Terminate if there is nothing more to send.
|
||||
// Note that "ending streaming" part of the string is used by
|
||||
// pageserver to identify WalReceiverError::SuccessfulCompletion,
|
||||
// do not change this string without updating pageserver.
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
assert!(
|
||||
end_pos > start_pos,
|
||||
"nothing to send after waiting for WAL"
|
||||
);
|
||||
|
||||
// try to send as much as available, capped by the buffer size
|
||||
let mut chunk_end_pos = start_pos + buffer_size as u64;
|
||||
// if we went behind available WAL, back off
|
||||
if chunk_end_pos >= end_pos {
|
||||
chunk_end_pos = end_pos;
|
||||
} else {
|
||||
// If sending not up to end pos, round down to page boundary to
|
||||
// avoid breaking WAL record not at page boundary, as protocol
|
||||
// demands. See walsender.c (XLogSendPhysical).
|
||||
chunk_end_pos = chunk_end_pos
|
||||
.checked_sub(chunk_end_pos.block_offset())
|
||||
.unwrap();
|
||||
}
|
||||
let send_size = (chunk_end_pos.0 - start_pos.0) as usize;
|
||||
let buffer = &mut buffer[..send_size];
|
||||
let send_size: usize;
|
||||
{
|
||||
// If uncommitted part is being pulled, check that the term is
|
||||
// still the expected one.
|
||||
let _term_guard = if let Some(t) = term {
|
||||
Some(tli.acquire_term(t).await?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Read WAL into buffer. send_size can be additionally capped to
|
||||
// segment boundary here.
|
||||
send_size = wal_reader.read(buffer).await?
|
||||
};
|
||||
let wal = Bytes::copy_from_slice(&buffer[..send_size]);
|
||||
|
||||
yield WalBytes {
|
||||
wal,
|
||||
wal_start_lsn: start_pos,
|
||||
wal_end_lsn: start_pos + send_size as u64,
|
||||
commit_lsn: end_pos
|
||||
};
|
||||
|
||||
start_pos += send_size as u64;
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -35,10 +35,10 @@ use pq_proto::SystemId;
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
|
||||
pub trait Storage {
|
||||
// Last written LSN.
|
||||
/// Last written LSN.
|
||||
fn write_lsn(&self) -> Lsn;
|
||||
/// LSN of last durably stored WAL record.
|
||||
fn flush_lsn(&self) -> Lsn;
|
||||
/// End LSN of last durably stored WAL record.
|
||||
fn flush_record_lsn(&self) -> Lsn;
|
||||
|
||||
/// Initialize segment by creating proper long header at the beginning of
|
||||
/// the segment and short header at the page of given LSN. This is only used
|
||||
@@ -116,11 +116,13 @@ pub struct PhysicalStorage {
|
||||
/// The last LSN flushed to disk. May be in the middle of a record.
|
||||
///
|
||||
/// NB: when the rest of the system refers to `flush_lsn`, it usually
|
||||
/// actually refers to `flush_record_lsn`. This ambiguity can be dangerous
|
||||
/// and should be resolved.
|
||||
/// means `flush_record_lsn`. This `flush_lsn` is only used internally.
|
||||
flush_lsn: Lsn,
|
||||
|
||||
/// The LSN of the last WAL record flushed to disk.
|
||||
///
|
||||
/// NB: when the rest of the system refers to `flush_lsn`, it usually
|
||||
/// means `flush_record_lsn`.
|
||||
flush_record_lsn: Lsn,
|
||||
|
||||
/// Decoder is required for detecting boundaries of WAL records.
|
||||
@@ -387,11 +389,8 @@ impl Storage for PhysicalStorage {
|
||||
fn write_lsn(&self) -> Lsn {
|
||||
self.write_lsn
|
||||
}
|
||||
/// flush_lsn returns LSN of last durably stored WAL record.
|
||||
///
|
||||
/// TODO: flush_lsn() returns flush_record_lsn, but write_lsn() returns write_lsn: confusing.
|
||||
#[allow(clippy::misnamed_getters)]
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
/// End LSN of the last durably stored WAL record.
|
||||
fn flush_record_lsn(&self) -> Lsn {
|
||||
self.flush_record_lsn
|
||||
}
|
||||
|
||||
|
||||
@@ -178,7 +178,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
let mut conns: HashMap<usize, ConnState> = HashMap::new();
|
||||
|
||||
for (&_ttid, shared_state) in global.timelines.iter_mut() {
|
||||
let flush_lsn = shared_state.sk.wal_store.flush_lsn();
|
||||
let flush_lsn = shared_state.sk.wal_store.flush_record_lsn();
|
||||
let commit_lsn = shared_state.sk.state.commit_lsn;
|
||||
os.log_event(format!("tli_loaded;{};{}", flush_lsn.0, commit_lsn.0));
|
||||
}
|
||||
|
||||
@@ -180,7 +180,7 @@ impl wal_storage::Storage for DiskWALStorage {
|
||||
self.write_lsn
|
||||
}
|
||||
/// LSN of last durably stored WAL record.
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
fn flush_record_lsn(&self) -> Lsn {
|
||||
self.flush_record_lsn
|
||||
}
|
||||
|
||||
|
||||
@@ -15,21 +15,16 @@ from fixtures.neon_fixtures import (
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
@pytest.mark.parametrize("shard_count", [1, 8, 32])
|
||||
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
|
||||
def test_sharded_ingest(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
shard_count: int,
|
||||
wal_receiver_protocol: str,
|
||||
):
|
||||
"""
|
||||
Benchmarks sharded ingestion throughput, by ingesting a large amount of WAL into a Safekeeper
|
||||
and fanning out to a large number of shards on dedicated Pageservers. Comparing the base case
|
||||
(shard_count=1) to the sharded case indicates the overhead of sharding.
|
||||
"""
|
||||
neon_env_builder.pageserver_config_override = (
|
||||
f"wal_receiver_protocol = '{wal_receiver_protocol}'"
|
||||
)
|
||||
|
||||
ROW_COUNT = 100_000_000 # about 7 GB of WAL
|
||||
|
||||
@@ -55,6 +50,7 @@ def test_sharded_ingest(
|
||||
# Start the endpoint.
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
|
||||
# Ingest data and measure WAL volume and duration.
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
@@ -72,48 +68,4 @@ def test_sharded_ingest(
|
||||
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
|
||||
zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
|
||||
|
||||
total_ingested = 0
|
||||
total_records_received = 0
|
||||
ingested_by_ps = []
|
||||
for pageserver in env.pageservers:
|
||||
ingested = pageserver.http_client().get_metric_value(
|
||||
"pageserver_wal_ingest_bytes_received_total"
|
||||
)
|
||||
records_received = pageserver.http_client().get_metric_value(
|
||||
"pageserver_wal_ingest_records_received_total"
|
||||
)
|
||||
|
||||
if ingested is None:
|
||||
ingested = 0
|
||||
|
||||
if records_received is None:
|
||||
records_received = 0
|
||||
|
||||
ingested_by_ps.append(
|
||||
(
|
||||
pageserver.id,
|
||||
{
|
||||
"ingested": ingested,
|
||||
"records_received": records_received,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
total_ingested += int(ingested)
|
||||
total_records_received += int(records_received)
|
||||
|
||||
total_ingested_mb = total_ingested / (1024 * 1024)
|
||||
zenbenchmark.record("wal_ingested", total_ingested_mb, "MB", MetricReport.LOWER_IS_BETTER)
|
||||
zenbenchmark.record(
|
||||
"records_received", total_records_received, "records", MetricReport.LOWER_IS_BETTER
|
||||
)
|
||||
|
||||
ingested_by_ps.sort(key=lambda x: x[0])
|
||||
for _, stats in ingested_by_ps:
|
||||
for k in stats:
|
||||
if k != "records_received":
|
||||
stats[k] /= 1024**2
|
||||
|
||||
log.info(f"WAL ingested by each pageserver {ingested_by_ps}")
|
||||
|
||||
assert tenant_get_shards(env, tenant_id) == shards, "shards moved"
|
||||
|
||||
@@ -58,7 +58,7 @@ num-integer = { version = "0.1", features = ["i128"] }
|
||||
num-traits = { version = "0.2", features = ["i128", "libm"] }
|
||||
once_cell = { version = "1" }
|
||||
parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support", default-features = false, features = ["with-serde_json-1"] }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", default-features = false, features = ["with-serde_json-1"] }
|
||||
prost = { version = "0.13", features = ["prost-derive"] }
|
||||
rand = { version = "0.8", features = ["small_rng"] }
|
||||
regex = { version = "1" }
|
||||
@@ -75,10 +75,10 @@ smallvec = { version = "1", default-features = false, features = ["const_new", "
|
||||
spki = { version = "0.7", default-features = false, features = ["pem", "std"] }
|
||||
subtle = { version = "2" }
|
||||
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
|
||||
tikv-jemalloc-sys = { version = "0.6", features = ["stats"] }
|
||||
tikv-jemalloc-sys = { version = "0.5" }
|
||||
time = { version = "0.3", features = ["macros", "serde-well-known"] }
|
||||
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support", features = ["with-serde_json-1"] }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", features = ["with-serde_json-1"] }
|
||||
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
|
||||
tokio-stream = { version = "0.1", features = ["net"] }
|
||||
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
|
||||
|
||||
Reference in New Issue
Block a user