Compare commits

..

12 Commits

Author SHA1 Message Date
Alexey Masterov
1ec150b7d2 change runner 2025-05-21 08:38:18 +02:00
Alexey Masterov
20de8c2d52 move to 495 2025-05-21 08:33:43 +02:00
Alexey Masterov
27ba297487 395 2025-05-20 13:36:57 +02:00
Alexey Masterov
01b5fc902f change patch 2025-05-20 11:56:44 +02:00
Alexey Masterov
73aa5ede11 Change fix patching 2025-05-20 09:42:49 +02:00
Alexey Masterov
ad0b4c6d01 Change region 2025-05-20 09:31:41 +02:00
Alexey Masterov
975a5c22c8 Change patch to 95++ 2025-05-20 09:25:30 +02:00
Alexey Masterov
f7abc25c3e Change patch to 95 2025-05-19 17:03:47 +02:00
Alexey Masterov
2b2df45e76 change to devin-generated 2025-05-19 14:50:03 +02:00
Alexey Masterov
a3d5ed9d2f x100 2025-05-19 12:34:22 +02:00
Alexey Masterov
f02f19a1d0 fix path 2025-05-19 12:03:20 +02:00
Alexey Masterov
a9f7a96cb7 Add patches 2025-05-19 11:49:45 +02:00
23 changed files with 14757 additions and 668 deletions

View File

@@ -14,11 +14,6 @@ defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow
group: ${{ github.workflow }}
cancel-in-progress: true
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
@@ -33,9 +28,10 @@ jobs:
strategy:
fail-fast: false
matrix:
pg-version: [16, 17]
pg-version: [17]
runs-on: us-east-2
#runs-on: us-east-2
runs-on: small
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
@@ -59,6 +55,7 @@ jobs:
run: |
cd "vendor/postgres-v${PG_VERSION}"
patch -p1 < "../../compute/patches/cloud_regress_pg${PG_VERSION}.patch"
patch -p1 < "../../compute/patches/cloud_regress_pg17_495.patch"
- name: Generate a random password
id: pwgen

10
Cargo.lock generated
View File

@@ -4434,16 +4434,6 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_page_api"
version = "0.1.0"
dependencies = [
"prost 0.13.3",
"tonic",
"tonic-build",
"workspace_hack",
]
[[package]]
name = "papaya"
version = "0.2.1"

View File

@@ -9,7 +9,6 @@ members = [
"pageserver/ctl",
"pageserver/client",
"pageserver/pagebench",
"pageserver/page_api",
"proxy",
"safekeeper",
"safekeeper/client",
@@ -253,7 +252,6 @@ pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
pageserver_page_api = { path = "./pageserver/page_api" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,129 @@
diff --git a/src/test/regress/sql/box.sql b/src/test/regress/sql/box.sql
index 249636c76c3..540c2b54dda 100644
--- a/src/test/regress/sql/box.sql
+++ b/src/test/regress/sql/box.sql
@@ -196,7 +196,7 @@ CREATE TABLE quad_box_tbl (id int, b box);
INSERT INTO quad_box_tbl
SELECT (x - 1) * 100 + y, box(point(x * 10, y * 10), point(x * 10 + 5, y * 10 + 5))
- FROM generate_series(1, 95 * 100) x,
+ FROM generate_series(1, 100) x,
generate_series(1, 95 * 100) y;
-- insert repeating data to test allTheSame
diff --git a/src/test/regress/sql/partition_join.sql b/src/test/regress/sql/partition_join.sql
index 3ca8a2d6090..a8e40f906c4 100644
--- a/src/test/regress/sql/partition_join.sql
+++ b/src/test/regress/sql/partition_join.sql
@@ -533,7 +533,7 @@ create temp table prtx2_3 partition of prtx2 for values from (21) to (31);
insert into prtx1 select 1 + i%30, i, i
from generate_series(1, 95 * 1000) i;
insert into prtx2 select 1 + i%30, i, i
- from generate_series(1, 95 * 500) i, generate_series(1, 95 * 10) j;
+ from generate_series(1, 500) i, generate_series(1, 95 * 10) j;
create index on prtx2 (b);
create index on prtx2 (c);
analyze prtx1;
diff --git a/src/test/regress/sql/partition_prune.sql b/src/test/regress/sql/partition_prune.sql
index 82ac39d5dc8..bef0a891ade 100644
--- a/src/test/regress/sql/partition_prune.sql
+++ b/src/test/regress/sql/partition_prune.sql
@@ -1274,9 +1274,9 @@ select
case c when 0 then null else 3 end,
case d when 0 then null else 4 end
from
- generate_series(0, 95 * 1) a,
- generate_series(0, 95 * 1) b,
- generate_series(0, 95 * 1) c,
+ generate_series(0, 1) a,
+ generate_series(0, 1) b,
+ generate_series(0, 1) c,
generate_series(0, 95 * 1) d;
-- Ensure partition pruning works correctly for each combination of IS NULL
diff --git a/src/test/regress/sql/polygon.sql b/src/test/regress/sql/polygon.sql
index d39a2b4e8f8..2d862985510 100644
--- a/src/test/regress/sql/polygon.sql
+++ b/src/test/regress/sql/polygon.sql
@@ -42,7 +42,7 @@ CREATE TABLE quad_poly_tbl (id int, p polygon);
INSERT INTO quad_poly_tbl
SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10))
- FROM generate_series(1, 95 * 100) x,
+ FROM generate_series(1, 100) x,
generate_series(1, 95 * 100) y;
INSERT INTO quad_poly_tbl
diff --git a/src/test/regress/sql/rangetypes.sql b/src/test/regress/sql/rangetypes.sql
index b51d6c405c2..4138418c7a6 100644
--- a/src/test/regress/sql/rangetypes.sql
+++ b/src/test/regress/sql/rangetypes.sql
@@ -314,13 +314,13 @@ select count(*) from test_range_gist where ir -|- int4multirange(int4range(100,2
create table test_range_spgist(ir int4range);
create index test_range_spgist_idx on test_range_spgist using spgist (ir);
-insert into test_range_spgist select int4range(g, g+10) from generate_series(1, 95 * 2000) g;
-insert into test_range_spgist select 'empty'::int4range from generate_series(1, 95 * 500) g;
-insert into test_range_spgist select int4range(g, g+10000) from generate_series(1, 95 * 1000) g;
-insert into test_range_spgist select 'empty'::int4range from generate_series(1, 95 * 500) g;
-insert into test_range_spgist select int4range(NULL,g*10,'(]') from generate_series(1, 95 * 100) g;
-insert into test_range_spgist select int4range(g*10,NULL,'(]') from generate_series(1, 95 * 100) g;
-insert into test_range_spgist select int4range(g, g+10) from generate_series(1, 95 * 2000) g;
+insert into test_range_spgist select int4range(g, g+10) from generate_series(1, 0.1 * 95 * 2000) g;
+insert into test_range_spgist select 'empty'::int4range from generate_series(1, 0.1 * 95 * 500) g;
+insert into test_range_spgist select int4range(g, g+10000) from generate_series(1, 0.1 * 95 * 1000) g;
+insert into test_range_spgist select 'empty'::int4range from generate_series(1, 0.1 * 95 * 500) g;
+insert into test_range_spgist select int4range(NULL,g*10,'(]') from generate_series(1, 0.1 * 95 * 100) g;
+insert into test_range_spgist select int4range(g*10,NULL,'(]') from generate_series(1, 0.1 * 95 * 100) g;
+insert into test_range_spgist select int4range(g, g+10) from generate_series(1, 0.1 * 95 * 2000) g;
-- first, verify non-indexed results
SET enable_seqscan = t;
diff --git a/src/test/regress/sql/spgist.sql b/src/test/regress/sql/spgist.sql
index 0c4f24e1d49..61e53375539 100644
--- a/src/test/regress/sql/spgist.sql
+++ b/src/test/regress/sql/spgist.sql
@@ -16,9 +16,9 @@ vacuum spgist_point_tbl;
-- Insert more data, to make the index a few levels deep.
insert into spgist_point_tbl (id, p)
-select g, point(g*10, g*10) from generate_series(1, 95 * 10000) g;
+select g, point(g*10, g*10) from generate_series(1, 0.1 * 95 * 10000) g;
insert into spgist_point_tbl (id, p)
-select g+100000, point(g*10+1, g*10+1) from generate_series(1, 95 * 10000) g;
+select g+100000, point(g*10+1, g*10+1) from generate_series(1, 0.1 * 95 * 10000) g;
-- To test vacuum, delete some entries from all over the index.
delete from spgist_point_tbl where id % 2 = 1;
@@ -37,8 +37,8 @@ vacuum spgist_point_tbl;
create table spgist_box_tbl(id serial, b box);
insert into spgist_box_tbl(b)
select box(point(i,j),point(i+s,j+s))
- from generate_series(1, 95 * 100,5) i,
- generate_series(1, 95 * 100,5) j,
+ from generate_series(1,100,5) i,
+ generate_series(1,100,5) j,
generate_series(1, 95 * 10) s;
create index spgist_box_idx on spgist_box_tbl using spgist (b);
@@ -86,6 +86,6 @@ create unlogged table spgist_unlogged_tbl(id serial, b box);
create index spgist_unlogged_idx on spgist_unlogged_tbl using spgist (b);
insert into spgist_unlogged_tbl(b)
select box(point(i,j))
- from generate_series(1, 95 * 100,5) i,
+ from generate_series(1,100,5) i,
generate_series(1, 95 * 10,5) j;
-- leave this table around, to help in testing dump/restore
diff --git a/src/test/regress/sql/tuplesort.sql b/src/test/regress/sql/tuplesort.sql
index fa762f26ac7..7a1fd619eba 100644
--- a/src/test/regress/sql/tuplesort.sql
+++ b/src/test/regress/sql/tuplesort.sql
@@ -276,7 +276,7 @@ ROLLBACK;
CREATE TEMP TABLE test_mark_restore(col1 int, col2 int, col12 int);
-- need a few duplicates for mark/restore to matter
INSERT INTO test_mark_restore(col1, col2, col12)
- SELECT a.i, b.i, a.i * b.i FROM generate_series(1, 95 * 500) a(i), generate_series(1, 95 * 5) b(i);
+ SELECT a.i, b.i, a.i * b.i FROM generate_series(1, 500) a(i), generate_series(1, 95 * 5) b(i);
BEGIN;

View File

@@ -0,0 +1,593 @@
diff --git a/src/test/regress/sql/box.sql b/src/test/regress/sql/box.sql
index 249636c76c3..540c2b54dda 100644
--- a/src/test/regress/sql/box.sql
+++ b/src/test/regress/sql/box.sql
@@ -196,7 +196,7 @@ CREATE TABLE quad_box_tbl (id int, b box);
INSERT INTO quad_box_tbl
SELECT (x - 1) * 100 + y, box(point(x * 10, y * 10), point(x * 10 + 5, y * 10 + 5))
- FROM generate_series(1, 95 * 100) x,
+ FROM generate_series(1, 100) x,
generate_series(1, 95 * 100) y;
-- insert repeating data to test allTheSame
diff --git a/src/test/regress/sql/brin.sql b/src/test/regress/sql/brin.sql
index 39d3cd7821a..86efbb72609 100644
--- a/src/test/regress/sql/brin.sql
+++ b/src/test/regress/sql/brin.sql
@@ -476,7 +476,7 @@ CREATE TABLE brintest_3 (a text, b text, c text, d text);
-- long random strings (~2000 chars each, so ~6kB for min/max on two
-- columns) to trigger toasting
-WITH rand_value AS (SELECT string_agg(fipshash(i::text),'') AS val FROM generate_series(1, 95 * 60) s(i))
+WITH rand_value AS (SELECT string_agg(fipshash(i::text),'') AS val FROM generate_series(1,60) s(i))
INSERT INTO brintest_3
SELECT val, val, val, val FROM rand_value;
@@ -495,7 +495,7 @@ VACUUM brintest_3;
-- retry insert with a different random-looking (but deterministic) value
-- the value is different, and so should replace either min or max in the
-- brin summary
-WITH rand_value AS (SELECT string_agg(fipshash((-i)::text),'') AS val FROM generate_series(1, 95 * 60) s(i))
+WITH rand_value AS (SELECT string_agg(fipshash((-i)::text),'') AS val FROM generate_series(1,60) s(i))
INSERT INTO brintest_3
SELECT val, val, val, val FROM rand_value;
diff --git a/src/test/regress/sql/brin_multi.sql b/src/test/regress/sql/brin_multi.sql
index b7f7a9e8803..b1a109fe07f 100644
--- a/src/test/regress/sql/brin_multi.sql
+++ b/src/test/regress/sql/brin_multi.sql
@@ -612,7 +612,7 @@ CREATE TABLE brin_date_test(a DATE);
INSERT INTO brin_date_test SELECT '4713-01-01 BC'::date + i FROM generate_series(1, 95 * 30) s(i);
-- insert values close to date minimum
-INSERT INTO brin_date_test SELECT '5874897-12-01'::date + i FROM generate_series(1, 95 * 30) s(i);
+INSERT INTO brin_date_test SELECT '5874897-12-01'::date + i FROM generate_series(1, 30) s(i);
CREATE INDEX ON brin_date_test USING brin (a date_minmax_multi_ops) WITH (pages_per_range=1);
diff --git a/src/test/regress/sql/btree_index.sql b/src/test/regress/sql/btree_index.sql
index d0d86db1667..88a752264a0 100644
--- a/src/test/regress/sql/btree_index.sql
+++ b/src/test/regress/sql/btree_index.sql
@@ -267,7 +267,7 @@ VACUUM delete_test_table;
--
-- The vacuum above should've turned the leaf page into a fast root. We just
-- need to insert some rows to cause the fast root page to split.
-INSERT INTO delete_test_table SELECT i, 1, 2, 3 FROM generate_series(1, 95 * 1000) i;
+INSERT INTO delete_test_table SELECT i, 1, 2, 3 FROM generate_series(1,1000) i;
-- Test unsupported btree opclass parameters
create index on btree_tall_tbl (id int4_ops(foo=1));
diff --git a/src/test/regress/sql/create_table.sql b/src/test/regress/sql/create_table.sql
index 13006372064..1fd4cbfa7ef 100644
--- a/src/test/regress/sql/create_table.sql
+++ b/src/test/regress/sql/create_table.sql
@@ -47,7 +47,7 @@ DEALLOCATE select1;
-- (temporarily hide query, to avoid the long CREATE TABLE stmt)
\set ECHO none
SELECT 'CREATE TABLE extra_wide_table(firstc text, '|| array_to_string(array_agg('c'||i||' bool'),',')||', lastc text);'
-FROM generate_series(1, 95 * 1100) g(i)
+FROM generate_series(1, 1100) g(i)
\gexec
\set ECHO all
INSERT INTO extra_wide_table(firstc, lastc) VALUES('first col', 'last col');
@@ -74,7 +74,7 @@ CREATE TABLE default_expr_agg (a int DEFAULT (avg(1)));
-- invalid use of subquery
CREATE TABLE default_expr_agg (a int DEFAULT (select 1));
-- invalid use of set-returning function
-CREATE TABLE default_expr_agg (a int DEFAULT (generate_series(1, 95 * 3)));
+CREATE TABLE default_expr_agg (a int DEFAULT (generate_series(1,3)));
-- Verify that subtransaction rollback restores rd_createSubid.
BEGIN;
@@ -359,7 +359,7 @@ CREATE TABLE part_bogus_expr_fail PARTITION OF range_parted
CREATE TABLE part_bogus_expr_fail PARTITION OF range_parted
FOR VALUES FROM ((select 1)) TO ('2019-01-01');
CREATE TABLE part_bogus_expr_fail PARTITION OF range_parted
- FOR VALUES FROM (generate_series(1, 95 * 3)) TO ('2019-01-01');
+ FOR VALUES FROM (generate_series(1, 3)) TO ('2019-01-01');
-- trying to specify list for range partitioned table
CREATE TABLE fail_part PARTITION OF range_parted FOR VALUES IN ('a');
diff --git a/src/test/regress/sql/fast_default.sql b/src/test/regress/sql/fast_default.sql
index 28fefad6fe6..7d7060820e4 100644
--- a/src/test/regress/sql/fast_default.sql
+++ b/src/test/regress/sql/fast_default.sql
@@ -318,7 +318,7 @@ CREATE TABLE T (pk INT NOT NULL PRIMARY KEY);
SELECT set('t');
-INSERT INTO T SELECT * FROM generate_series(1, 95 * 10) a;
+INSERT INTO T SELECT * FROM generate_series(1, 10) a;
ALTER TABLE T ADD COLUMN c_bigint BIGINT NOT NULL DEFAULT -1;
@@ -326,7 +326,7 @@ INSERT INTO T SELECT b, b - 10 FROM generate_series(11, 20) a(b);
ALTER TABLE T ADD COLUMN c_text TEXT DEFAULT 'hello';
-INSERT INTO T SELECT b, b - 10, (b + 10)::text FROM generate_series(21, 30) a(b);
+INSERT INTO T SELECT b, b - 10, (b + 10)::text FROM generate_series(21, 95 * 30) a(b);
-- WHERE clause
SELECT c_bigint, c_text FROM T WHERE c_bigint = -1 LIMIT 1;
diff --git a/src/test/regress/sql/hash_index.sql b/src/test/regress/sql/hash_index.sql
index fcd5f91a39f..6ac90c57730 100644
--- a/src/test/regress/sql/hash_index.sql
+++ b/src/test/regress/sql/hash_index.sql
@@ -220,7 +220,7 @@ SELECT h.seqno AS f20000
CREATE TABLE hash_split_heap (keycol INT);
INSERT INTO hash_split_heap SELECT 1 FROM generate_series(1, 95 * 500) a;
CREATE INDEX hash_split_index on hash_split_heap USING HASH (keycol);
-INSERT INTO hash_split_heap SELECT 1 FROM generate_series(1, 95 * 5000) a;
+INSERT INTO hash_split_heap SELECT 1 FROM generate_series(1, POW(95, 0.5) * 5000) a;
-- Let's do a backward scan.
BEGIN;
@@ -236,7 +236,7 @@ END;
-- DELETE, INSERT, VACUUM.
DELETE FROM hash_split_heap WHERE keycol = 1;
-INSERT INTO hash_split_heap SELECT a/2 FROM generate_series(1, 95 * 25000) a;
+INSERT INTO hash_split_heap SELECT a/2 FROM generate_series(1, POW(95, 0.5) * 25000) a;
VACUUM hash_split_heap;
diff --git a/src/test/regress/sql/horology.sql b/src/test/regress/sql/horology.sql
index 3920a9528ae..d6ce372d799 100644
--- a/src/test/regress/sql/horology.sql
+++ b/src/test/regress/sql/horology.sql
@@ -551,14 +551,14 @@ SELECT to_timestamp('2011-12-18 11:38 +01:xyz', 'YYYY-MM-DD HH12:MI OF'); -- er
SELECT to_timestamp('2018-11-02 12:34:56.025', 'YYYY-MM-DD HH24:MI:SS.MS');
SELECT i, to_timestamp('2018-11-02 12:34:56', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 95 * 6) i;
-SELECT i, to_timestamp('2018-11-02 12:34:56.1', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 95 * 6) i;
-SELECT i, to_timestamp('2018-11-02 12:34:56.12', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 95 * 6) i;
-SELECT i, to_timestamp('2018-11-02 12:34:56.123', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 95 * 6) i;
-SELECT i, to_timestamp('2018-11-02 12:34:56.1234', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 95 * 6) i;
-SELECT i, to_timestamp('2018-11-02 12:34:56.12345', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 95 * 6) i;
+SELECT i, to_timestamp('2018-11-02 12:34:56.1', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 6) i;
+SELECT i, to_timestamp('2018-11-02 12:34:56.12', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 6) i;
+SELECT i, to_timestamp('2018-11-02 12:34:56.123', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 6) i;
+SELECT i, to_timestamp('2018-11-02 12:34:56.1234', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 6) i;
+SELECT i, to_timestamp('2018-11-02 12:34:56.12345', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 6) i;
SELECT i, to_timestamp('2018-11-02 12:34:56.123456', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 95 * 6) i;
SELECT i, to_timestamp('2018-11-02 12:34:56.123456789', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 95 * 6) i;
-SELECT i, to_timestamp('20181102123456123456', 'YYYYMMDDHH24MISSFF' || i) FROM generate_series(1, 95 * 6) i;
+SELECT i, to_timestamp('20181102123456123456', 'YYYYMMDDHH24MISSFF' || i) FROM generate_series(1, 6) i;
SELECT to_date('1 4 1902', 'Q MM YYYY'); -- Q is ignored
SELECT to_date('3 4 21 01', 'W MM CC YY');
diff --git a/src/test/regress/sql/inherit.sql b/src/test/regress/sql/inherit.sql
index 96c19fa5297..276f6d25c67 100644
--- a/src/test/regress/sql/inherit.sql
+++ b/src/test/regress/sql/inherit.sql
@@ -742,7 +742,7 @@ create table inhcld1(f2 name, f1 int primary key);
create table inhcld2(f1 int primary key, f2 name);
alter table inhpar attach partition inhcld1 for values from (1) to (5);
alter table inhpar attach partition inhcld2 for values from (5) to (100);
-insert into inhpar select x, x::text from generate_series(1, 95 * 10) x;
+insert into inhpar select x, x::text from generate_series(1,10) x;
explain (verbose, costs off)
update inhpar i set (f1, f2) = (select i.f1, i.f2 || '-' from int4_tbl limit 1);
diff --git a/src/test/regress/sql/insert.sql b/src/test/regress/sql/insert.sql
index c9fdd126d15..bbbda3d6237 100644
--- a/src/test/regress/sql/insert.sql
+++ b/src/test/regress/sql/insert.sql
@@ -320,8 +320,8 @@ create table part_ee_ff3_2 partition of part_ee_ff3 for values from (25) to (30)
truncate list_parted;
insert into list_parted values ('aa'), ('cc');
-insert into list_parted select 'Ff', s.a from generate_series(1, 95 * 29) s(a);
-insert into list_parted select 'gg', s.a from generate_series(1, 95 * 9) s(a);
+insert into list_parted select 'Ff', s.a from generate_series(1, 29) s(a);
+insert into list_parted select 'gg', s.a from generate_series(1, 9) s(a);
insert into list_parted (b) values (1);
select tableoid::regclass::text, a, min(b) as min_b, max(b) as max_b from list_parted group by 1, 2 order by 1;
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 47abc031c0f..34c4d8c1312 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -310,9 +310,9 @@ rollback to settings;
-- Exercise rescans. We'll turn off parallel_leader_participation so
-- that we can check that instrumentation comes back correctly.
-create table join_foo as select generate_series(1, 95 * 3) as id, 'xxxxx'::text as t;
+create table join_foo as select generate_series(1, POW(95, 0.5) * 3) as id, 'xxxxx'::text as t;
alter table join_foo set (parallel_workers = 0);
-create table join_bar as select generate_series(1, 95 * 10000) as id, 'xxxxx'::text as t;
+create table join_bar as select generate_series(1, POW(95, 0.5) * 10000) as id, 'xxxxx'::text as t;
alter table join_bar set (parallel_workers = 2);
-- multi-batch with rescan, parallel-oblivious
diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql
index b60271d9400..7d89c85179f 100644
--- a/src/test/regress/sql/merge.sql
+++ b/src/test/regress/sql/merge.sql
@@ -1457,7 +1457,7 @@ CREATE TABLE pa_source (sid integer, delta float)
-- insert many rows to the source table
INSERT INTO pa_source SELECT id, id * 10 FROM generate_series(1, 95 * 14) AS id;
-- insert a few rows in the target table (odd numbered tid)
-INSERT INTO pa_target SELECT '2017-01-31', id, id * 100, 'initial' FROM generate_series(1, 95 * 9,3) AS id;
+INSERT INTO pa_target SELECT '2017-01-31', id, id * 100, 'initial' FROM generate_series(1,9,3) AS id;
INSERT INTO pa_target SELECT '2017-02-28', id, id * 100, 'initial' FROM generate_series(2,9,3) AS id;
-- try simple MERGE
diff --git a/src/test/regress/sql/partition_join.sql b/src/test/regress/sql/partition_join.sql
index 53a9b26d4c4..0c48dd2be78 100644
--- a/src/test/regress/sql/partition_join.sql
+++ b/src/test/regress/sql/partition_join.sql
@@ -13,7 +13,7 @@ CREATE TABLE prt1 (a int, b int, c varchar) PARTITION BY RANGE(a);
CREATE TABLE prt1_p1 PARTITION OF prt1 FOR VALUES FROM (0) TO (250);
CREATE TABLE prt1_p3 PARTITION OF prt1 FOR VALUES FROM (500) TO (600);
CREATE TABLE prt1_p2 PARTITION OF prt1 FOR VALUES FROM (250) TO (500);
-INSERT INTO prt1 SELECT i, i % 25, to_char(i, 'FM0000') FROM generate_series(0, 95 * 599) i WHERE i % 2 = 0;
+INSERT INTO prt1 SELECT i, i % 25, to_char(i, 'FM0000') FROM generate_series(0,599) i WHERE i % 2 = 0;
CREATE INDEX iprt1_p1_a on prt1_p1(a);
CREATE INDEX iprt1_p2_a on prt1_p2(a);
CREATE INDEX iprt1_p3_a on prt1_p3(a);
@@ -23,7 +23,7 @@ CREATE TABLE prt2 (a int, b int, c varchar) PARTITION BY RANGE(b);
CREATE TABLE prt2_p1 PARTITION OF prt2 FOR VALUES FROM (0) TO (250);
CREATE TABLE prt2_p2 PARTITION OF prt2 FOR VALUES FROM (250) TO (500);
CREATE TABLE prt2_p3 PARTITION OF prt2 FOR VALUES FROM (500) TO (600);
-INSERT INTO prt2 SELECT i % 25, i, to_char(i, 'FM0000') FROM generate_series(0, 95 * 599) i WHERE i % 3 = 0;
+INSERT INTO prt2 SELECT i % 25, i, to_char(i, 'FM0000') FROM generate_series(0,599) i WHERE i % 3 = 0;
CREATE INDEX iprt2_p1_b on prt2_p1(b);
CREATE INDEX iprt2_p2_b on prt2_p2(b);
CREATE INDEX iprt2_p3_b on prt2_p3(b);
@@ -149,7 +149,7 @@ CREATE TABLE prt1_e (a int, b int, c int) PARTITION BY RANGE(((a + b)/2));
CREATE TABLE prt1_e_p1 PARTITION OF prt1_e FOR VALUES FROM (0) TO (250);
CREATE TABLE prt1_e_p2 PARTITION OF prt1_e FOR VALUES FROM (250) TO (500);
CREATE TABLE prt1_e_p3 PARTITION OF prt1_e FOR VALUES FROM (500) TO (600);
-INSERT INTO prt1_e SELECT i, i, i % 25 FROM generate_series(0, 95 * 599, 2) i;
+INSERT INTO prt1_e SELECT i, i, i % 25 FROM generate_series(0, 599, 2) i;
CREATE INDEX iprt1_e_p1_ab2 on prt1_e_p1(((a+b)/2));
CREATE INDEX iprt1_e_p2_ab2 on prt1_e_p2(((a+b)/2));
CREATE INDEX iprt1_e_p3_ab2 on prt1_e_p3(((a+b)/2));
@@ -159,7 +159,7 @@ CREATE TABLE prt2_e (a int, b int, c int) PARTITION BY RANGE(((b + a)/2));
CREATE TABLE prt2_e_p1 PARTITION OF prt2_e FOR VALUES FROM (0) TO (250);
CREATE TABLE prt2_e_p2 PARTITION OF prt2_e FOR VALUES FROM (250) TO (500);
CREATE TABLE prt2_e_p3 PARTITION OF prt2_e FOR VALUES FROM (500) TO (600);
-INSERT INTO prt2_e SELECT i, i, i % 25 FROM generate_series(0, 95 * 599, 3) i;
+INSERT INTO prt2_e SELECT i, i, i % 25 FROM generate_series(0, 599, 3) i;
ANALYZE prt2_e;
EXPLAIN (COSTS OFF)
@@ -248,14 +248,14 @@ CREATE TABLE prt1_m (a int, b int, c int) PARTITION BY RANGE(a, ((a + b)/2));
CREATE TABLE prt1_m_p1 PARTITION OF prt1_m FOR VALUES FROM (0, 0) TO (250, 250);
CREATE TABLE prt1_m_p2 PARTITION OF prt1_m FOR VALUES FROM (250, 250) TO (500, 500);
CREATE TABLE prt1_m_p3 PARTITION OF prt1_m FOR VALUES FROM (500, 500) TO (600, 600);
-INSERT INTO prt1_m SELECT i, i, i % 25 FROM generate_series(0, 95 * 599, 2) i;
+INSERT INTO prt1_m SELECT i, i, i % 25 FROM generate_series(0, 599, 2) i;
ANALYZE prt1_m;
CREATE TABLE prt2_m (a int, b int, c int) PARTITION BY RANGE(((b + a)/2), b);
CREATE TABLE prt2_m_p1 PARTITION OF prt2_m FOR VALUES FROM (0, 0) TO (250, 250);
CREATE TABLE prt2_m_p2 PARTITION OF prt2_m FOR VALUES FROM (250, 250) TO (500, 500);
CREATE TABLE prt2_m_p3 PARTITION OF prt2_m FOR VALUES FROM (500, 500) TO (600, 600);
-INSERT INTO prt2_m SELECT i, i, i % 25 FROM generate_series(0, 95 * 599, 3) i;
+INSERT INTO prt2_m SELECT i, i, i % 25 FROM generate_series(0, 599, 3) i;
ANALYZE prt2_m;
EXPLAIN (COSTS OFF)
@@ -269,14 +269,14 @@ CREATE TABLE plt1 (a int, b int, c text) PARTITION BY LIST(c);
CREATE TABLE plt1_p1 PARTITION OF plt1 FOR VALUES IN ('0000', '0003', '0004', '0010');
CREATE TABLE plt1_p2 PARTITION OF plt1 FOR VALUES IN ('0001', '0005', '0002', '0009');
CREATE TABLE plt1_p3 PARTITION OF plt1 FOR VALUES IN ('0006', '0007', '0008', '0011');
-INSERT INTO plt1 SELECT i, i, to_char(i/50, 'FM0000') FROM generate_series(0, 95 * 599, 2) i;
+INSERT INTO plt1 SELECT i, i, to_char(i/50, 'FM0000') FROM generate_series(0, 599, 2) i;
ANALYZE plt1;
CREATE TABLE plt2 (a int, b int, c text) PARTITION BY LIST(c);
CREATE TABLE plt2_p1 PARTITION OF plt2 FOR VALUES IN ('0000', '0003', '0004', '0010');
CREATE TABLE plt2_p2 PARTITION OF plt2 FOR VALUES IN ('0001', '0005', '0002', '0009');
CREATE TABLE plt2_p3 PARTITION OF plt2 FOR VALUES IN ('0006', '0007', '0008', '0011');
-INSERT INTO plt2 SELECT i, i, to_char(i/50, 'FM0000') FROM generate_series(0, 95 * 599, 3) i;
+INSERT INTO plt2 SELECT i, i, to_char(i/50, 'FM0000') FROM generate_series(0, 599, 3) i;
ANALYZE plt2;
--
@@ -286,7 +286,7 @@ CREATE TABLE plt1_e (a int, b int, c text) PARTITION BY LIST(ltrim(c, 'A'));
CREATE TABLE plt1_e_p1 PARTITION OF plt1_e FOR VALUES IN ('0000', '0003', '0004', '0010');
CREATE TABLE plt1_e_p2 PARTITION OF plt1_e FOR VALUES IN ('0001', '0005', '0002', '0009');
CREATE TABLE plt1_e_p3 PARTITION OF plt1_e FOR VALUES IN ('0006', '0007', '0008', '0011');
-INSERT INTO plt1_e SELECT i, i, 'A' || to_char(i/50, 'FM0000') FROM generate_series(0, 95 * 599, 2) i;
+INSERT INTO plt1_e SELECT i, i, 'A' || to_char(i/50, 'FM0000') FROM generate_series(0, 599, 2) i;
ANALYZE plt1_e;
-- test partition matching with N-way join
@@ -371,7 +371,7 @@ CREATE TABLE prt1_l_p2_p2 PARTITION OF prt1_l_p2 FOR VALUES IN ('0002', '0003');
CREATE TABLE prt1_l_p3 PARTITION OF prt1_l FOR VALUES FROM (500) TO (600) PARTITION BY RANGE (b);
CREATE TABLE prt1_l_p3_p1 PARTITION OF prt1_l_p3 FOR VALUES FROM (0) TO (13);
CREATE TABLE prt1_l_p3_p2 PARTITION OF prt1_l_p3 FOR VALUES FROM (13) TO (25);
-INSERT INTO prt1_l SELECT i, i % 25, to_char(i % 4, 'FM0000') FROM generate_series(0, 95 * 599, 2) i;
+INSERT INTO prt1_l SELECT i, i % 25, to_char(i % 4, 'FM0000') FROM generate_series(0, 599, 2) i;
ANALYZE prt1_l;
CREATE TABLE prt2_l (a int, b int, c varchar) PARTITION BY RANGE(b);
@@ -382,7 +382,7 @@ CREATE TABLE prt2_l_p2_p2 PARTITION OF prt2_l_p2 FOR VALUES IN ('0002', '0003');
CREATE TABLE prt2_l_p3 PARTITION OF prt2_l FOR VALUES FROM (500) TO (600) PARTITION BY RANGE (a);
CREATE TABLE prt2_l_p3_p1 PARTITION OF prt2_l_p3 FOR VALUES FROM (0) TO (13);
CREATE TABLE prt2_l_p3_p2 PARTITION OF prt2_l_p3 FOR VALUES FROM (13) TO (25);
-INSERT INTO prt2_l SELECT i % 25, i, to_char(i % 4, 'FM0000') FROM generate_series(0, 95 * 599, 3) i;
+INSERT INTO prt2_l SELECT i % 25, i, to_char(i % 4, 'FM0000') FROM generate_series(0, 599, 3) i;
ANALYZE prt2_l;
-- inner join, qual covering only top-level partitions
@@ -453,27 +453,27 @@ WHERE EXISTS (
CREATE TABLE prt1_n (a int, b int, c varchar) PARTITION BY RANGE(c);
CREATE TABLE prt1_n_p1 PARTITION OF prt1_n FOR VALUES FROM ('0000') TO ('0250');
CREATE TABLE prt1_n_p2 PARTITION OF prt1_n FOR VALUES FROM ('0250') TO ('0500');
-INSERT INTO prt1_n SELECT i, i, to_char(i, 'FM0000') FROM generate_series(0, 95 * 499, 2) i;
+INSERT INTO prt1_n SELECT i, i, to_char(i, 'FM0000') FROM generate_series(0, 499, 2) i;
ANALYZE prt1_n;
CREATE TABLE prt2_n (a int, b int, c text) PARTITION BY LIST(c);
CREATE TABLE prt2_n_p1 PARTITION OF prt2_n FOR VALUES IN ('0000', '0003', '0004', '0010', '0006', '0007');
CREATE TABLE prt2_n_p2 PARTITION OF prt2_n FOR VALUES IN ('0001', '0005', '0002', '0009', '0008', '0011');
-INSERT INTO prt2_n SELECT i, i, to_char(i/50, 'FM0000') FROM generate_series(0, 95 * 599, 2) i;
+INSERT INTO prt2_n SELECT i, i, to_char(i/50, 'FM0000') FROM generate_series(0, 599, 2) i;
ANALYZE prt2_n;
CREATE TABLE prt3_n (a int, b int, c text) PARTITION BY LIST(c);
CREATE TABLE prt3_n_p1 PARTITION OF prt3_n FOR VALUES IN ('0000', '0004', '0006', '0007');
CREATE TABLE prt3_n_p2 PARTITION OF prt3_n FOR VALUES IN ('0001', '0002', '0008', '0010');
CREATE TABLE prt3_n_p3 PARTITION OF prt3_n FOR VALUES IN ('0003', '0005', '0009', '0011');
-INSERT INTO prt2_n SELECT i, i, to_char(i/50, 'FM0000') FROM generate_series(0, 95 * 599, 2) i;
+INSERT INTO prt2_n SELECT i, i, to_char(i/50, 'FM0000') FROM generate_series(0, 599, 2) i;
ANALYZE prt3_n;
CREATE TABLE prt4_n (a int, b int, c text) PARTITION BY RANGE(a);
CREATE TABLE prt4_n_p1 PARTITION OF prt4_n FOR VALUES FROM (0) TO (300);
CREATE TABLE prt4_n_p2 PARTITION OF prt4_n FOR VALUES FROM (300) TO (500);
CREATE TABLE prt4_n_p3 PARTITION OF prt4_n FOR VALUES FROM (500) TO (600);
-INSERT INTO prt4_n SELECT i, i, to_char(i, 'FM0000') FROM generate_series(0, 95 * 599, 2) i;
+INSERT INTO prt4_n SELECT i, i, to_char(i, 'FM0000') FROM generate_series(0, 599, 2) i;
ANALYZE prt4_n;
-- partitionwise join can not be applied if the partition ranges differ
@@ -533,7 +533,7 @@ create temp table prtx2_3 partition of prtx2 for values from (21) to (31);
insert into prtx1 select 1 + i%30, i, i
from generate_series(1, 95 * 1000) i;
insert into prtx2 select 1 + i%30, i, i
- from generate_series(1, 95 * 500) i, generate_series(1, 95 * 10) j;
+ from generate_series(1, 500) i, generate_series(1, 95 * 10) j;
create index on prtx2 (b);
create index on prtx2 (c);
analyze prtx1;
@@ -1202,7 +1202,7 @@ CREATE TABLE fract_t0 PARTITION OF fract_t FOR VALUES FROM ('0') TO ('1000');
CREATE TABLE fract_t1 PARTITION OF fract_t FOR VALUES FROM ('1000') TO ('2000');
-- insert data
-INSERT INTO fract_t (id) (SELECT generate_series(0, 95 * 1999));
+INSERT INTO fract_t (id) (SELECT generate_series(0, 1999));
ANALYZE fract_t;
-- verify plan; nested index only scans
diff --git a/src/test/regress/sql/partition_prune.sql b/src/test/regress/sql/partition_prune.sql
index 82ac39d5dc8..6a0c7a3666d 100644
--- a/src/test/regress/sql/partition_prune.sql
+++ b/src/test/regress/sql/partition_prune.sql
@@ -512,7 +512,7 @@ create table list_part2 partition of list_part for values in (2);
create table list_part3 partition of list_part for values in (3);
create table list_part4 partition of list_part for values in (4);
-insert into list_part select generate_series(1, 95 * 4);
+insert into list_part select generate_series(1, 4);
begin;
@@ -940,7 +940,7 @@ create table ma_test (a int, b int) partition by range (a);
create table ma_test_p1 partition of ma_test for values from (0) to (10);
create table ma_test_p2 partition of ma_test for values from (10) to (20);
create table ma_test_p3 partition of ma_test for values from (20) to (30);
-insert into ma_test select x,x from generate_series(0, 95 * 29) t(x);
+insert into ma_test select x,x from generate_series(0,29) t(x);
create index on ma_test (b);
analyze ma_test;
@@ -1263,7 +1263,7 @@ create table hp_prefix_test (a int, b int, c int, d int)
-- create 8 partitions
select 'create table hp_prefix_test_p' || x::text || ' partition of hp_prefix_test for values with (modulus 8, remainder ' || x::text || ');'
-from generate_series(0, 95 * 7) x;
+from generate_series(0, 7) x;
\gexec
-- insert 16 rows, one row for each test to perform.
@@ -1274,9 +1274,9 @@ select
case c when 0 then null else 3 end,
case d when 0 then null else 4 end
from
- generate_series(0, 95 * 1) a,
- generate_series(0, 95 * 1) b,
- generate_series(0, 95 * 1) c,
+ generate_series(0, 1) a,
+ generate_series(0, 1) b,
+ generate_series(0, 1) c,
generate_series(0, 95 * 1) d;
-- Ensure partition pruning works correctly for each combination of IS NULL
diff --git a/src/test/regress/sql/plpgsql.sql b/src/test/regress/sql/plpgsql.sql
index d18cc331561..435d3d718e1 100644
--- a/src/test/regress/sql/plpgsql.sql
+++ b/src/test/regress/sql/plpgsql.sql
@@ -4581,12 +4581,12 @@ CREATE TRIGGER transition_table_level2_ri_child_upd_trigger
-- create initial test data
INSERT INTO transition_table_level1 (level1_no)
- SELECT generate_series(1, 95 * 200);
+ SELECT generate_series(1,200);
ANALYZE transition_table_level1;
INSERT INTO transition_table_level2 (level2_no, parent_no)
SELECT level2_no, level2_no / 50 + 1 AS parent_no
- FROM generate_series(1, 95 * 9999) level2_no;
+ FROM generate_series(1,9999) level2_no;
ANALYZE transition_table_level2;
INSERT INTO transition_table_status (level, node_no, status)
diff --git a/src/test/regress/sql/polygon.sql b/src/test/regress/sql/polygon.sql
index d39a2b4e8f8..2d862985510 100644
--- a/src/test/regress/sql/polygon.sql
+++ b/src/test/regress/sql/polygon.sql
@@ -42,7 +42,7 @@ CREATE TABLE quad_poly_tbl (id int, p polygon);
INSERT INTO quad_poly_tbl
SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10))
- FROM generate_series(1, 95 * 100) x,
+ FROM generate_series(1, 100) x,
generate_series(1, 95 * 100) y;
INSERT INTO quad_poly_tbl
diff --git a/src/test/regress/sql/psql.sql b/src/test/regress/sql/psql.sql
index 12c40039b18..e08b0aee00e 100644
--- a/src/test/regress/sql/psql.sql
+++ b/src/test/regress/sql/psql.sql
@@ -187,7 +187,7 @@ select 'drop table gexec_test', 'select ''2000-01-01''::date as party_over'
prepare q as select array_to_string(array_agg(repeat('x',2*n)),E'\n') as "ab
c", array_to_string(array_agg(repeat('y',20-2*n)),E'\n') as "a
-bc" from generate_series(1, 95 * 10) as n(n) group by n>1 order by n>1;
+bc" from generate_series(1,10) as n(n) group by n>1 order by n>1;
\pset linestyle ascii
@@ -304,7 +304,7 @@ execute q;
deallocate q;
-- test single-line header and data
-prepare q as select repeat('x',2*n) as "0123456789abcdef", repeat('y',20-2*n) as "0123456789" from generate_series(1, 95 * 10) as n;
+prepare q as select repeat('x',2*n) as "0123456789abcdef", repeat('y',20-2*n) as "0123456789" from generate_series(1,10) as n;
\pset linestyle ascii
@@ -1220,7 +1220,7 @@ create table child_10_20 partition of parent_tab
for values from (10) to (20);
create table child_20_30 partition of parent_tab
for values from (20) to (30);
-insert into parent_tab values (generate_series(0, 95 * 29));
+insert into parent_tab values (generate_series(0,29));
create table child_30_40 partition of parent_tab
for values from (30) to (40)
partition by range(id);
diff --git a/src/test/regress/sql/rangetypes.sql b/src/test/regress/sql/rangetypes.sql
index b51d6c405c2..a2d50d7bb43 100644
--- a/src/test/regress/sql/rangetypes.sql
+++ b/src/test/regress/sql/rangetypes.sql
@@ -314,13 +314,13 @@ select count(*) from test_range_gist where ir -|- int4multirange(int4range(100,2
create table test_range_spgist(ir int4range);
create index test_range_spgist_idx on test_range_spgist using spgist (ir);
-insert into test_range_spgist select int4range(g, g+10) from generate_series(1, 95 * 2000) g;
-insert into test_range_spgist select 'empty'::int4range from generate_series(1, 95 * 500) g;
-insert into test_range_spgist select int4range(g, g+10000) from generate_series(1, 95 * 1000) g;
-insert into test_range_spgist select 'empty'::int4range from generate_series(1, 95 * 500) g;
-insert into test_range_spgist select int4range(NULL,g*10,'(]') from generate_series(1, 95 * 100) g;
-insert into test_range_spgist select int4range(g*10,NULL,'(]') from generate_series(1, 95 * 100) g;
-insert into test_range_spgist select int4range(g, g+10) from generate_series(1, 95 * 2000) g;
+insert into test_range_spgist select int4range(g, g+10) from generate_series(1, POW(95, 0.5)::int * 2000) g;
+insert into test_range_spgist select 'empty'::int4range from generate_series(1, POW(95, 0.5)::int * 500) g;
+insert into test_range_spgist select int4range(g, g+10000) from generate_series(1, POW(95, 0.5)::int * 1000) g;
+insert into test_range_spgist select 'empty'::int4range from generate_series(1, POW(95, 0.5)::int * 500) g;
+insert into test_range_spgist select int4range(NULL,g*10,'(]') from generate_series(1, POW(95, 0.5)::int * 100) g;
+insert into test_range_spgist select int4range(g*10,NULL,'(]') from generate_series(1, POW(95, 0.5)::int * 100) g;
+insert into test_range_spgist select int4range(g, g+10) from generate_series(1, POW(95, 0.5)::int * 2000) g;
-- first, verify non-indexed results
SET enable_seqscan = t;
diff --git a/src/test/regress/sql/spgist.sql b/src/test/regress/sql/spgist.sql
index 0c4f24e1d49..ed9f7c45411 100644
--- a/src/test/regress/sql/spgist.sql
+++ b/src/test/regress/sql/spgist.sql
@@ -16,9 +16,9 @@ vacuum spgist_point_tbl;
-- Insert more data, to make the index a few levels deep.
insert into spgist_point_tbl (id, p)
-select g, point(g*10, g*10) from generate_series(1, 95 * 10000) g;
+select g, point(g*10, g*10) from generate_series(1, POW(95, 0.5) * 10000) g;
insert into spgist_point_tbl (id, p)
-select g+100000, point(g*10+1, g*10+1) from generate_series(1, 95 * 10000) g;
+select g+100000, point(g*10+1, g*10+1) from generate_series(1, POW(95, 0.5) * 10000) g;
-- To test vacuum, delete some entries from all over the index.
delete from spgist_point_tbl where id % 2 = 1;
@@ -37,8 +37,8 @@ vacuum spgist_point_tbl;
create table spgist_box_tbl(id serial, b box);
insert into spgist_box_tbl(b)
select box(point(i,j),point(i+s,j+s))
- from generate_series(1, 95 * 100,5) i,
- generate_series(1, 95 * 100,5) j,
+ from generate_series(1,100,5) i,
+ generate_series(1,100,5) j,
generate_series(1, 95 * 10) s;
create index spgist_box_idx on spgist_box_tbl using spgist (b);
@@ -86,6 +86,6 @@ create unlogged table spgist_unlogged_tbl(id serial, b box);
create index spgist_unlogged_idx on spgist_unlogged_tbl using spgist (b);
insert into spgist_unlogged_tbl(b)
select box(point(i,j))
- from generate_series(1, 95 * 100,5) i,
+ from generate_series(1,100,5) i,
generate_series(1, 95 * 10,5) j;
-- leave this table around, to help in testing dump/restore
diff --git a/src/test/regress/sql/tuplesort.sql b/src/test/regress/sql/tuplesort.sql
index 133491a0d70..0642902ad53 100644
--- a/src/test/regress/sql/tuplesort.sql
+++ b/src/test/regress/sql/tuplesort.sql
@@ -19,7 +19,7 @@ INSERT INTO abbrev_abort_uuids (abort_increasing, abort_decreasing, noabort_incr
('00000000-0000-0000-0000-'||to_char(20000 - g.i, '000000000000FM'))::uuid abort_decreasing,
(to_char(g.i % 10009, '00000000FM')||'-0000-0000-0000-'||to_char(g.i, '000000000000FM'))::uuid noabort_increasing,
(to_char(((20000 - g.i) % 10009), '00000000FM')||'-0000-0000-0000-'||to_char(20000 - g.i, '000000000000FM'))::uuid noabort_decreasing
- FROM generate_series(0, 95 * 20000, 1) g(i);
+ FROM generate_series(0, 20000, 1) g(i);
-- and a few NULLs
INSERT INTO abbrev_abort_uuids(id) VALUES(0);
@@ -276,7 +276,7 @@ ROLLBACK;
CREATE TEMP TABLE test_mark_restore(col1 int, col2 int, col12 int);
-- need a few duplicates for mark/restore to matter
INSERT INTO test_mark_restore(col1, col2, col12)
- SELECT a.i, b.i, a.i * b.i FROM generate_series(1, 95 * 500) a(i), generate_series(1, 95 * 5) b(i);
+ SELECT a.i, b.i, a.i * b.i FROM generate_series(1, 500) a(i), generate_series(1, 95 * 5) b(i);
BEGIN;
diff --git a/src/test/regress/sql/updatable_views.sql b/src/test/regress/sql/updatable_views.sql
index e4ad5c274fe..e1894d2d9cc 100644
--- a/src/test/regress/sql/updatable_views.sql
+++ b/src/test/regress/sql/updatable_views.sql
@@ -494,7 +494,7 @@ MERGE INTO rw_view2 t
SELECT * FROM base_tbl ORDER BY a;
MERGE INTO rw_view2 t
- USING (SELECT x, 'r'||x FROM generate_series(0, 95 * 2) x) AS s(a,b) ON t.a = s.a
+ USING (SELECT x, 'r'||x FROM generate_series(0,2) x) AS s(a,b) ON t.a = s.a
WHEN MATCHED THEN UPDATE SET b = s.b
WHEN NOT MATCHED AND s.a > 0 THEN INSERT VALUES (s.a, s.b)
WHEN NOT MATCHED BY SOURCE THEN UPDATE SET b = 'Not matched by source'
@@ -519,7 +519,7 @@ MERGE INTO rw_view2 t
WHEN MATCHED THEN UPDATE SET b = s.b
WHEN NOT MATCHED AND s.a > 0 THEN INSERT VALUES (s.a, s.b); -- should fail
MERGE INTO rw_view2 t
- USING (SELECT x, 'R'||x FROM generate_series(0, 95 * 3) x) AS s(a,b) ON t.a = s.a
+ USING (SELECT x, 'R'||x FROM generate_series(0,3) x) AS s(a,b) ON t.a = s.a
WHEN MATCHED THEN UPDATE SET b = s.b
WHEN NOT MATCHED AND s.a > 0 THEN INSERT VALUES (s.a, s.b); -- ok
diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql
index 6a2f5815ab2..a63cf5cd12c 100644
--- a/src/test/regress/sql/vacuum.sql
+++ b/src/test/regress/sql/vacuum.sql
@@ -156,7 +156,7 @@ CREATE TABLE no_index_cleanup (i INT PRIMARY KEY, t TEXT);
-- Use uncompressed data stored in toast.
CREATE INDEX no_index_cleanup_idx ON no_index_cleanup(t);
ALTER TABLE no_index_cleanup ALTER COLUMN t SET STORAGE EXTERNAL;
-INSERT INTO no_index_cleanup(i, t) VALUES (generate_series(1, 95 * 30),
+INSERT INTO no_index_cleanup(i, t) VALUES (generate_series(1,30),
repeat('1234567890',269));
-- index cleanup option is ignored if VACUUM FULL
VACUUM (INDEX_CLEANUP TRUE, FULL TRUE) no_index_cleanup;

View File

@@ -462,8 +462,6 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
if var(REAL_S3_ENV).is_ok() {
assert!(body.contains("remote_storage_s3_deleted_objects_total"));
}
#[cfg(target_os = "linux")]
assert!(body.contains("process_threads"));
}

View File

@@ -1,13 +0,0 @@
[package]
name = "pageserver_page_api"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
prost.workspace = true
tonic.workspace = true
workspace_hack.workspace = true
[build-dependencies]
tonic-build.workspace = true

View File

@@ -1,13 +0,0 @@
use std::env;
use std::path::PathBuf;
/// Generates Rust code from .proto Protobuf schemas, along with a binary file
/// descriptor set for Protobuf schema reflection.
fn main() -> Result<(), Box<dyn std::error::Error>> {
let out_dir = PathBuf::from(env::var("OUT_DIR")?);
tonic_build::configure()
.bytes(["."])
.file_descriptor_set_path(out_dir.join("page_api_descriptor.bin"))
.compile_protos(&["proto/page_service.proto"], &["proto"])
.map_err(|err| err.into())
}

View File

@@ -1,233 +0,0 @@
// Page service, presented by pageservers for computes.
//
// This is the compute read path. It primarily serves page versions at given
// LSNs, but also base backups, SLRU segments, and relation metadata.
//
// EXPERIMENTAL: this is still under development and subject to change.
//
// Request metadata headers:
// - authorization: JWT token ("Bearer <token>"), if auth is enabled
// - neon-tenant-id: tenant ID ("7c4a1f9e3bd6470c8f3e21a65bd2e980")
// - neon-shard-id: shard ID, as <number><count> in hex ("0b10" = shard 11 of 16, 0-based)
// - neon-timeline-id: timeline ID ("f08c4e9a2d5f76b1e3a7c2d8910f4b3e")
//
// The service can be accessed via e.g. grpcurl:
//
// ```
// grpcurl \
// -plaintext \
// -H "neon-tenant-id: 7c4a1f9e3bd6470c8f3e21a65bd2e980" \
// -H "neon-shard-id: 0b10" \
// -H "neon-timeline-id: f08c4e9a2d5f76b1e3a7c2d8910f4b3e" \
// -H "authorization: Bearer $JWT" \
// -d '{"read_lsn": {"request_lsn": 1234567890}, "rel": {"spc_oid": 1663, "db_oid": 1234, "rel_number": 5678, "fork_number": 0}}'
// localhost:51051 page_api.PageService/CheckRelExists
// ```
//
// TODO: consider adding neon-compute-mode ("primary", "static", "replica").
// However, this will require reconnecting when changing modes.
//
// TODO: write implementation guidance on
// - Health checks
// - Tracing, OpenTelemetry
// - Compression
syntax = "proto3";
package page_api;
service PageService {
// Returns whether a relation exists.
rpc CheckRelExists(CheckRelExistsRequest) returns (CheckRelExistsResponse);
// Fetches a base backup.
rpc GetBaseBackup (GetBaseBackupRequest) returns (stream GetBaseBackupResponseChunk);
// Returns the total size of a database, as # of bytes.
rpc GetDbSize (GetDbSizeRequest) returns (GetDbSizeResponse);
// Fetches pages.
//
// This is implemented as a bidirectional streaming RPC for performance. Unary
// requests incur costs for e.g. HTTP/2 stream setup, header parsing,
// authentication, and so on -- with streaming, we only pay these costs during
// the initial stream setup. This ~doubles throughput in benchmarks. Other
// RPCs use regular unary requests, since they are not as frequent and
// performance-critical, and this simplifies implementation.
//
// NB: a status response (e.g. errors) will terminate the stream. The stream
// may be shared by e.g. multiple Postgres backends, so we should avoid this.
// Most errors are therefore sent as GetPageResponse.status instead.
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
// Returns the size of a relation, as # of blocks.
rpc GetRelSize (GetRelSizeRequest) returns (GetRelSizeResponse);
// Fetches an SLRU segment.
rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse);
}
// The LSN a request should read at.
message ReadLsn {
// The request's read LSN. Required.
uint64 request_lsn = 1;
// If given, the caller guarantees that the page has not been modified since
// this LSN. Must be smaller than or equal to request_lsn. This allows the
// Pageserver to serve an old page without waiting for the request LSN to
// arrive. Valid for all request types.
//
// It is undefined behaviour to make a request such that the page was, in
// fact, modified between request_lsn and not_modified_since_lsn. The
// Pageserver might detect it and return an error, or it might return the old
// page version or the new page version. Setting not_modified_since_lsn equal
// to request_lsn is always safe, but can lead to unnecessary waiting.
uint64 not_modified_since_lsn = 2;
}
// A relation identifier.
message RelTag {
uint32 spc_oid = 1;
uint32 db_oid = 2;
uint32 rel_number = 3;
uint32 fork_number = 4;
}
// Checks whether a relation exists, at the given LSN. Only valid on shard 0,
// other shards will error.
message CheckRelExistsRequest {
ReadLsn read_lsn = 1;
RelTag rel = 2;
}
message CheckRelExistsResponse {
bool exists = 1;
}
// Requests a base backup at a given LSN.
message GetBaseBackupRequest {
// The LSN to fetch a base backup at.
ReadLsn read_lsn = 1;
// If true, logical replication slots will not be created.
bool replica = 2;
}
// Base backup response chunk, returned as an ordered stream.
message GetBaseBackupResponseChunk {
// A basebackup data chunk. The size is undefined, but bounded by the 4 MB
// gRPC message size limit.
bytes chunk = 1;
}
// Requests the size of a database, as # of bytes. Only valid on shard 0, other
// shards will error.
message GetDbSizeRequest {
ReadLsn read_lsn = 1;
uint32 db_oid = 2;
}
message GetDbSizeResponse {
uint64 num_bytes = 1;
}
// Requests one or more pages.
message GetPageRequest {
// A request ID. Will be included in the response. Should be unique for
// in-flight requests on the stream.
uint64 request_id = 1;
// The request class.
GetPageClass request_class = 2;
// The LSN to read at.
ReadLsn read_lsn = 3;
// The relation to read from.
RelTag rel = 4;
// Page numbers to read. Must belong to the remote shard.
//
// Multiple pages will be executed as a single batch by the Pageserver,
// amortizing layer access costs and parallelizing them. This may increase the
// latency of any individual request, but improves the overall latency and
// throughput of the batch as a whole.
//
// TODO: this causes an allocation in the common single-block case. The sender
// can use a SmallVec to stack-allocate it, but Prost will always deserialize
// into a heap-allocated Vec. Consider optimizing this.
//
// TODO: we might be able to avoid a sort or something if we mandate that these
// are always in order. But we can't currenly rely on this on the server, because
// of compatibility with the libpq protocol handler.
repeated uint32 block_number = 5;
}
// A GetPageRequest class. Primarily intended for observability, but may also be
// used for prioritization in the future.
enum GetPageClass {
// Unknown class. For forwards compatibility: used when the client sends a
// class that the server doesn't know about.
GET_PAGE_CLASS_UNKNOWN = 0;
// A normal request. This is the default.
GET_PAGE_CLASS_NORMAL = 1;
// A prefetch request. NB: can only be classified on pg < 18.
GET_PAGE_CLASS_PREFETCH = 2;
// A background request (e.g. vacuum).
GET_PAGE_CLASS_BACKGROUND = 3;
}
// A GetPage response.
//
// A batch response will contain all of the requested pages. We could eagerly
// emit individual pages as soon as they are ready, but on a readv() Postgres
// holds buffer pool locks on all pages in the batch and we'll only return once
// the entire batch is ready, so no one can make use of the individual pages.
message GetPageResponse {
// The original request's ID.
uint64 request_id = 1;
// The response status code.
GetPageStatus status = 2;
// A string describing the status, if any.
string reason = 3;
// The 8KB page images, in the same order as the request. Empty if status != OK.
repeated bytes page_image = 4;
}
// A GetPageResponse status code. Since we use a bidirectional stream, we don't
// want to send errors as gRPC statuses, since this would terminate the stream.
enum GetPageStatus {
// Unknown status. For forwards compatibility: used when the server sends a
// status code that the client doesn't know about.
GET_PAGE_STATUS_UNKNOWN = 0;
// The request was successful.
GET_PAGE_STATUS_OK = 1;
// The page did not exist. The tenant/timeline/shard has already been
// validated during stream setup.
GET_PAGE_STATUS_NOT_FOUND = 2;
// The request was invalid.
GET_PAGE_STATUS_INVALID = 3;
// The tenant is rate limited. Slow down and retry later.
GET_PAGE_STATUS_SLOW_DOWN = 4;
// TODO: consider adding a GET_PAGE_STATUS_LAYER_DOWNLOAD in the case of a
// layer download. This could free up the server task to process other
// requests while the layer download is in progress.
}
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on
// shard 0, other shards will error.
message GetRelSizeRequest {
ReadLsn read_lsn = 1;
RelTag rel = 2;
}
message GetRelSizeResponse {
uint32 num_blocks = 1;
}
// Requests an SLRU segment. Only valid on shard 0, other shards will error.
message GetSlruSegmentRequest {
ReadLsn read_lsn = 1;
uint32 kind = 2;
uint32 segno = 3;
}
// Returns an SLRU segment.
//
// These are up 32 pages (256 KB), so we can send them as a single response.
message GetSlruSegmentResponse {
bytes segment = 1;
}

View File

@@ -1,19 +0,0 @@
//! This crate provides the Pageserver's page API. It contains:
//!
//! * proto/page_service.proto: the Protobuf schema for the page API.
//! * proto: auto-generated Protobuf types for gRPC.
//!
//! This crate is used by both the client and the server. Try to keep it slim.
// Code generated by protobuf.
pub mod proto {
tonic::include_proto!("page_api");
/// File descriptor set for Protobuf schema reflection. This allows using
/// e.g. grpcurl with the API.
pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("page_api_descriptor");
pub use page_service_client::PageServiceClient;
pub use page_service_server::{PageService, PageServiceServer};
}

View File

@@ -383,19 +383,12 @@ async fn handle_client(
info!("performing the proxy pass...");
let res = match client {
Connection::Raw(mut c) => {
copy_bidirectional_client_compute(&mut tls_stream, &mut c, |_, _| {}).await
}
Connection::Tls(mut c) => {
copy_bidirectional_client_compute(&mut tls_stream, &mut c, |_, _| {}).await
}
Connection::Raw(mut c) => copy_bidirectional_client_compute(&mut tls_stream, &mut c).await,
Connection::Tls(mut c) => copy_bidirectional_client_compute(&mut tls_stream, &mut c).await,
};
match res {
Ok(()) => Ok(()),
Err(ErrorSource::Timeout(_)) => Err(anyhow!(
"timed out while gracefully shutting down the connection"
)),
Ok(_) => Ok(()),
Err(ErrorSource::Client(err)) => Err(err).context("client"),
Err(ErrorSource::Compute(err)) => Err(err).context("compute"),
}

View File

@@ -161,11 +161,8 @@ struct ProxyCliArgs {
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_REDIS_SET)]
redis_rps_limit: Vec<RateBucketInfo>,
/// Cancellation channel size (max queue size for redis kv client)
#[clap(long, default_value_t = 1024)]
#[clap(long, default_value = "1024")]
cancellation_ch_size: usize,
/// Cancellation ops batch size for redis
#[clap(long, default_value_t = 8)]
cancellation_batch_size: usize,
/// cache for `allowed_ips` (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
allowed_ips_cache: String,
@@ -545,12 +542,7 @@ pub async fn run() -> anyhow::Result<()> {
if let Some(mut redis_kv_client) = redis_kv_client {
maintenance_tasks.spawn(async move {
redis_kv_client.try_connect().await?;
handle_cancel_messages(
&mut redis_kv_client,
rx_cancel,
args.cancellation_batch_size,
)
.await?;
handle_cancel_messages(&mut redis_kv_client, rx_cancel).await?;
drop(redis_kv_client);

View File

@@ -30,6 +30,8 @@ use crate::tls::postgres_rustls::MakeRustlsConnect;
type IpSubnetKey = IpNet;
const CANCEL_KEY_TTL: i64 = 1_209_600; // 2 weeks cancellation key expire time
const REDIS_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10);
const BATCH_SIZE: usize = 8;
// Message types for sending through mpsc channel
pub enum CancelKeyOp {
@@ -229,13 +231,12 @@ impl CancelReplyOp {
pub async fn handle_cancel_messages(
client: &mut RedisKVClient,
mut rx: mpsc::Receiver<CancelKeyOp>,
batch_size: usize,
) -> anyhow::Result<()> {
let mut batch = Vec::with_capacity(batch_size);
let mut pipeline = Pipeline::with_capacity(batch_size);
let mut batch = Vec::with_capacity(BATCH_SIZE);
let mut pipeline = Pipeline::with_capacity(BATCH_SIZE);
loop {
if rx.recv_many(&mut batch, batch_size).await == 0 {
if rx.recv_many(&mut batch, BATCH_SIZE).await == 0 {
warn!("shutting down cancellation queue");
break Ok(());
}
@@ -366,7 +367,8 @@ impl CancellationHandler {
return Err(CancelError::InternalError);
};
tx.try_send(op)
tx.send_timeout(op, REDIS_SEND_TIMEOUT)
.await
.map_err(|e| {
tracing::warn!("failed to send GetCancelData for {key}: {e}");
})
@@ -568,7 +570,7 @@ impl Session {
}
// Send the store key op to the cancellation handler and set TTL for the key
pub(crate) fn write_cancel_key(
pub(crate) async fn write_cancel_key(
&self,
cancel_closure: CancelClosure,
) -> Result<(), CancelError> {
@@ -594,14 +596,14 @@ impl Session {
expire: CANCEL_KEY_TTL,
};
let _ = tx.try_send(op).map_err(|e| {
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
let key = self.key;
tracing::warn!("failed to send StoreCancelKey for {key}: {e}");
});
Ok(())
}
pub(crate) fn remove_cancel_key(&self) -> Result<(), CancelError> {
pub(crate) async fn remove_cancel_key(&self) -> Result<(), CancelError> {
let Some(tx) = &self.cancellation_handler.tx else {
tracing::warn!("cancellation handler is not available");
return Err(CancelError::InternalError);
@@ -617,7 +619,7 @@ impl Session {
.guard(RedisMsgKind::HDel),
};
let _ = tx.try_send(op).map_err(|e| {
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
let key = self.key;
tracing::warn!("failed to send RemoveCancelKey for {key}: {e}");
});

View File

@@ -129,12 +129,6 @@ pub async fn task_main(
let _disconnect = ctx.log_connect();
match p.proxy_pass(&config.connect_to_compute).await {
Ok(()) => {}
Err(ErrorSource::Timeout(_)) => {
info!(
?session_id,
"per-client task timed out while gracefully shutting down the connection"
);
}
Err(ErrorSource::Client(e)) => {
error!(
?session_id,
@@ -250,7 +244,9 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let session = cancellation_handler_clone.get_key();
session.write_cancel_key(node.cancel_closure.clone())?;
session
.write_cancel_key(node.cancel_closure.clone())
.await?;
prepare_client_connection(&node, *session.key(), &mut stream).await?;

View File

@@ -200,10 +200,8 @@ pub enum HttpDirection {
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum Direction {
#[label(rename = "tx")]
ComputeToClient,
#[label(rename = "rx")]
ClientToCompute,
Tx,
Rx,
}
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]

View File

@@ -1,394 +1,313 @@
use std::future::poll_fn;
use std::io;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll, ready};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tracing::info;
use crate::metrics::Direction;
#[derive(Debug)]
enum TransferState {
Running(CopyBuffer),
ShuttingDown(u64),
Done(u64),
}
const DISCONNECT_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Debug)]
pub(crate) enum ErrorDirection {
Read(io::Error),
Write(io::Error),
}
/// Mark a value as being unlikely.
#[cold]
#[inline(always)]
fn cold<I>(i: I) -> I {
i
impl ErrorSource {
fn from_client(err: ErrorDirection) -> ErrorSource {
match err {
ErrorDirection::Read(client) => Self::Client(client),
ErrorDirection::Write(compute) => Self::Compute(compute),
}
}
fn from_compute(err: ErrorDirection) -> ErrorSource {
match err {
ErrorDirection::Write(client) => Self::Client(client),
ErrorDirection::Read(compute) => Self::Compute(compute),
}
}
}
#[derive(Debug)]
pub enum ErrorSource {
Client(io::Error),
Compute(io::Error),
Timeout(tokio::time::error::Elapsed),
}
impl ErrorSource {
fn read(dir: Direction, err: io::Error) -> Self {
match dir {
Direction::ComputeToClient => ErrorSource::Compute(err),
Direction::ClientToCompute => ErrorSource::Client(err),
}
}
fn write(dir: Direction, err: io::Error) -> Self {
match dir {
Direction::ComputeToClient => ErrorSource::Client(err),
Direction::ClientToCompute => ErrorSource::Compute(err),
fn transfer_one_direction<A, B>(
cx: &mut Context<'_>,
state: &mut TransferState,
r: &mut A,
w: &mut B,
) -> Poll<Result<u64, ErrorDirection>>
where
A: AsyncRead + AsyncWrite + Unpin + ?Sized,
B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
let mut r = Pin::new(r);
let mut w = Pin::new(w);
loop {
match state {
TransferState::Running(buf) => {
let count = ready!(buf.poll_copy(cx, r.as_mut(), w.as_mut()))?;
*state = TransferState::ShuttingDown(count);
}
TransferState::ShuttingDown(count) => {
ready!(w.as_mut().poll_shutdown(cx)).map_err(ErrorDirection::Write)?;
*state = TransferState::Done(*count);
}
TransferState::Done(count) => return Poll::Ready(Ok(*count)),
}
}
}
#[tracing::instrument(skip_all)]
pub async fn copy_bidirectional_client_compute<Client, Compute>(
client: &mut Client,
compute: &mut Compute,
mut f: impl for<'a> FnMut(Direction, &'a [u8]),
) -> Result<(), ErrorSource>
) -> Result<(u64, u64), ErrorSource>
where
Client: AsyncRead + AsyncWrite + Unpin + ?Sized,
Compute: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
let f = &mut f;
let mut client_to_compute = CopyBuffer::new(Direction::ClientToCompute);
let mut compute_to_client = CopyBuffer::new(Direction::ComputeToClient);
let mut client_to_compute = TransferState::Running(CopyBuffer::new());
let mut compute_to_client = TransferState::Running(CopyBuffer::new());
let mut client = Pin::new(client);
let mut compute = Pin::new(compute);
poll_fn(|cx| {
let mut client_to_compute_result =
transfer_one_direction(cx, &mut client_to_compute, client, compute)
.map_err(ErrorSource::from_client)?;
let mut compute_to_client_result =
transfer_one_direction(cx, &mut compute_to_client, compute, client)
.map_err(ErrorSource::from_compute)?;
// Initial copy hot path
let close_dir = poll_fn(|cx| -> Poll<Result<_, ErrorSource>> {
let copy1 = client_to_compute.poll_copy(cx, f, client.as_mut(), compute.as_mut())?;
let copy2 = compute_to_client.poll_copy(cx, f, compute.as_mut(), client.as_mut())?;
// TODO: 1 info log, with a enum label for close direction.
match (copy1, copy2) {
(Poll::Pending, Poll::Pending) => Poll::Pending,
(Poll::Ready(()), _) => Poll::Ready(Ok(client_to_compute.dir)),
(_, Poll::Ready(())) => Poll::Ready(Ok(compute_to_client.dir)),
// Early termination checks from compute to client.
if let TransferState::Done(_) = compute_to_client {
if let TransferState::Running(buf) = &client_to_compute {
info!("Compute is done, terminate client");
// Initiate shutdown
client_to_compute = TransferState::ShuttingDown(buf.amt);
client_to_compute_result =
transfer_one_direction(cx, &mut client_to_compute, client, compute)
.map_err(ErrorSource::from_client)?;
}
}
// Early termination checks from client to compute.
if let TransferState::Done(_) = client_to_compute {
if let TransferState::Running(buf) = &compute_to_client {
info!("Client is done, terminate compute");
// Initiate shutdown
compute_to_client = TransferState::ShuttingDown(buf.amt);
compute_to_client_result =
transfer_one_direction(cx, &mut compute_to_client, compute, client)
.map_err(ErrorSource::from_compute)?;
}
}
// It is not a problem if ready! returns early ... (comment remains the same)
let client_to_compute = ready!(client_to_compute_result);
let compute_to_client = ready!(compute_to_client_result);
Poll::Ready(Ok((client_to_compute, compute_to_client)))
})
.await?;
// initiate shutdown.
match close_dir {
Direction::ClientToCompute => {
info!("Client is done, terminate compute");
// we will never write anymore data to the client.
compute_to_client.filled = 0..0;
// make sure to shutdown the client conn.
compute_to_client.need_flush = true;
}
Direction::ComputeToClient => {
info!("Compute is done, terminate client");
// we will never write anymore data to the compute.
client_to_compute.filled = 0..0;
// make sure to shutdown the compute conn.
client_to_compute.need_flush = true;
}
}
// Finish sending the rest of the data to client/compute before shutting it down.
//
// Edge case:
// * peer has filled the TCP buffers and is blocking on a `write()`,
// * proxy has filled the TCP buffers and is waiting on a `write()`.
// Since no side is reading from the buffers, no progress will be made.
let shutdown = poll_fn(|cx| {
let res1 = client_to_compute.poll_empty(cx, compute.as_mut())?;
let res2 = compute_to_client.poll_empty(cx, client.as_mut())?;
if res1.is_ready() && res2.is_ready() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
});
// We assume most peers will have enough buffer space so this issue doesn't arise, but we apply
// a timeout just in case.
//
// We could also update `poll_empty` to try and read the data, but I think this is not an edge case
// worth overcomplicating.
let res = tokio::time::timeout(DISCONNECT_TIMEOUT, shutdown).await;
match res {
Ok(res) => res,
Err(timeout) => Err(ErrorSource::Timeout(timeout)),
}
.await
}
const DEFAULT_BUF_SIZE: usize = 1024;
#[derive(Debug)]
pub(super) struct CopyBuffer {
dir: Direction,
read_done: bool,
need_flush: bool,
filled: Range<usize>,
buf: [u8; DEFAULT_BUF_SIZE],
pos: usize,
cap: usize,
amt: u64,
buf: Box<[u8]>,
}
const DEFAULT_BUF_SIZE: usize = 1024;
impl CopyBuffer {
pub(super) const fn new(dir: Direction) -> Self {
pub(super) fn new() -> Self {
Self {
dir,
read_done: false,
need_flush: false,
filled: 0..0,
buf: [0; DEFAULT_BUF_SIZE],
pos: 0,
cap: 0,
amt: 0,
buf: vec![0; DEFAULT_BUF_SIZE].into_boxed_slice(),
}
}
/// Returns Ready(Ok(())) when no more writes could progress, and the buffer has space to read.
#[inline(always)]
fn poll_write_loop<W>(
fn poll_fill_buf<R>(
&mut self,
cx: &mut Context<'_>,
mut writer: Pin<&mut W>,
) -> Poll<Result<(), ErrorSource>>
where
W: AsyncWrite + ?Sized,
{
debug_assert!(!self.filled.is_empty());
loop {
let filled_buf = &self.buf[self.filled.clone()];
match writer.as_mut().poll_write(cx, filled_buf) {
Poll::Ready(Err(err)) => {
return Poll::Ready(Err(ErrorSource::write(self.dir, cold(err))));
}
Poll::Ready(Ok(0)) => {
let err =
io::Error::new(io::ErrorKind::WriteZero, "write zero byte into writer");
return Poll::Ready(Err(ErrorSource::write(self.dir, cold(err))));
}
Poll::Ready(Ok(i)) => {
// update the write head.
self.filled.start += i;
self.need_flush = true;
// we wrote some data, but the filled buffer might not be fully empty yet.
if !self.filled.is_empty() {
continue;
}
// the buffer is definitely empty. reset positions.
self.filled = 0..0;
break;
}
// While we couldn't write, we might be able to read.
Poll::Pending if self.filled.end < self.buf.len() => break,
// We couldn't write, and have no space to read. Just exit.
Poll::Pending => return Poll::Pending,
}
}
Poll::Ready(Ok(()))
}
/// Returns Ready(Ok((true))) when read returns EOF.
/// Returns Ready(Ok((false))) when read returns data.
#[inline(always)]
fn poll_read_once<R>(
&mut self,
cx: &mut Context<'_>,
f: &mut impl for<'a> FnMut(Direction, &'a [u8]),
reader: Pin<&mut R>,
) -> Poll<Result<bool, ErrorSource>>
) -> Poll<io::Result<()>>
where
R: AsyncRead + ?Sized,
{
debug_assert!(self.filled.end < self.buf.len());
let mut buf = ReadBuf::new(&mut self.buf[self.filled.end..]);
let me = &mut *self;
let mut buf = ReadBuf::new(&mut me.buf);
buf.set_filled(me.cap);
match reader.poll_read(cx, &mut buf) {
Poll::Ready(Ok(())) => {
let filled = buf.filled();
// no more data to read, switch to shutdown mode.
if filled.is_empty() {
self.need_flush = true;
return Poll::Ready(Ok(true));
}
// run our inspection callback.
f(self.dir, filled);
// update the read head.
self.filled.end += filled.len();
// read more data
Poll::Ready(Ok(false))
}
// cannot continue on error.
Poll::Ready(Err(e)) => Poll::Ready(Err(ErrorSource::read(self.dir, cold(e)))),
// No more data to read, and no more data to write.
Poll::Pending => Poll::Pending,
let res = reader.poll_read(cx, &mut buf);
if let Poll::Ready(Ok(())) = res {
let filled_len = buf.filled().len();
me.read_done = me.cap == filled_len;
me.cap = filled_len;
}
res
}
/// Returns Ready(Ok(())) when read returns EOF.
fn poll_copy<R, W>(
fn poll_write_buf<R, W>(
&mut self,
cx: &mut Context<'_>,
f: &mut impl for<'a> FnMut(Direction, &'a [u8]),
mut reader: Pin<&mut R>,
mut writer: Pin<&mut W>,
) -> Poll<Result<(), ErrorSource>>
) -> Poll<Result<usize, ErrorDirection>>
where
R: AsyncRead + ?Sized,
W: AsyncWrite + ?Sized,
{
// this register eliminates a branch in the hot loop.
let mut empty = self.filled.is_empty();
// write then read hot loop
loop {
if !empty {
ready!(self.poll_write_loop(cx, writer.as_mut())?);
}
// If empty is true, there is guaranteed space to read.
// If empty is false, and the write loop returned ready, then we know there's space for more reads.
match self.poll_read_once(cx, f, reader.as_mut())? {
// EOF
Poll::Ready(true) => return Poll::Ready(Ok(())),
// Needs write.
Poll::Ready(false) => empty = false,
// Cannot read. The peer might not send us anything until
// they receive data from us, so let's switch to flushing.
Poll::Pending => break,
let me = &mut *self;
match writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap]) {
Poll::Pending => {
// Top up the buffer towards full if we can read a bit more
// data - this should improve the chances of a large write
if !me.read_done && me.cap < me.buf.len() {
ready!(me.poll_fill_buf(cx, reader.as_mut())).map_err(ErrorDirection::Read)?;
}
Poll::Pending
}
res @ Poll::Ready(_) => res.map_err(ErrorDirection::Write),
}
if self.need_flush {
let flush = writer.as_mut().poll_flush(cx);
ready!(flush.map_err(|e| ErrorSource::write(self.dir, e))?);
self.need_flush = false;
}
// there might be more data still to read.
Poll::Pending
}
/// Returns Ready(Ok(())) when the conn is fully shutdown.
pub(super) fn poll_empty<W>(
pub(super) fn poll_copy<R, W>(
&mut self,
cx: &mut Context<'_>,
mut reader: Pin<&mut R>,
mut writer: Pin<&mut W>,
) -> Poll<Result<(), ErrorSource>>
) -> Poll<Result<u64, ErrorDirection>>
where
R: AsyncRead + ?Sized,
W: AsyncWrite + ?Sized,
{
if !self.filled.is_empty() {
ready!(self.poll_write_loop(cx, writer.as_mut())?);
loop {
// If there is some space left in our buffer, then we try to read some
// data to continue, thus maximizing the chances of a large write.
if self.cap < self.buf.len() && !self.read_done {
match self.poll_fill_buf(cx, reader.as_mut()) {
Poll::Ready(Ok(())) => (),
Poll::Ready(Err(err)) => return Poll::Ready(Err(ErrorDirection::Read(err))),
Poll::Pending => {
// Ignore pending reads when our buffer is not empty, because
// we can try to write data immediately.
if self.pos == self.cap {
// Try flushing when the reader has no progress to avoid deadlock
// when the reader depends on buffered writer.
if self.need_flush {
ready!(writer.as_mut().poll_flush(cx))
.map_err(ErrorDirection::Write)?;
self.need_flush = false;
}
if !self.filled.is_empty() {
// still some data to write
return Poll::Pending;
return Poll::Pending;
}
}
}
}
// If our buffer has some data, let's write it out!
while self.pos < self.cap {
let i = ready!(self.poll_write_buf(cx, reader.as_mut(), writer.as_mut()))?;
if i == 0 {
return Poll::Ready(Err(ErrorDirection::Write(io::Error::new(
io::ErrorKind::WriteZero,
"write zero byte into writer",
))));
}
self.pos += i;
self.amt += i as u64;
self.need_flush = true;
}
// If pos larger than cap, this loop will never stop.
// In particular, user's wrong poll_write implementation returning
// incorrect written length may lead to thread blocking.
debug_assert!(
self.pos <= self.cap,
"writer returned length larger than input slice"
);
// All data has been written, the buffer can be considered empty again
self.pos = 0;
self.cap = 0;
// If we've written all the data and we've seen EOF, flush out the
// data and finish the transfer.
if self.read_done {
ready!(writer.as_mut().poll_flush(cx)).map_err(ErrorDirection::Write)?;
return Poll::Ready(Ok(self.amt));
}
}
if self.need_flush {
let res = writer.poll_shutdown(cx);
ready!(res.map_err(|e| ErrorSource::write(self.dir, e))?);
self.need_flush = false;
}
// no data to read, no data to write.
Poll::Ready(Ok(()))
}
}
#[cfg(test)]
mod tests {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::AsyncWriteExt;
use super::*;
#[tokio::test]
async fn test_client_to_compute() {
let (mut client, mut client_proxy) = tokio::io::duplex(32); // Create a mock duplex stream
let (mut proxy_compute, mut compute) = tokio::io::duplex(16); // Create a mock duplex stream
let (mut client_client, mut client_proxy) = tokio::io::duplex(8); // Create a mock duplex stream
let (mut compute_proxy, mut compute_client) = tokio::io::duplex(32); // Create a mock duplex stream
client.write_all(b"Neon Serverless Postgres").await.unwrap();
compute.write_all(b"is amazing").await.unwrap();
// Simulate 'a' finishing while there's still data for 'b'
client_client.write_all(b"hello").await.unwrap();
client_client.shutdown().await.unwrap();
compute_client.write_all(b"Neon").await.unwrap();
compute_client.shutdown().await.unwrap();
client.shutdown().await.unwrap();
let copy = tokio::spawn(async move {
copy_bidirectional_client_compute(&mut client_proxy, &mut proxy_compute, |_, _| {})
.await
.unwrap();
});
let result = copy_bidirectional_client_compute(&mut client_proxy, &mut compute_proxy)
.await
.unwrap();
// Assert correct transferred amounts
let mut client_recv = String::new();
let mut compute_recv = String::new();
client.read_to_string(&mut client_recv).await.unwrap();
compute.read_to_string(&mut compute_recv).await.unwrap();
assert_eq!(compute_recv, "Neon Serverless Postgres");
assert_eq!(client_recv, "is amazing");
copy.await.unwrap();
let (client_to_compute_count, compute_to_client_count) = result;
assert_eq!(client_to_compute_count, 5); // 'hello' was transferred
assert_eq!(compute_to_client_count, 4); // response only partially transferred or not at all
}
#[tokio::test]
async fn test_compute_to_client() {
let (mut client, mut client_proxy) = tokio::io::duplex(32); // Create a mock duplex stream
let (mut proxy_compute, mut compute) = tokio::io::duplex(16); // Create a mock duplex stream
let (mut client_client, mut client_proxy) = tokio::io::duplex(32); // Create a mock duplex stream
let (mut compute_proxy, mut compute_client) = tokio::io::duplex(8); // Create a mock duplex stream
client.write_all(b"Neon Serverless Postgres").await.unwrap();
compute.write_all(b"is amazing").await.unwrap();
// Simulate 'a' finishing while there's still data for 'b'
compute_client.write_all(b"hello").await.unwrap();
compute_client.shutdown().await.unwrap();
client_client
.write_all(b"Neon Serverless Postgres")
.await
.unwrap();
compute.shutdown().await.unwrap();
let copy = tokio::spawn(async move {
copy_bidirectional_client_compute(&mut client_proxy, &mut proxy_compute, |_, _| {})
.await
.unwrap();
});
let result = copy_bidirectional_client_compute(&mut client_proxy, &mut compute_proxy)
.await
.unwrap();
// Assert correct transferred amounts
let mut client_recv = String::new();
let mut compute_recv = String::new();
client.read_to_string(&mut client_recv).await.unwrap();
compute.read_to_string(&mut compute_recv).await.unwrap();
assert_eq!(compute_recv, "Neon Serverless ");
assert_eq!(client_recv, "is amazing");
copy.await.unwrap();
}
#[tokio::test(start_paused = true)]
async fn test_timeout() {
let (mut client, mut client_proxy) = tokio::io::duplex(32); // Create a mock duplex stream
let (mut proxy_compute, mut compute) = tokio::io::duplex(16); // Create a mock duplex stream
// Try to send 24 bytes to compute, but compute only has space for 16 bytes.
// Writes will not succeed.
client.write_all(b"Neon Serverless Postgres").await.unwrap();
client.shutdown().await.unwrap();
let copy = tokio::spawn(async move {
copy_bidirectional_client_compute(&mut client_proxy, &mut proxy_compute, |_, _| {})
.await
.unwrap_err()
});
tokio::time::advance(DISCONNECT_TIMEOUT).await;
let res = copy.await.unwrap();
assert!(matches!(res, ErrorSource::Timeout(_)));
// Assert correct transferred amounts
let mut compute_recv = String::new();
compute.read_to_string(&mut compute_recv).await.unwrap();
assert_eq!(compute_recv, "Neon Serverless ");
let (client_to_compute_count, compute_to_client_count) = result;
assert_eq!(compute_to_client_count, 5); // 'hello' was transferred
assert!(client_to_compute_count <= 8); // response only partially transferred or not at all
}
}

View File

@@ -167,12 +167,6 @@ pub async fn task_main(
let _disconnect = ctx.log_connect();
match p.proxy_pass(&config.connect_to_compute).await {
Ok(()) => {}
Err(ErrorSource::Timeout(_)) => {
info!(
?session_id,
"per-client task timed out while gracefully shutting down the connection"
);
}
Err(ErrorSource::Client(e)) => {
warn!(
?session_id,
@@ -389,7 +383,9 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let session = cancellation_handler_clone.get_key();
session.write_cancel_key(node.cancel_closure.clone())?;
session
.write_cancel_key(node.cancel_closure.clone())
.await?;
prepare_client_connection(&node, *session.key(), &mut stream).await?;

View File

@@ -1,6 +1,7 @@
use smol_str::SmolStr;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::debug;
use utils::measured_stream::MeasuredStream;
use super::copy_bidirectional::ErrorSource;
use crate::cancellation;
@@ -8,15 +9,14 @@ use crate::compute::PostgresConnection;
use crate::config::ComputeConfig;
use crate::control_plane::messages::MetricsAuxInfo;
use crate::metrics::{Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard};
use crate::proxy::copy_bidirectional_client_compute;
use crate::stream::Stream;
use crate::usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS};
/// Forward bytes in both directions (client <-> compute).
#[tracing::instrument(level = "debug", skip_all)]
#[tracing::instrument(skip_all)]
pub(crate) async fn proxy_pass(
mut client: Stream<impl AsyncRead + AsyncWrite + Unpin>,
mut compute: impl AsyncRead + AsyncWrite + Unpin,
client: impl AsyncRead + AsyncWrite + Unpin,
compute: impl AsyncRead + AsyncWrite + Unpin,
aux: MetricsAuxInfo,
private_link_id: Option<SmolStr>,
) -> Result<(), ErrorSource> {
@@ -28,30 +28,37 @@ pub(crate) async fn proxy_pass(
});
let metrics = &Metrics::get().proxy.io_bytes;
let m_sent = metrics.with_labels(Direction::ComputeToClient);
let m_recv = metrics.with_labels(Direction::ClientToCompute);
let m_sent = metrics.with_labels(Direction::Tx);
let mut client = MeasuredStream::new(
client,
|_| {},
|cnt| {
// Number of bytes we sent to the client (outbound).
metrics.get_metric(m_sent).inc_by(cnt as u64);
usage_tx.record_egress(cnt as u64);
},
);
let inspect = |direction, bytes: &[u8]| match direction {
Direction::ComputeToClient => {
metrics.get_metric(m_sent).inc_by(bytes.len() as u64);
usage_tx.record_egress(bytes.len() as u64);
}
Direction::ClientToCompute => {
metrics.get_metric(m_recv).inc_by(bytes.len() as u64);
usage_tx.record_ingress(bytes.len() as u64);
}
};
let m_recv = metrics.with_labels(Direction::Rx);
let mut compute = MeasuredStream::new(
compute,
|_| {},
|cnt| {
// Number of bytes the client sent to the compute node (inbound).
metrics.get_metric(m_recv).inc_by(cnt as u64);
usage_tx.record_ingress(cnt as u64);
},
);
// Starting from here we only proxy the client's traffic.
debug!("performing the proxy pass...");
let _ = crate::proxy::copy_bidirectional::copy_bidirectional_client_compute(
&mut client,
&mut compute,
)
.await?;
// reduce branching internal to the hot path.
match &mut client {
Stream::Raw { raw } => copy_bidirectional_client_compute(raw, &mut compute, inspect).await,
Stream::Tls { tls, .. } => {
copy_bidirectional_client_compute(&mut *tls, &mut compute, inspect).await
}
}
Ok(())
}
pub(crate) struct ProxyPassthrough<S> {
@@ -87,7 +94,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
}
drop(self.cancel.remove_cancel_key()); // we don't need a result. If the queue is full, we just log the error
drop(self.cancel.remove_cancel_key().await); // we don't need a result. If the queue is full, we just log the error
res
}

View File

@@ -2,7 +2,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, ready};
use anyhow::{Context as _, anyhow};
use anyhow::Context as _;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use framed_websockets::{Frame, OpCode, WebSocketServer};
use futures::{Sink, Stream};
@@ -169,9 +169,6 @@ pub(crate) async fn serve_websocket(
ctx.log_connect();
match p.proxy_pass(&config.connect_to_compute).await {
Ok(()) => Ok(()),
Err(ErrorSource::Timeout(_)) => Err(anyhow!(
"timed out while gracefully shutting down the connection"
)),
Err(ErrorSource::Client(err)) => Err(err).context("client"),
Err(ErrorSource::Compute(err)) => Err(err).context("compute"),
}

View File

@@ -15,7 +15,7 @@ if TYPE_CHECKING:
from fixtures.pg_version import PgVersion
@pytest.mark.timeout(7200)
@pytest.mark.timeout(4*3600)
@pytest.mark.remote_cluster
def test_cloud_regress(
remote_pg: RemotePostgres,