mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-23 08:00:37 +00:00
Compare commits
12 Commits
conrad/rem
...
amasteerov
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1ec150b7d2 | ||
|
|
20de8c2d52 | ||
|
|
27ba297487 | ||
|
|
01b5fc902f | ||
|
|
73aa5ede11 | ||
|
|
ad0b4c6d01 | ||
|
|
975a5c22c8 | ||
|
|
f7abc25c3e | ||
|
|
2b2df45e76 | ||
|
|
a3d5ed9d2f | ||
|
|
f02f19a1d0 | ||
|
|
a9f7a96cb7 |
11
.github/workflows/cloud-regress.yml
vendored
11
.github/workflows/cloud-regress.yml
vendored
@@ -14,11 +14,6 @@ defaults:
|
||||
run:
|
||||
shell: bash -euxo pipefail {0}
|
||||
|
||||
concurrency:
|
||||
# Allow only one workflow
|
||||
group: ${{ github.workflow }}
|
||||
cancel-in-progress: true
|
||||
|
||||
permissions:
|
||||
id-token: write # aws-actions/configure-aws-credentials
|
||||
statuses: write
|
||||
@@ -33,9 +28,10 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
pg-version: [16, 17]
|
||||
pg-version: [17]
|
||||
|
||||
runs-on: us-east-2
|
||||
#runs-on: us-east-2
|
||||
runs-on: small
|
||||
container:
|
||||
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
|
||||
credentials:
|
||||
@@ -59,6 +55,7 @@ jobs:
|
||||
run: |
|
||||
cd "vendor/postgres-v${PG_VERSION}"
|
||||
patch -p1 < "../../compute/patches/cloud_regress_pg${PG_VERSION}.patch"
|
||||
patch -p1 < "../../compute/patches/cloud_regress_pg17_495.patch"
|
||||
|
||||
- name: Generate a random password
|
||||
id: pwgen
|
||||
|
||||
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -4434,16 +4434,6 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pageserver_page_api"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"prost 0.13.3",
|
||||
"tonic",
|
||||
"tonic-build",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "papaya"
|
||||
version = "0.2.1"
|
||||
|
||||
@@ -9,7 +9,6 @@ members = [
|
||||
"pageserver/ctl",
|
||||
"pageserver/client",
|
||||
"pageserver/pagebench",
|
||||
"pageserver/page_api",
|
||||
"proxy",
|
||||
"safekeeper",
|
||||
"safekeeper/client",
|
||||
@@ -253,7 +252,6 @@ pageserver = { path = "./pageserver" }
|
||||
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
|
||||
pageserver_client = { path = "./pageserver/client" }
|
||||
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
|
||||
pageserver_page_api = { path = "./pageserver/page_api" }
|
||||
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
|
||||
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
|
||||
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
|
||||
|
||||
4486
compute/patches/cloud_regress_pg17_395.patch
Normal file
4486
compute/patches/cloud_regress_pg17_395.patch
Normal file
File diff suppressed because it is too large
Load Diff
4486
compute/patches/cloud_regress_pg17_495.patch
Normal file
4486
compute/patches/cloud_regress_pg17_495.patch
Normal file
File diff suppressed because it is too large
Load Diff
4790
compute/patches/cloud_regress_pg17_ha.patch
Normal file
4790
compute/patches/cloud_regress_pg17_ha.patch
Normal file
File diff suppressed because it is too large
Load Diff
129
compute/patches/cloud_regress_pg17_ha_plus.patch
Normal file
129
compute/patches/cloud_regress_pg17_ha_plus.patch
Normal file
@@ -0,0 +1,129 @@
|
||||
diff --git a/src/test/regress/sql/box.sql b/src/test/regress/sql/box.sql
|
||||
index 249636c76c3..540c2b54dda 100644
|
||||
--- a/src/test/regress/sql/box.sql
|
||||
+++ b/src/test/regress/sql/box.sql
|
||||
@@ -196,7 +196,7 @@ CREATE TABLE quad_box_tbl (id int, b box);
|
||||
|
||||
INSERT INTO quad_box_tbl
|
||||
SELECT (x - 1) * 100 + y, box(point(x * 10, y * 10), point(x * 10 + 5, y * 10 + 5))
|
||||
- FROM generate_series(1, 95 * 100) x,
|
||||
+ FROM generate_series(1, 100) x,
|
||||
generate_series(1, 95 * 100) y;
|
||||
|
||||
-- insert repeating data to test allTheSame
|
||||
diff --git a/src/test/regress/sql/partition_join.sql b/src/test/regress/sql/partition_join.sql
|
||||
index 3ca8a2d6090..a8e40f906c4 100644
|
||||
--- a/src/test/regress/sql/partition_join.sql
|
||||
+++ b/src/test/regress/sql/partition_join.sql
|
||||
@@ -533,7 +533,7 @@ create temp table prtx2_3 partition of prtx2 for values from (21) to (31);
|
||||
insert into prtx1 select 1 + i%30, i, i
|
||||
from generate_series(1, 95 * 1000) i;
|
||||
insert into prtx2 select 1 + i%30, i, i
|
||||
- from generate_series(1, 95 * 500) i, generate_series(1, 95 * 10) j;
|
||||
+ from generate_series(1, 500) i, generate_series(1, 95 * 10) j;
|
||||
create index on prtx2 (b);
|
||||
create index on prtx2 (c);
|
||||
analyze prtx1;
|
||||
diff --git a/src/test/regress/sql/partition_prune.sql b/src/test/regress/sql/partition_prune.sql
|
||||
index 82ac39d5dc8..bef0a891ade 100644
|
||||
--- a/src/test/regress/sql/partition_prune.sql
|
||||
+++ b/src/test/regress/sql/partition_prune.sql
|
||||
@@ -1274,9 +1274,9 @@ select
|
||||
case c when 0 then null else 3 end,
|
||||
case d when 0 then null else 4 end
|
||||
from
|
||||
- generate_series(0, 95 * 1) a,
|
||||
- generate_series(0, 95 * 1) b,
|
||||
- generate_series(0, 95 * 1) c,
|
||||
+ generate_series(0, 1) a,
|
||||
+ generate_series(0, 1) b,
|
||||
+ generate_series(0, 1) c,
|
||||
generate_series(0, 95 * 1) d;
|
||||
|
||||
-- Ensure partition pruning works correctly for each combination of IS NULL
|
||||
diff --git a/src/test/regress/sql/polygon.sql b/src/test/regress/sql/polygon.sql
|
||||
index d39a2b4e8f8..2d862985510 100644
|
||||
--- a/src/test/regress/sql/polygon.sql
|
||||
+++ b/src/test/regress/sql/polygon.sql
|
||||
@@ -42,7 +42,7 @@ CREATE TABLE quad_poly_tbl (id int, p polygon);
|
||||
|
||||
INSERT INTO quad_poly_tbl
|
||||
SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10))
|
||||
- FROM generate_series(1, 95 * 100) x,
|
||||
+ FROM generate_series(1, 100) x,
|
||||
generate_series(1, 95 * 100) y;
|
||||
|
||||
INSERT INTO quad_poly_tbl
|
||||
diff --git a/src/test/regress/sql/rangetypes.sql b/src/test/regress/sql/rangetypes.sql
|
||||
index b51d6c405c2..4138418c7a6 100644
|
||||
--- a/src/test/regress/sql/rangetypes.sql
|
||||
+++ b/src/test/regress/sql/rangetypes.sql
|
||||
@@ -314,13 +314,13 @@ select count(*) from test_range_gist where ir -|- int4multirange(int4range(100,2
|
||||
create table test_range_spgist(ir int4range);
|
||||
create index test_range_spgist_idx on test_range_spgist using spgist (ir);
|
||||
|
||||
-insert into test_range_spgist select int4range(g, g+10) from generate_series(1, 95 * 2000) g;
|
||||
-insert into test_range_spgist select 'empty'::int4range from generate_series(1, 95 * 500) g;
|
||||
-insert into test_range_spgist select int4range(g, g+10000) from generate_series(1, 95 * 1000) g;
|
||||
-insert into test_range_spgist select 'empty'::int4range from generate_series(1, 95 * 500) g;
|
||||
-insert into test_range_spgist select int4range(NULL,g*10,'(]') from generate_series(1, 95 * 100) g;
|
||||
-insert into test_range_spgist select int4range(g*10,NULL,'(]') from generate_series(1, 95 * 100) g;
|
||||
-insert into test_range_spgist select int4range(g, g+10) from generate_series(1, 95 * 2000) g;
|
||||
+insert into test_range_spgist select int4range(g, g+10) from generate_series(1, 0.1 * 95 * 2000) g;
|
||||
+insert into test_range_spgist select 'empty'::int4range from generate_series(1, 0.1 * 95 * 500) g;
|
||||
+insert into test_range_spgist select int4range(g, g+10000) from generate_series(1, 0.1 * 95 * 1000) g;
|
||||
+insert into test_range_spgist select 'empty'::int4range from generate_series(1, 0.1 * 95 * 500) g;
|
||||
+insert into test_range_spgist select int4range(NULL,g*10,'(]') from generate_series(1, 0.1 * 95 * 100) g;
|
||||
+insert into test_range_spgist select int4range(g*10,NULL,'(]') from generate_series(1, 0.1 * 95 * 100) g;
|
||||
+insert into test_range_spgist select int4range(g, g+10) from generate_series(1, 0.1 * 95 * 2000) g;
|
||||
|
||||
-- first, verify non-indexed results
|
||||
SET enable_seqscan = t;
|
||||
diff --git a/src/test/regress/sql/spgist.sql b/src/test/regress/sql/spgist.sql
|
||||
index 0c4f24e1d49..61e53375539 100644
|
||||
--- a/src/test/regress/sql/spgist.sql
|
||||
+++ b/src/test/regress/sql/spgist.sql
|
||||
@@ -16,9 +16,9 @@ vacuum spgist_point_tbl;
|
||||
|
||||
-- Insert more data, to make the index a few levels deep.
|
||||
insert into spgist_point_tbl (id, p)
|
||||
-select g, point(g*10, g*10) from generate_series(1, 95 * 10000) g;
|
||||
+select g, point(g*10, g*10) from generate_series(1, 0.1 * 95 * 10000) g;
|
||||
insert into spgist_point_tbl (id, p)
|
||||
-select g+100000, point(g*10+1, g*10+1) from generate_series(1, 95 * 10000) g;
|
||||
+select g+100000, point(g*10+1, g*10+1) from generate_series(1, 0.1 * 95 * 10000) g;
|
||||
|
||||
-- To test vacuum, delete some entries from all over the index.
|
||||
delete from spgist_point_tbl where id % 2 = 1;
|
||||
@@ -37,8 +37,8 @@ vacuum spgist_point_tbl;
|
||||
create table spgist_box_tbl(id serial, b box);
|
||||
insert into spgist_box_tbl(b)
|
||||
select box(point(i,j),point(i+s,j+s))
|
||||
- from generate_series(1, 95 * 100,5) i,
|
||||
- generate_series(1, 95 * 100,5) j,
|
||||
+ from generate_series(1,100,5) i,
|
||||
+ generate_series(1,100,5) j,
|
||||
generate_series(1, 95 * 10) s;
|
||||
create index spgist_box_idx on spgist_box_tbl using spgist (b);
|
||||
|
||||
@@ -86,6 +86,6 @@ create unlogged table spgist_unlogged_tbl(id serial, b box);
|
||||
create index spgist_unlogged_idx on spgist_unlogged_tbl using spgist (b);
|
||||
insert into spgist_unlogged_tbl(b)
|
||||
select box(point(i,j))
|
||||
- from generate_series(1, 95 * 100,5) i,
|
||||
+ from generate_series(1,100,5) i,
|
||||
generate_series(1, 95 * 10,5) j;
|
||||
-- leave this table around, to help in testing dump/restore
|
||||
diff --git a/src/test/regress/sql/tuplesort.sql b/src/test/regress/sql/tuplesort.sql
|
||||
index fa762f26ac7..7a1fd619eba 100644
|
||||
--- a/src/test/regress/sql/tuplesort.sql
|
||||
+++ b/src/test/regress/sql/tuplesort.sql
|
||||
@@ -276,7 +276,7 @@ ROLLBACK;
|
||||
CREATE TEMP TABLE test_mark_restore(col1 int, col2 int, col12 int);
|
||||
-- need a few duplicates for mark/restore to matter
|
||||
INSERT INTO test_mark_restore(col1, col2, col12)
|
||||
- SELECT a.i, b.i, a.i * b.i FROM generate_series(1, 95 * 500) a(i), generate_series(1, 95 * 5) b(i);
|
||||
+ SELECT a.i, b.i, a.i * b.i FROM generate_series(1, 500) a(i), generate_series(1, 95 * 5) b(i);
|
||||
|
||||
BEGIN;
|
||||
|
||||
593
compute/patches/cloud_regress_pg17_ha_plus2.patch
Normal file
593
compute/patches/cloud_regress_pg17_ha_plus2.patch
Normal file
@@ -0,0 +1,593 @@
|
||||
diff --git a/src/test/regress/sql/box.sql b/src/test/regress/sql/box.sql
|
||||
index 249636c76c3..540c2b54dda 100644
|
||||
--- a/src/test/regress/sql/box.sql
|
||||
+++ b/src/test/regress/sql/box.sql
|
||||
@@ -196,7 +196,7 @@ CREATE TABLE quad_box_tbl (id int, b box);
|
||||
|
||||
INSERT INTO quad_box_tbl
|
||||
SELECT (x - 1) * 100 + y, box(point(x * 10, y * 10), point(x * 10 + 5, y * 10 + 5))
|
||||
- FROM generate_series(1, 95 * 100) x,
|
||||
+ FROM generate_series(1, 100) x,
|
||||
generate_series(1, 95 * 100) y;
|
||||
|
||||
-- insert repeating data to test allTheSame
|
||||
diff --git a/src/test/regress/sql/brin.sql b/src/test/regress/sql/brin.sql
|
||||
index 39d3cd7821a..86efbb72609 100644
|
||||
--- a/src/test/regress/sql/brin.sql
|
||||
+++ b/src/test/regress/sql/brin.sql
|
||||
@@ -476,7 +476,7 @@ CREATE TABLE brintest_3 (a text, b text, c text, d text);
|
||||
|
||||
-- long random strings (~2000 chars each, so ~6kB for min/max on two
|
||||
-- columns) to trigger toasting
|
||||
-WITH rand_value AS (SELECT string_agg(fipshash(i::text),'') AS val FROM generate_series(1, 95 * 60) s(i))
|
||||
+WITH rand_value AS (SELECT string_agg(fipshash(i::text),'') AS val FROM generate_series(1,60) s(i))
|
||||
INSERT INTO brintest_3
|
||||
SELECT val, val, val, val FROM rand_value;
|
||||
|
||||
@@ -495,7 +495,7 @@ VACUUM brintest_3;
|
||||
-- retry insert with a different random-looking (but deterministic) value
|
||||
-- the value is different, and so should replace either min or max in the
|
||||
-- brin summary
|
||||
-WITH rand_value AS (SELECT string_agg(fipshash((-i)::text),'') AS val FROM generate_series(1, 95 * 60) s(i))
|
||||
+WITH rand_value AS (SELECT string_agg(fipshash((-i)::text),'') AS val FROM generate_series(1,60) s(i))
|
||||
INSERT INTO brintest_3
|
||||
SELECT val, val, val, val FROM rand_value;
|
||||
|
||||
diff --git a/src/test/regress/sql/brin_multi.sql b/src/test/regress/sql/brin_multi.sql
|
||||
index b7f7a9e8803..b1a109fe07f 100644
|
||||
--- a/src/test/regress/sql/brin_multi.sql
|
||||
+++ b/src/test/regress/sql/brin_multi.sql
|
||||
@@ -612,7 +612,7 @@ CREATE TABLE brin_date_test(a DATE);
|
||||
INSERT INTO brin_date_test SELECT '4713-01-01 BC'::date + i FROM generate_series(1, 95 * 30) s(i);
|
||||
|
||||
-- insert values close to date minimum
|
||||
-INSERT INTO brin_date_test SELECT '5874897-12-01'::date + i FROM generate_series(1, 95 * 30) s(i);
|
||||
+INSERT INTO brin_date_test SELECT '5874897-12-01'::date + i FROM generate_series(1, 30) s(i);
|
||||
|
||||
CREATE INDEX ON brin_date_test USING brin (a date_minmax_multi_ops) WITH (pages_per_range=1);
|
||||
|
||||
diff --git a/src/test/regress/sql/btree_index.sql b/src/test/regress/sql/btree_index.sql
|
||||
index d0d86db1667..88a752264a0 100644
|
||||
--- a/src/test/regress/sql/btree_index.sql
|
||||
+++ b/src/test/regress/sql/btree_index.sql
|
||||
@@ -267,7 +267,7 @@ VACUUM delete_test_table;
|
||||
--
|
||||
-- The vacuum above should've turned the leaf page into a fast root. We just
|
||||
-- need to insert some rows to cause the fast root page to split.
|
||||
-INSERT INTO delete_test_table SELECT i, 1, 2, 3 FROM generate_series(1, 95 * 1000) i;
|
||||
+INSERT INTO delete_test_table SELECT i, 1, 2, 3 FROM generate_series(1,1000) i;
|
||||
|
||||
-- Test unsupported btree opclass parameters
|
||||
create index on btree_tall_tbl (id int4_ops(foo=1));
|
||||
diff --git a/src/test/regress/sql/create_table.sql b/src/test/regress/sql/create_table.sql
|
||||
index 13006372064..1fd4cbfa7ef 100644
|
||||
--- a/src/test/regress/sql/create_table.sql
|
||||
+++ b/src/test/regress/sql/create_table.sql
|
||||
@@ -47,7 +47,7 @@ DEALLOCATE select1;
|
||||
-- (temporarily hide query, to avoid the long CREATE TABLE stmt)
|
||||
\set ECHO none
|
||||
SELECT 'CREATE TABLE extra_wide_table(firstc text, '|| array_to_string(array_agg('c'||i||' bool'),',')||', lastc text);'
|
||||
-FROM generate_series(1, 95 * 1100) g(i)
|
||||
+FROM generate_series(1, 1100) g(i)
|
||||
\gexec
|
||||
\set ECHO all
|
||||
INSERT INTO extra_wide_table(firstc, lastc) VALUES('first col', 'last col');
|
||||
@@ -74,7 +74,7 @@ CREATE TABLE default_expr_agg (a int DEFAULT (avg(1)));
|
||||
-- invalid use of subquery
|
||||
CREATE TABLE default_expr_agg (a int DEFAULT (select 1));
|
||||
-- invalid use of set-returning function
|
||||
-CREATE TABLE default_expr_agg (a int DEFAULT (generate_series(1, 95 * 3)));
|
||||
+CREATE TABLE default_expr_agg (a int DEFAULT (generate_series(1,3)));
|
||||
|
||||
-- Verify that subtransaction rollback restores rd_createSubid.
|
||||
BEGIN;
|
||||
@@ -359,7 +359,7 @@ CREATE TABLE part_bogus_expr_fail PARTITION OF range_parted
|
||||
CREATE TABLE part_bogus_expr_fail PARTITION OF range_parted
|
||||
FOR VALUES FROM ((select 1)) TO ('2019-01-01');
|
||||
CREATE TABLE part_bogus_expr_fail PARTITION OF range_parted
|
||||
- FOR VALUES FROM (generate_series(1, 95 * 3)) TO ('2019-01-01');
|
||||
+ FOR VALUES FROM (generate_series(1, 3)) TO ('2019-01-01');
|
||||
|
||||
-- trying to specify list for range partitioned table
|
||||
CREATE TABLE fail_part PARTITION OF range_parted FOR VALUES IN ('a');
|
||||
diff --git a/src/test/regress/sql/fast_default.sql b/src/test/regress/sql/fast_default.sql
|
||||
index 28fefad6fe6..7d7060820e4 100644
|
||||
--- a/src/test/regress/sql/fast_default.sql
|
||||
+++ b/src/test/regress/sql/fast_default.sql
|
||||
@@ -318,7 +318,7 @@ CREATE TABLE T (pk INT NOT NULL PRIMARY KEY);
|
||||
|
||||
SELECT set('t');
|
||||
|
||||
-INSERT INTO T SELECT * FROM generate_series(1, 95 * 10) a;
|
||||
+INSERT INTO T SELECT * FROM generate_series(1, 10) a;
|
||||
|
||||
ALTER TABLE T ADD COLUMN c_bigint BIGINT NOT NULL DEFAULT -1;
|
||||
|
||||
@@ -326,7 +326,7 @@ INSERT INTO T SELECT b, b - 10 FROM generate_series(11, 20) a(b);
|
||||
|
||||
ALTER TABLE T ADD COLUMN c_text TEXT DEFAULT 'hello';
|
||||
|
||||
-INSERT INTO T SELECT b, b - 10, (b + 10)::text FROM generate_series(21, 30) a(b);
|
||||
+INSERT INTO T SELECT b, b - 10, (b + 10)::text FROM generate_series(21, 95 * 30) a(b);
|
||||
|
||||
-- WHERE clause
|
||||
SELECT c_bigint, c_text FROM T WHERE c_bigint = -1 LIMIT 1;
|
||||
diff --git a/src/test/regress/sql/hash_index.sql b/src/test/regress/sql/hash_index.sql
|
||||
index fcd5f91a39f..6ac90c57730 100644
|
||||
--- a/src/test/regress/sql/hash_index.sql
|
||||
+++ b/src/test/regress/sql/hash_index.sql
|
||||
@@ -220,7 +220,7 @@ SELECT h.seqno AS f20000
|
||||
CREATE TABLE hash_split_heap (keycol INT);
|
||||
INSERT INTO hash_split_heap SELECT 1 FROM generate_series(1, 95 * 500) a;
|
||||
CREATE INDEX hash_split_index on hash_split_heap USING HASH (keycol);
|
||||
-INSERT INTO hash_split_heap SELECT 1 FROM generate_series(1, 95 * 5000) a;
|
||||
+INSERT INTO hash_split_heap SELECT 1 FROM generate_series(1, POW(95, 0.5) * 5000) a;
|
||||
|
||||
-- Let's do a backward scan.
|
||||
BEGIN;
|
||||
@@ -236,7 +236,7 @@ END;
|
||||
|
||||
-- DELETE, INSERT, VACUUM.
|
||||
DELETE FROM hash_split_heap WHERE keycol = 1;
|
||||
-INSERT INTO hash_split_heap SELECT a/2 FROM generate_series(1, 95 * 25000) a;
|
||||
+INSERT INTO hash_split_heap SELECT a/2 FROM generate_series(1, POW(95, 0.5) * 25000) a;
|
||||
|
||||
VACUUM hash_split_heap;
|
||||
|
||||
diff --git a/src/test/regress/sql/horology.sql b/src/test/regress/sql/horology.sql
|
||||
index 3920a9528ae..d6ce372d799 100644
|
||||
--- a/src/test/regress/sql/horology.sql
|
||||
+++ b/src/test/regress/sql/horology.sql
|
||||
@@ -551,14 +551,14 @@ SELECT to_timestamp('2011-12-18 11:38 +01:xyz', 'YYYY-MM-DD HH12:MI OF'); -- er
|
||||
SELECT to_timestamp('2018-11-02 12:34:56.025', 'YYYY-MM-DD HH24:MI:SS.MS');
|
||||
|
||||
SELECT i, to_timestamp('2018-11-02 12:34:56', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 95 * 6) i;
|
||||
-SELECT i, to_timestamp('2018-11-02 12:34:56.1', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 95 * 6) i;
|
||||
-SELECT i, to_timestamp('2018-11-02 12:34:56.12', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 95 * 6) i;
|
||||
-SELECT i, to_timestamp('2018-11-02 12:34:56.123', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 95 * 6) i;
|
||||
-SELECT i, to_timestamp('2018-11-02 12:34:56.1234', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 95 * 6) i;
|
||||
-SELECT i, to_timestamp('2018-11-02 12:34:56.12345', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 95 * 6) i;
|
||||
+SELECT i, to_timestamp('2018-11-02 12:34:56.1', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 6) i;
|
||||
+SELECT i, to_timestamp('2018-11-02 12:34:56.12', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 6) i;
|
||||
+SELECT i, to_timestamp('2018-11-02 12:34:56.123', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 6) i;
|
||||
+SELECT i, to_timestamp('2018-11-02 12:34:56.1234', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 6) i;
|
||||
+SELECT i, to_timestamp('2018-11-02 12:34:56.12345', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 6) i;
|
||||
SELECT i, to_timestamp('2018-11-02 12:34:56.123456', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 95 * 6) i;
|
||||
SELECT i, to_timestamp('2018-11-02 12:34:56.123456789', 'YYYY-MM-DD HH24:MI:SS.FF' || i) FROM generate_series(1, 95 * 6) i;
|
||||
-SELECT i, to_timestamp('20181102123456123456', 'YYYYMMDDHH24MISSFF' || i) FROM generate_series(1, 95 * 6) i;
|
||||
+SELECT i, to_timestamp('20181102123456123456', 'YYYYMMDDHH24MISSFF' || i) FROM generate_series(1, 6) i;
|
||||
|
||||
SELECT to_date('1 4 1902', 'Q MM YYYY'); -- Q is ignored
|
||||
SELECT to_date('3 4 21 01', 'W MM CC YY');
|
||||
diff --git a/src/test/regress/sql/inherit.sql b/src/test/regress/sql/inherit.sql
|
||||
index 96c19fa5297..276f6d25c67 100644
|
||||
--- a/src/test/regress/sql/inherit.sql
|
||||
+++ b/src/test/regress/sql/inherit.sql
|
||||
@@ -742,7 +742,7 @@ create table inhcld1(f2 name, f1 int primary key);
|
||||
create table inhcld2(f1 int primary key, f2 name);
|
||||
alter table inhpar attach partition inhcld1 for values from (1) to (5);
|
||||
alter table inhpar attach partition inhcld2 for values from (5) to (100);
|
||||
-insert into inhpar select x, x::text from generate_series(1, 95 * 10) x;
|
||||
+insert into inhpar select x, x::text from generate_series(1,10) x;
|
||||
|
||||
explain (verbose, costs off)
|
||||
update inhpar i set (f1, f2) = (select i.f1, i.f2 || '-' from int4_tbl limit 1);
|
||||
diff --git a/src/test/regress/sql/insert.sql b/src/test/regress/sql/insert.sql
|
||||
index c9fdd126d15..bbbda3d6237 100644
|
||||
--- a/src/test/regress/sql/insert.sql
|
||||
+++ b/src/test/regress/sql/insert.sql
|
||||
@@ -320,8 +320,8 @@ create table part_ee_ff3_2 partition of part_ee_ff3 for values from (25) to (30)
|
||||
|
||||
truncate list_parted;
|
||||
insert into list_parted values ('aa'), ('cc');
|
||||
-insert into list_parted select 'Ff', s.a from generate_series(1, 95 * 29) s(a);
|
||||
-insert into list_parted select 'gg', s.a from generate_series(1, 95 * 9) s(a);
|
||||
+insert into list_parted select 'Ff', s.a from generate_series(1, 29) s(a);
|
||||
+insert into list_parted select 'gg', s.a from generate_series(1, 9) s(a);
|
||||
insert into list_parted (b) values (1);
|
||||
select tableoid::regclass::text, a, min(b) as min_b, max(b) as max_b from list_parted group by 1, 2 order by 1;
|
||||
|
||||
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
|
||||
index 47abc031c0f..34c4d8c1312 100644
|
||||
--- a/src/test/regress/sql/join_hash.sql
|
||||
+++ b/src/test/regress/sql/join_hash.sql
|
||||
@@ -310,9 +310,9 @@ rollback to settings;
|
||||
-- Exercise rescans. We'll turn off parallel_leader_participation so
|
||||
-- that we can check that instrumentation comes back correctly.
|
||||
|
||||
-create table join_foo as select generate_series(1, 95 * 3) as id, 'xxxxx'::text as t;
|
||||
+create table join_foo as select generate_series(1, POW(95, 0.5) * 3) as id, 'xxxxx'::text as t;
|
||||
alter table join_foo set (parallel_workers = 0);
|
||||
-create table join_bar as select generate_series(1, 95 * 10000) as id, 'xxxxx'::text as t;
|
||||
+create table join_bar as select generate_series(1, POW(95, 0.5) * 10000) as id, 'xxxxx'::text as t;
|
||||
alter table join_bar set (parallel_workers = 2);
|
||||
|
||||
-- multi-batch with rescan, parallel-oblivious
|
||||
diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql
|
||||
index b60271d9400..7d89c85179f 100644
|
||||
--- a/src/test/regress/sql/merge.sql
|
||||
+++ b/src/test/regress/sql/merge.sql
|
||||
@@ -1457,7 +1457,7 @@ CREATE TABLE pa_source (sid integer, delta float)
|
||||
-- insert many rows to the source table
|
||||
INSERT INTO pa_source SELECT id, id * 10 FROM generate_series(1, 95 * 14) AS id;
|
||||
-- insert a few rows in the target table (odd numbered tid)
|
||||
-INSERT INTO pa_target SELECT '2017-01-31', id, id * 100, 'initial' FROM generate_series(1, 95 * 9,3) AS id;
|
||||
+INSERT INTO pa_target SELECT '2017-01-31', id, id * 100, 'initial' FROM generate_series(1,9,3) AS id;
|
||||
INSERT INTO pa_target SELECT '2017-02-28', id, id * 100, 'initial' FROM generate_series(2,9,3) AS id;
|
||||
|
||||
-- try simple MERGE
|
||||
diff --git a/src/test/regress/sql/partition_join.sql b/src/test/regress/sql/partition_join.sql
|
||||
index 53a9b26d4c4..0c48dd2be78 100644
|
||||
--- a/src/test/regress/sql/partition_join.sql
|
||||
+++ b/src/test/regress/sql/partition_join.sql
|
||||
@@ -13,7 +13,7 @@ CREATE TABLE prt1 (a int, b int, c varchar) PARTITION BY RANGE(a);
|
||||
CREATE TABLE prt1_p1 PARTITION OF prt1 FOR VALUES FROM (0) TO (250);
|
||||
CREATE TABLE prt1_p3 PARTITION OF prt1 FOR VALUES FROM (500) TO (600);
|
||||
CREATE TABLE prt1_p2 PARTITION OF prt1 FOR VALUES FROM (250) TO (500);
|
||||
-INSERT INTO prt1 SELECT i, i % 25, to_char(i, 'FM0000') FROM generate_series(0, 95 * 599) i WHERE i % 2 = 0;
|
||||
+INSERT INTO prt1 SELECT i, i % 25, to_char(i, 'FM0000') FROM generate_series(0,599) i WHERE i % 2 = 0;
|
||||
CREATE INDEX iprt1_p1_a on prt1_p1(a);
|
||||
CREATE INDEX iprt1_p2_a on prt1_p2(a);
|
||||
CREATE INDEX iprt1_p3_a on prt1_p3(a);
|
||||
@@ -23,7 +23,7 @@ CREATE TABLE prt2 (a int, b int, c varchar) PARTITION BY RANGE(b);
|
||||
CREATE TABLE prt2_p1 PARTITION OF prt2 FOR VALUES FROM (0) TO (250);
|
||||
CREATE TABLE prt2_p2 PARTITION OF prt2 FOR VALUES FROM (250) TO (500);
|
||||
CREATE TABLE prt2_p3 PARTITION OF prt2 FOR VALUES FROM (500) TO (600);
|
||||
-INSERT INTO prt2 SELECT i % 25, i, to_char(i, 'FM0000') FROM generate_series(0, 95 * 599) i WHERE i % 3 = 0;
|
||||
+INSERT INTO prt2 SELECT i % 25, i, to_char(i, 'FM0000') FROM generate_series(0,599) i WHERE i % 3 = 0;
|
||||
CREATE INDEX iprt2_p1_b on prt2_p1(b);
|
||||
CREATE INDEX iprt2_p2_b on prt2_p2(b);
|
||||
CREATE INDEX iprt2_p3_b on prt2_p3(b);
|
||||
@@ -149,7 +149,7 @@ CREATE TABLE prt1_e (a int, b int, c int) PARTITION BY RANGE(((a + b)/2));
|
||||
CREATE TABLE prt1_e_p1 PARTITION OF prt1_e FOR VALUES FROM (0) TO (250);
|
||||
CREATE TABLE prt1_e_p2 PARTITION OF prt1_e FOR VALUES FROM (250) TO (500);
|
||||
CREATE TABLE prt1_e_p3 PARTITION OF prt1_e FOR VALUES FROM (500) TO (600);
|
||||
-INSERT INTO prt1_e SELECT i, i, i % 25 FROM generate_series(0, 95 * 599, 2) i;
|
||||
+INSERT INTO prt1_e SELECT i, i, i % 25 FROM generate_series(0, 599, 2) i;
|
||||
CREATE INDEX iprt1_e_p1_ab2 on prt1_e_p1(((a+b)/2));
|
||||
CREATE INDEX iprt1_e_p2_ab2 on prt1_e_p2(((a+b)/2));
|
||||
CREATE INDEX iprt1_e_p3_ab2 on prt1_e_p3(((a+b)/2));
|
||||
@@ -159,7 +159,7 @@ CREATE TABLE prt2_e (a int, b int, c int) PARTITION BY RANGE(((b + a)/2));
|
||||
CREATE TABLE prt2_e_p1 PARTITION OF prt2_e FOR VALUES FROM (0) TO (250);
|
||||
CREATE TABLE prt2_e_p2 PARTITION OF prt2_e FOR VALUES FROM (250) TO (500);
|
||||
CREATE TABLE prt2_e_p3 PARTITION OF prt2_e FOR VALUES FROM (500) TO (600);
|
||||
-INSERT INTO prt2_e SELECT i, i, i % 25 FROM generate_series(0, 95 * 599, 3) i;
|
||||
+INSERT INTO prt2_e SELECT i, i, i % 25 FROM generate_series(0, 599, 3) i;
|
||||
ANALYZE prt2_e;
|
||||
|
||||
EXPLAIN (COSTS OFF)
|
||||
@@ -248,14 +248,14 @@ CREATE TABLE prt1_m (a int, b int, c int) PARTITION BY RANGE(a, ((a + b)/2));
|
||||
CREATE TABLE prt1_m_p1 PARTITION OF prt1_m FOR VALUES FROM (0, 0) TO (250, 250);
|
||||
CREATE TABLE prt1_m_p2 PARTITION OF prt1_m FOR VALUES FROM (250, 250) TO (500, 500);
|
||||
CREATE TABLE prt1_m_p3 PARTITION OF prt1_m FOR VALUES FROM (500, 500) TO (600, 600);
|
||||
-INSERT INTO prt1_m SELECT i, i, i % 25 FROM generate_series(0, 95 * 599, 2) i;
|
||||
+INSERT INTO prt1_m SELECT i, i, i % 25 FROM generate_series(0, 599, 2) i;
|
||||
ANALYZE prt1_m;
|
||||
|
||||
CREATE TABLE prt2_m (a int, b int, c int) PARTITION BY RANGE(((b + a)/2), b);
|
||||
CREATE TABLE prt2_m_p1 PARTITION OF prt2_m FOR VALUES FROM (0, 0) TO (250, 250);
|
||||
CREATE TABLE prt2_m_p2 PARTITION OF prt2_m FOR VALUES FROM (250, 250) TO (500, 500);
|
||||
CREATE TABLE prt2_m_p3 PARTITION OF prt2_m FOR VALUES FROM (500, 500) TO (600, 600);
|
||||
-INSERT INTO prt2_m SELECT i, i, i % 25 FROM generate_series(0, 95 * 599, 3) i;
|
||||
+INSERT INTO prt2_m SELECT i, i, i % 25 FROM generate_series(0, 599, 3) i;
|
||||
ANALYZE prt2_m;
|
||||
|
||||
EXPLAIN (COSTS OFF)
|
||||
@@ -269,14 +269,14 @@ CREATE TABLE plt1 (a int, b int, c text) PARTITION BY LIST(c);
|
||||
CREATE TABLE plt1_p1 PARTITION OF plt1 FOR VALUES IN ('0000', '0003', '0004', '0010');
|
||||
CREATE TABLE plt1_p2 PARTITION OF plt1 FOR VALUES IN ('0001', '0005', '0002', '0009');
|
||||
CREATE TABLE plt1_p3 PARTITION OF plt1 FOR VALUES IN ('0006', '0007', '0008', '0011');
|
||||
-INSERT INTO plt1 SELECT i, i, to_char(i/50, 'FM0000') FROM generate_series(0, 95 * 599, 2) i;
|
||||
+INSERT INTO plt1 SELECT i, i, to_char(i/50, 'FM0000') FROM generate_series(0, 599, 2) i;
|
||||
ANALYZE plt1;
|
||||
|
||||
CREATE TABLE plt2 (a int, b int, c text) PARTITION BY LIST(c);
|
||||
CREATE TABLE plt2_p1 PARTITION OF plt2 FOR VALUES IN ('0000', '0003', '0004', '0010');
|
||||
CREATE TABLE plt2_p2 PARTITION OF plt2 FOR VALUES IN ('0001', '0005', '0002', '0009');
|
||||
CREATE TABLE plt2_p3 PARTITION OF plt2 FOR VALUES IN ('0006', '0007', '0008', '0011');
|
||||
-INSERT INTO plt2 SELECT i, i, to_char(i/50, 'FM0000') FROM generate_series(0, 95 * 599, 3) i;
|
||||
+INSERT INTO plt2 SELECT i, i, to_char(i/50, 'FM0000') FROM generate_series(0, 599, 3) i;
|
||||
ANALYZE plt2;
|
||||
|
||||
--
|
||||
@@ -286,7 +286,7 @@ CREATE TABLE plt1_e (a int, b int, c text) PARTITION BY LIST(ltrim(c, 'A'));
|
||||
CREATE TABLE plt1_e_p1 PARTITION OF plt1_e FOR VALUES IN ('0000', '0003', '0004', '0010');
|
||||
CREATE TABLE plt1_e_p2 PARTITION OF plt1_e FOR VALUES IN ('0001', '0005', '0002', '0009');
|
||||
CREATE TABLE plt1_e_p3 PARTITION OF plt1_e FOR VALUES IN ('0006', '0007', '0008', '0011');
|
||||
-INSERT INTO plt1_e SELECT i, i, 'A' || to_char(i/50, 'FM0000') FROM generate_series(0, 95 * 599, 2) i;
|
||||
+INSERT INTO plt1_e SELECT i, i, 'A' || to_char(i/50, 'FM0000') FROM generate_series(0, 599, 2) i;
|
||||
ANALYZE plt1_e;
|
||||
|
||||
-- test partition matching with N-way join
|
||||
@@ -371,7 +371,7 @@ CREATE TABLE prt1_l_p2_p2 PARTITION OF prt1_l_p2 FOR VALUES IN ('0002', '0003');
|
||||
CREATE TABLE prt1_l_p3 PARTITION OF prt1_l FOR VALUES FROM (500) TO (600) PARTITION BY RANGE (b);
|
||||
CREATE TABLE prt1_l_p3_p1 PARTITION OF prt1_l_p3 FOR VALUES FROM (0) TO (13);
|
||||
CREATE TABLE prt1_l_p3_p2 PARTITION OF prt1_l_p3 FOR VALUES FROM (13) TO (25);
|
||||
-INSERT INTO prt1_l SELECT i, i % 25, to_char(i % 4, 'FM0000') FROM generate_series(0, 95 * 599, 2) i;
|
||||
+INSERT INTO prt1_l SELECT i, i % 25, to_char(i % 4, 'FM0000') FROM generate_series(0, 599, 2) i;
|
||||
ANALYZE prt1_l;
|
||||
|
||||
CREATE TABLE prt2_l (a int, b int, c varchar) PARTITION BY RANGE(b);
|
||||
@@ -382,7 +382,7 @@ CREATE TABLE prt2_l_p2_p2 PARTITION OF prt2_l_p2 FOR VALUES IN ('0002', '0003');
|
||||
CREATE TABLE prt2_l_p3 PARTITION OF prt2_l FOR VALUES FROM (500) TO (600) PARTITION BY RANGE (a);
|
||||
CREATE TABLE prt2_l_p3_p1 PARTITION OF prt2_l_p3 FOR VALUES FROM (0) TO (13);
|
||||
CREATE TABLE prt2_l_p3_p2 PARTITION OF prt2_l_p3 FOR VALUES FROM (13) TO (25);
|
||||
-INSERT INTO prt2_l SELECT i % 25, i, to_char(i % 4, 'FM0000') FROM generate_series(0, 95 * 599, 3) i;
|
||||
+INSERT INTO prt2_l SELECT i % 25, i, to_char(i % 4, 'FM0000') FROM generate_series(0, 599, 3) i;
|
||||
ANALYZE prt2_l;
|
||||
|
||||
-- inner join, qual covering only top-level partitions
|
||||
@@ -453,27 +453,27 @@ WHERE EXISTS (
|
||||
CREATE TABLE prt1_n (a int, b int, c varchar) PARTITION BY RANGE(c);
|
||||
CREATE TABLE prt1_n_p1 PARTITION OF prt1_n FOR VALUES FROM ('0000') TO ('0250');
|
||||
CREATE TABLE prt1_n_p2 PARTITION OF prt1_n FOR VALUES FROM ('0250') TO ('0500');
|
||||
-INSERT INTO prt1_n SELECT i, i, to_char(i, 'FM0000') FROM generate_series(0, 95 * 499, 2) i;
|
||||
+INSERT INTO prt1_n SELECT i, i, to_char(i, 'FM0000') FROM generate_series(0, 499, 2) i;
|
||||
ANALYZE prt1_n;
|
||||
|
||||
CREATE TABLE prt2_n (a int, b int, c text) PARTITION BY LIST(c);
|
||||
CREATE TABLE prt2_n_p1 PARTITION OF prt2_n FOR VALUES IN ('0000', '0003', '0004', '0010', '0006', '0007');
|
||||
CREATE TABLE prt2_n_p2 PARTITION OF prt2_n FOR VALUES IN ('0001', '0005', '0002', '0009', '0008', '0011');
|
||||
-INSERT INTO prt2_n SELECT i, i, to_char(i/50, 'FM0000') FROM generate_series(0, 95 * 599, 2) i;
|
||||
+INSERT INTO prt2_n SELECT i, i, to_char(i/50, 'FM0000') FROM generate_series(0, 599, 2) i;
|
||||
ANALYZE prt2_n;
|
||||
|
||||
CREATE TABLE prt3_n (a int, b int, c text) PARTITION BY LIST(c);
|
||||
CREATE TABLE prt3_n_p1 PARTITION OF prt3_n FOR VALUES IN ('0000', '0004', '0006', '0007');
|
||||
CREATE TABLE prt3_n_p2 PARTITION OF prt3_n FOR VALUES IN ('0001', '0002', '0008', '0010');
|
||||
CREATE TABLE prt3_n_p3 PARTITION OF prt3_n FOR VALUES IN ('0003', '0005', '0009', '0011');
|
||||
-INSERT INTO prt2_n SELECT i, i, to_char(i/50, 'FM0000') FROM generate_series(0, 95 * 599, 2) i;
|
||||
+INSERT INTO prt2_n SELECT i, i, to_char(i/50, 'FM0000') FROM generate_series(0, 599, 2) i;
|
||||
ANALYZE prt3_n;
|
||||
|
||||
CREATE TABLE prt4_n (a int, b int, c text) PARTITION BY RANGE(a);
|
||||
CREATE TABLE prt4_n_p1 PARTITION OF prt4_n FOR VALUES FROM (0) TO (300);
|
||||
CREATE TABLE prt4_n_p2 PARTITION OF prt4_n FOR VALUES FROM (300) TO (500);
|
||||
CREATE TABLE prt4_n_p3 PARTITION OF prt4_n FOR VALUES FROM (500) TO (600);
|
||||
-INSERT INTO prt4_n SELECT i, i, to_char(i, 'FM0000') FROM generate_series(0, 95 * 599, 2) i;
|
||||
+INSERT INTO prt4_n SELECT i, i, to_char(i, 'FM0000') FROM generate_series(0, 599, 2) i;
|
||||
ANALYZE prt4_n;
|
||||
|
||||
-- partitionwise join can not be applied if the partition ranges differ
|
||||
@@ -533,7 +533,7 @@ create temp table prtx2_3 partition of prtx2 for values from (21) to (31);
|
||||
insert into prtx1 select 1 + i%30, i, i
|
||||
from generate_series(1, 95 * 1000) i;
|
||||
insert into prtx2 select 1 + i%30, i, i
|
||||
- from generate_series(1, 95 * 500) i, generate_series(1, 95 * 10) j;
|
||||
+ from generate_series(1, 500) i, generate_series(1, 95 * 10) j;
|
||||
create index on prtx2 (b);
|
||||
create index on prtx2 (c);
|
||||
analyze prtx1;
|
||||
@@ -1202,7 +1202,7 @@ CREATE TABLE fract_t0 PARTITION OF fract_t FOR VALUES FROM ('0') TO ('1000');
|
||||
CREATE TABLE fract_t1 PARTITION OF fract_t FOR VALUES FROM ('1000') TO ('2000');
|
||||
|
||||
-- insert data
|
||||
-INSERT INTO fract_t (id) (SELECT generate_series(0, 95 * 1999));
|
||||
+INSERT INTO fract_t (id) (SELECT generate_series(0, 1999));
|
||||
ANALYZE fract_t;
|
||||
|
||||
-- verify plan; nested index only scans
|
||||
diff --git a/src/test/regress/sql/partition_prune.sql b/src/test/regress/sql/partition_prune.sql
|
||||
index 82ac39d5dc8..6a0c7a3666d 100644
|
||||
--- a/src/test/regress/sql/partition_prune.sql
|
||||
+++ b/src/test/regress/sql/partition_prune.sql
|
||||
@@ -512,7 +512,7 @@ create table list_part2 partition of list_part for values in (2);
|
||||
create table list_part3 partition of list_part for values in (3);
|
||||
create table list_part4 partition of list_part for values in (4);
|
||||
|
||||
-insert into list_part select generate_series(1, 95 * 4);
|
||||
+insert into list_part select generate_series(1, 4);
|
||||
|
||||
begin;
|
||||
|
||||
@@ -940,7 +940,7 @@ create table ma_test (a int, b int) partition by range (a);
|
||||
create table ma_test_p1 partition of ma_test for values from (0) to (10);
|
||||
create table ma_test_p2 partition of ma_test for values from (10) to (20);
|
||||
create table ma_test_p3 partition of ma_test for values from (20) to (30);
|
||||
-insert into ma_test select x,x from generate_series(0, 95 * 29) t(x);
|
||||
+insert into ma_test select x,x from generate_series(0,29) t(x);
|
||||
create index on ma_test (b);
|
||||
|
||||
analyze ma_test;
|
||||
@@ -1263,7 +1263,7 @@ create table hp_prefix_test (a int, b int, c int, d int)
|
||||
|
||||
-- create 8 partitions
|
||||
select 'create table hp_prefix_test_p' || x::text || ' partition of hp_prefix_test for values with (modulus 8, remainder ' || x::text || ');'
|
||||
-from generate_series(0, 95 * 7) x;
|
||||
+from generate_series(0, 7) x;
|
||||
\gexec
|
||||
|
||||
-- insert 16 rows, one row for each test to perform.
|
||||
@@ -1274,9 +1274,9 @@ select
|
||||
case c when 0 then null else 3 end,
|
||||
case d when 0 then null else 4 end
|
||||
from
|
||||
- generate_series(0, 95 * 1) a,
|
||||
- generate_series(0, 95 * 1) b,
|
||||
- generate_series(0, 95 * 1) c,
|
||||
+ generate_series(0, 1) a,
|
||||
+ generate_series(0, 1) b,
|
||||
+ generate_series(0, 1) c,
|
||||
generate_series(0, 95 * 1) d;
|
||||
|
||||
-- Ensure partition pruning works correctly for each combination of IS NULL
|
||||
diff --git a/src/test/regress/sql/plpgsql.sql b/src/test/regress/sql/plpgsql.sql
|
||||
index d18cc331561..435d3d718e1 100644
|
||||
--- a/src/test/regress/sql/plpgsql.sql
|
||||
+++ b/src/test/regress/sql/plpgsql.sql
|
||||
@@ -4581,12 +4581,12 @@ CREATE TRIGGER transition_table_level2_ri_child_upd_trigger
|
||||
|
||||
-- create initial test data
|
||||
INSERT INTO transition_table_level1 (level1_no)
|
||||
- SELECT generate_series(1, 95 * 200);
|
||||
+ SELECT generate_series(1,200);
|
||||
ANALYZE transition_table_level1;
|
||||
|
||||
INSERT INTO transition_table_level2 (level2_no, parent_no)
|
||||
SELECT level2_no, level2_no / 50 + 1 AS parent_no
|
||||
- FROM generate_series(1, 95 * 9999) level2_no;
|
||||
+ FROM generate_series(1,9999) level2_no;
|
||||
ANALYZE transition_table_level2;
|
||||
|
||||
INSERT INTO transition_table_status (level, node_no, status)
|
||||
diff --git a/src/test/regress/sql/polygon.sql b/src/test/regress/sql/polygon.sql
|
||||
index d39a2b4e8f8..2d862985510 100644
|
||||
--- a/src/test/regress/sql/polygon.sql
|
||||
+++ b/src/test/regress/sql/polygon.sql
|
||||
@@ -42,7 +42,7 @@ CREATE TABLE quad_poly_tbl (id int, p polygon);
|
||||
|
||||
INSERT INTO quad_poly_tbl
|
||||
SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10))
|
||||
- FROM generate_series(1, 95 * 100) x,
|
||||
+ FROM generate_series(1, 100) x,
|
||||
generate_series(1, 95 * 100) y;
|
||||
|
||||
INSERT INTO quad_poly_tbl
|
||||
diff --git a/src/test/regress/sql/psql.sql b/src/test/regress/sql/psql.sql
|
||||
index 12c40039b18..e08b0aee00e 100644
|
||||
--- a/src/test/regress/sql/psql.sql
|
||||
+++ b/src/test/regress/sql/psql.sql
|
||||
@@ -187,7 +187,7 @@ select 'drop table gexec_test', 'select ''2000-01-01''::date as party_over'
|
||||
prepare q as select array_to_string(array_agg(repeat('x',2*n)),E'\n') as "ab
|
||||
|
||||
c", array_to_string(array_agg(repeat('y',20-2*n)),E'\n') as "a
|
||||
-bc" from generate_series(1, 95 * 10) as n(n) group by n>1 order by n>1;
|
||||
+bc" from generate_series(1,10) as n(n) group by n>1 order by n>1;
|
||||
|
||||
\pset linestyle ascii
|
||||
|
||||
@@ -304,7 +304,7 @@ execute q;
|
||||
deallocate q;
|
||||
|
||||
-- test single-line header and data
|
||||
-prepare q as select repeat('x',2*n) as "0123456789abcdef", repeat('y',20-2*n) as "0123456789" from generate_series(1, 95 * 10) as n;
|
||||
+prepare q as select repeat('x',2*n) as "0123456789abcdef", repeat('y',20-2*n) as "0123456789" from generate_series(1,10) as n;
|
||||
|
||||
\pset linestyle ascii
|
||||
|
||||
@@ -1220,7 +1220,7 @@ create table child_10_20 partition of parent_tab
|
||||
for values from (10) to (20);
|
||||
create table child_20_30 partition of parent_tab
|
||||
for values from (20) to (30);
|
||||
-insert into parent_tab values (generate_series(0, 95 * 29));
|
||||
+insert into parent_tab values (generate_series(0,29));
|
||||
create table child_30_40 partition of parent_tab
|
||||
for values from (30) to (40)
|
||||
partition by range(id);
|
||||
diff --git a/src/test/regress/sql/rangetypes.sql b/src/test/regress/sql/rangetypes.sql
|
||||
index b51d6c405c2..a2d50d7bb43 100644
|
||||
--- a/src/test/regress/sql/rangetypes.sql
|
||||
+++ b/src/test/regress/sql/rangetypes.sql
|
||||
@@ -314,13 +314,13 @@ select count(*) from test_range_gist where ir -|- int4multirange(int4range(100,2
|
||||
create table test_range_spgist(ir int4range);
|
||||
create index test_range_spgist_idx on test_range_spgist using spgist (ir);
|
||||
|
||||
-insert into test_range_spgist select int4range(g, g+10) from generate_series(1, 95 * 2000) g;
|
||||
-insert into test_range_spgist select 'empty'::int4range from generate_series(1, 95 * 500) g;
|
||||
-insert into test_range_spgist select int4range(g, g+10000) from generate_series(1, 95 * 1000) g;
|
||||
-insert into test_range_spgist select 'empty'::int4range from generate_series(1, 95 * 500) g;
|
||||
-insert into test_range_spgist select int4range(NULL,g*10,'(]') from generate_series(1, 95 * 100) g;
|
||||
-insert into test_range_spgist select int4range(g*10,NULL,'(]') from generate_series(1, 95 * 100) g;
|
||||
-insert into test_range_spgist select int4range(g, g+10) from generate_series(1, 95 * 2000) g;
|
||||
+insert into test_range_spgist select int4range(g, g+10) from generate_series(1, POW(95, 0.5)::int * 2000) g;
|
||||
+insert into test_range_spgist select 'empty'::int4range from generate_series(1, POW(95, 0.5)::int * 500) g;
|
||||
+insert into test_range_spgist select int4range(g, g+10000) from generate_series(1, POW(95, 0.5)::int * 1000) g;
|
||||
+insert into test_range_spgist select 'empty'::int4range from generate_series(1, POW(95, 0.5)::int * 500) g;
|
||||
+insert into test_range_spgist select int4range(NULL,g*10,'(]') from generate_series(1, POW(95, 0.5)::int * 100) g;
|
||||
+insert into test_range_spgist select int4range(g*10,NULL,'(]') from generate_series(1, POW(95, 0.5)::int * 100) g;
|
||||
+insert into test_range_spgist select int4range(g, g+10) from generate_series(1, POW(95, 0.5)::int * 2000) g;
|
||||
|
||||
-- first, verify non-indexed results
|
||||
SET enable_seqscan = t;
|
||||
diff --git a/src/test/regress/sql/spgist.sql b/src/test/regress/sql/spgist.sql
|
||||
index 0c4f24e1d49..ed9f7c45411 100644
|
||||
--- a/src/test/regress/sql/spgist.sql
|
||||
+++ b/src/test/regress/sql/spgist.sql
|
||||
@@ -16,9 +16,9 @@ vacuum spgist_point_tbl;
|
||||
|
||||
-- Insert more data, to make the index a few levels deep.
|
||||
insert into spgist_point_tbl (id, p)
|
||||
-select g, point(g*10, g*10) from generate_series(1, 95 * 10000) g;
|
||||
+select g, point(g*10, g*10) from generate_series(1, POW(95, 0.5) * 10000) g;
|
||||
insert into spgist_point_tbl (id, p)
|
||||
-select g+100000, point(g*10+1, g*10+1) from generate_series(1, 95 * 10000) g;
|
||||
+select g+100000, point(g*10+1, g*10+1) from generate_series(1, POW(95, 0.5) * 10000) g;
|
||||
|
||||
-- To test vacuum, delete some entries from all over the index.
|
||||
delete from spgist_point_tbl where id % 2 = 1;
|
||||
@@ -37,8 +37,8 @@ vacuum spgist_point_tbl;
|
||||
create table spgist_box_tbl(id serial, b box);
|
||||
insert into spgist_box_tbl(b)
|
||||
select box(point(i,j),point(i+s,j+s))
|
||||
- from generate_series(1, 95 * 100,5) i,
|
||||
- generate_series(1, 95 * 100,5) j,
|
||||
+ from generate_series(1,100,5) i,
|
||||
+ generate_series(1,100,5) j,
|
||||
generate_series(1, 95 * 10) s;
|
||||
create index spgist_box_idx on spgist_box_tbl using spgist (b);
|
||||
|
||||
@@ -86,6 +86,6 @@ create unlogged table spgist_unlogged_tbl(id serial, b box);
|
||||
create index spgist_unlogged_idx on spgist_unlogged_tbl using spgist (b);
|
||||
insert into spgist_unlogged_tbl(b)
|
||||
select box(point(i,j))
|
||||
- from generate_series(1, 95 * 100,5) i,
|
||||
+ from generate_series(1,100,5) i,
|
||||
generate_series(1, 95 * 10,5) j;
|
||||
-- leave this table around, to help in testing dump/restore
|
||||
diff --git a/src/test/regress/sql/tuplesort.sql b/src/test/regress/sql/tuplesort.sql
|
||||
index 133491a0d70..0642902ad53 100644
|
||||
--- a/src/test/regress/sql/tuplesort.sql
|
||||
+++ b/src/test/regress/sql/tuplesort.sql
|
||||
@@ -19,7 +19,7 @@ INSERT INTO abbrev_abort_uuids (abort_increasing, abort_decreasing, noabort_incr
|
||||
('00000000-0000-0000-0000-'||to_char(20000 - g.i, '000000000000FM'))::uuid abort_decreasing,
|
||||
(to_char(g.i % 10009, '00000000FM')||'-0000-0000-0000-'||to_char(g.i, '000000000000FM'))::uuid noabort_increasing,
|
||||
(to_char(((20000 - g.i) % 10009), '00000000FM')||'-0000-0000-0000-'||to_char(20000 - g.i, '000000000000FM'))::uuid noabort_decreasing
|
||||
- FROM generate_series(0, 95 * 20000, 1) g(i);
|
||||
+ FROM generate_series(0, 20000, 1) g(i);
|
||||
|
||||
-- and a few NULLs
|
||||
INSERT INTO abbrev_abort_uuids(id) VALUES(0);
|
||||
@@ -276,7 +276,7 @@ ROLLBACK;
|
||||
CREATE TEMP TABLE test_mark_restore(col1 int, col2 int, col12 int);
|
||||
-- need a few duplicates for mark/restore to matter
|
||||
INSERT INTO test_mark_restore(col1, col2, col12)
|
||||
- SELECT a.i, b.i, a.i * b.i FROM generate_series(1, 95 * 500) a(i), generate_series(1, 95 * 5) b(i);
|
||||
+ SELECT a.i, b.i, a.i * b.i FROM generate_series(1, 500) a(i), generate_series(1, 95 * 5) b(i);
|
||||
|
||||
BEGIN;
|
||||
|
||||
diff --git a/src/test/regress/sql/updatable_views.sql b/src/test/regress/sql/updatable_views.sql
|
||||
index e4ad5c274fe..e1894d2d9cc 100644
|
||||
--- a/src/test/regress/sql/updatable_views.sql
|
||||
+++ b/src/test/regress/sql/updatable_views.sql
|
||||
@@ -494,7 +494,7 @@ MERGE INTO rw_view2 t
|
||||
SELECT * FROM base_tbl ORDER BY a;
|
||||
|
||||
MERGE INTO rw_view2 t
|
||||
- USING (SELECT x, 'r'||x FROM generate_series(0, 95 * 2) x) AS s(a,b) ON t.a = s.a
|
||||
+ USING (SELECT x, 'r'||x FROM generate_series(0,2) x) AS s(a,b) ON t.a = s.a
|
||||
WHEN MATCHED THEN UPDATE SET b = s.b
|
||||
WHEN NOT MATCHED AND s.a > 0 THEN INSERT VALUES (s.a, s.b)
|
||||
WHEN NOT MATCHED BY SOURCE THEN UPDATE SET b = 'Not matched by source'
|
||||
@@ -519,7 +519,7 @@ MERGE INTO rw_view2 t
|
||||
WHEN MATCHED THEN UPDATE SET b = s.b
|
||||
WHEN NOT MATCHED AND s.a > 0 THEN INSERT VALUES (s.a, s.b); -- should fail
|
||||
MERGE INTO rw_view2 t
|
||||
- USING (SELECT x, 'R'||x FROM generate_series(0, 95 * 3) x) AS s(a,b) ON t.a = s.a
|
||||
+ USING (SELECT x, 'R'||x FROM generate_series(0,3) x) AS s(a,b) ON t.a = s.a
|
||||
WHEN MATCHED THEN UPDATE SET b = s.b
|
||||
WHEN NOT MATCHED AND s.a > 0 THEN INSERT VALUES (s.a, s.b); -- ok
|
||||
|
||||
diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql
|
||||
index 6a2f5815ab2..a63cf5cd12c 100644
|
||||
--- a/src/test/regress/sql/vacuum.sql
|
||||
+++ b/src/test/regress/sql/vacuum.sql
|
||||
@@ -156,7 +156,7 @@ CREATE TABLE no_index_cleanup (i INT PRIMARY KEY, t TEXT);
|
||||
-- Use uncompressed data stored in toast.
|
||||
CREATE INDEX no_index_cleanup_idx ON no_index_cleanup(t);
|
||||
ALTER TABLE no_index_cleanup ALTER COLUMN t SET STORAGE EXTERNAL;
|
||||
-INSERT INTO no_index_cleanup(i, t) VALUES (generate_series(1, 95 * 30),
|
||||
+INSERT INTO no_index_cleanup(i, t) VALUES (generate_series(1,30),
|
||||
repeat('1234567890',269));
|
||||
-- index cleanup option is ignored if VACUUM FULL
|
||||
VACUUM (INDEX_CLEANUP TRUE, FULL TRUE) no_index_cleanup;
|
||||
@@ -462,8 +462,6 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
if var(REAL_S3_ENV).is_ok() {
|
||||
assert!(body.contains("remote_storage_s3_deleted_objects_total"));
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
assert!(body.contains("process_threads"));
|
||||
}
|
||||
|
||||
|
||||
@@ -1,13 +0,0 @@
|
||||
[package]
|
||||
name = "pageserver_page_api"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
prost.workspace = true
|
||||
tonic.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build.workspace = true
|
||||
@@ -1,13 +0,0 @@
|
||||
use std::env;
|
||||
use std::path::PathBuf;
|
||||
|
||||
/// Generates Rust code from .proto Protobuf schemas, along with a binary file
|
||||
/// descriptor set for Protobuf schema reflection.
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let out_dir = PathBuf::from(env::var("OUT_DIR")?);
|
||||
tonic_build::configure()
|
||||
.bytes(["."])
|
||||
.file_descriptor_set_path(out_dir.join("page_api_descriptor.bin"))
|
||||
.compile_protos(&["proto/page_service.proto"], &["proto"])
|
||||
.map_err(|err| err.into())
|
||||
}
|
||||
@@ -1,233 +0,0 @@
|
||||
// Page service, presented by pageservers for computes.
|
||||
//
|
||||
// This is the compute read path. It primarily serves page versions at given
|
||||
// LSNs, but also base backups, SLRU segments, and relation metadata.
|
||||
//
|
||||
// EXPERIMENTAL: this is still under development and subject to change.
|
||||
//
|
||||
// Request metadata headers:
|
||||
// - authorization: JWT token ("Bearer <token>"), if auth is enabled
|
||||
// - neon-tenant-id: tenant ID ("7c4a1f9e3bd6470c8f3e21a65bd2e980")
|
||||
// - neon-shard-id: shard ID, as <number><count> in hex ("0b10" = shard 11 of 16, 0-based)
|
||||
// - neon-timeline-id: timeline ID ("f08c4e9a2d5f76b1e3a7c2d8910f4b3e")
|
||||
//
|
||||
// The service can be accessed via e.g. grpcurl:
|
||||
//
|
||||
// ```
|
||||
// grpcurl \
|
||||
// -plaintext \
|
||||
// -H "neon-tenant-id: 7c4a1f9e3bd6470c8f3e21a65bd2e980" \
|
||||
// -H "neon-shard-id: 0b10" \
|
||||
// -H "neon-timeline-id: f08c4e9a2d5f76b1e3a7c2d8910f4b3e" \
|
||||
// -H "authorization: Bearer $JWT" \
|
||||
// -d '{"read_lsn": {"request_lsn": 1234567890}, "rel": {"spc_oid": 1663, "db_oid": 1234, "rel_number": 5678, "fork_number": 0}}'
|
||||
// localhost:51051 page_api.PageService/CheckRelExists
|
||||
// ```
|
||||
//
|
||||
// TODO: consider adding neon-compute-mode ("primary", "static", "replica").
|
||||
// However, this will require reconnecting when changing modes.
|
||||
//
|
||||
// TODO: write implementation guidance on
|
||||
// - Health checks
|
||||
// - Tracing, OpenTelemetry
|
||||
// - Compression
|
||||
|
||||
syntax = "proto3";
|
||||
package page_api;
|
||||
|
||||
service PageService {
|
||||
// Returns whether a relation exists.
|
||||
rpc CheckRelExists(CheckRelExistsRequest) returns (CheckRelExistsResponse);
|
||||
|
||||
// Fetches a base backup.
|
||||
rpc GetBaseBackup (GetBaseBackupRequest) returns (stream GetBaseBackupResponseChunk);
|
||||
|
||||
// Returns the total size of a database, as # of bytes.
|
||||
rpc GetDbSize (GetDbSizeRequest) returns (GetDbSizeResponse);
|
||||
|
||||
// Fetches pages.
|
||||
//
|
||||
// This is implemented as a bidirectional streaming RPC for performance. Unary
|
||||
// requests incur costs for e.g. HTTP/2 stream setup, header parsing,
|
||||
// authentication, and so on -- with streaming, we only pay these costs during
|
||||
// the initial stream setup. This ~doubles throughput in benchmarks. Other
|
||||
// RPCs use regular unary requests, since they are not as frequent and
|
||||
// performance-critical, and this simplifies implementation.
|
||||
//
|
||||
// NB: a status response (e.g. errors) will terminate the stream. The stream
|
||||
// may be shared by e.g. multiple Postgres backends, so we should avoid this.
|
||||
// Most errors are therefore sent as GetPageResponse.status instead.
|
||||
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
|
||||
|
||||
// Returns the size of a relation, as # of blocks.
|
||||
rpc GetRelSize (GetRelSizeRequest) returns (GetRelSizeResponse);
|
||||
|
||||
// Fetches an SLRU segment.
|
||||
rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse);
|
||||
}
|
||||
|
||||
// The LSN a request should read at.
|
||||
message ReadLsn {
|
||||
// The request's read LSN. Required.
|
||||
uint64 request_lsn = 1;
|
||||
// If given, the caller guarantees that the page has not been modified since
|
||||
// this LSN. Must be smaller than or equal to request_lsn. This allows the
|
||||
// Pageserver to serve an old page without waiting for the request LSN to
|
||||
// arrive. Valid for all request types.
|
||||
//
|
||||
// It is undefined behaviour to make a request such that the page was, in
|
||||
// fact, modified between request_lsn and not_modified_since_lsn. The
|
||||
// Pageserver might detect it and return an error, or it might return the old
|
||||
// page version or the new page version. Setting not_modified_since_lsn equal
|
||||
// to request_lsn is always safe, but can lead to unnecessary waiting.
|
||||
uint64 not_modified_since_lsn = 2;
|
||||
}
|
||||
|
||||
// A relation identifier.
|
||||
message RelTag {
|
||||
uint32 spc_oid = 1;
|
||||
uint32 db_oid = 2;
|
||||
uint32 rel_number = 3;
|
||||
uint32 fork_number = 4;
|
||||
}
|
||||
|
||||
// Checks whether a relation exists, at the given LSN. Only valid on shard 0,
|
||||
// other shards will error.
|
||||
message CheckRelExistsRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
RelTag rel = 2;
|
||||
}
|
||||
|
||||
message CheckRelExistsResponse {
|
||||
bool exists = 1;
|
||||
}
|
||||
|
||||
// Requests a base backup at a given LSN.
|
||||
message GetBaseBackupRequest {
|
||||
// The LSN to fetch a base backup at.
|
||||
ReadLsn read_lsn = 1;
|
||||
// If true, logical replication slots will not be created.
|
||||
bool replica = 2;
|
||||
}
|
||||
|
||||
// Base backup response chunk, returned as an ordered stream.
|
||||
message GetBaseBackupResponseChunk {
|
||||
// A basebackup data chunk. The size is undefined, but bounded by the 4 MB
|
||||
// gRPC message size limit.
|
||||
bytes chunk = 1;
|
||||
}
|
||||
|
||||
// Requests the size of a database, as # of bytes. Only valid on shard 0, other
|
||||
// shards will error.
|
||||
message GetDbSizeRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
uint32 db_oid = 2;
|
||||
}
|
||||
|
||||
message GetDbSizeResponse {
|
||||
uint64 num_bytes = 1;
|
||||
}
|
||||
|
||||
// Requests one or more pages.
|
||||
message GetPageRequest {
|
||||
// A request ID. Will be included in the response. Should be unique for
|
||||
// in-flight requests on the stream.
|
||||
uint64 request_id = 1;
|
||||
// The request class.
|
||||
GetPageClass request_class = 2;
|
||||
// The LSN to read at.
|
||||
ReadLsn read_lsn = 3;
|
||||
// The relation to read from.
|
||||
RelTag rel = 4;
|
||||
// Page numbers to read. Must belong to the remote shard.
|
||||
//
|
||||
// Multiple pages will be executed as a single batch by the Pageserver,
|
||||
// amortizing layer access costs and parallelizing them. This may increase the
|
||||
// latency of any individual request, but improves the overall latency and
|
||||
// throughput of the batch as a whole.
|
||||
//
|
||||
// TODO: this causes an allocation in the common single-block case. The sender
|
||||
// can use a SmallVec to stack-allocate it, but Prost will always deserialize
|
||||
// into a heap-allocated Vec. Consider optimizing this.
|
||||
//
|
||||
// TODO: we might be able to avoid a sort or something if we mandate that these
|
||||
// are always in order. But we can't currenly rely on this on the server, because
|
||||
// of compatibility with the libpq protocol handler.
|
||||
repeated uint32 block_number = 5;
|
||||
}
|
||||
|
||||
// A GetPageRequest class. Primarily intended for observability, but may also be
|
||||
// used for prioritization in the future.
|
||||
enum GetPageClass {
|
||||
// Unknown class. For forwards compatibility: used when the client sends a
|
||||
// class that the server doesn't know about.
|
||||
GET_PAGE_CLASS_UNKNOWN = 0;
|
||||
// A normal request. This is the default.
|
||||
GET_PAGE_CLASS_NORMAL = 1;
|
||||
// A prefetch request. NB: can only be classified on pg < 18.
|
||||
GET_PAGE_CLASS_PREFETCH = 2;
|
||||
// A background request (e.g. vacuum).
|
||||
GET_PAGE_CLASS_BACKGROUND = 3;
|
||||
}
|
||||
|
||||
// A GetPage response.
|
||||
//
|
||||
// A batch response will contain all of the requested pages. We could eagerly
|
||||
// emit individual pages as soon as they are ready, but on a readv() Postgres
|
||||
// holds buffer pool locks on all pages in the batch and we'll only return once
|
||||
// the entire batch is ready, so no one can make use of the individual pages.
|
||||
message GetPageResponse {
|
||||
// The original request's ID.
|
||||
uint64 request_id = 1;
|
||||
// The response status code.
|
||||
GetPageStatus status = 2;
|
||||
// A string describing the status, if any.
|
||||
string reason = 3;
|
||||
// The 8KB page images, in the same order as the request. Empty if status != OK.
|
||||
repeated bytes page_image = 4;
|
||||
}
|
||||
|
||||
// A GetPageResponse status code. Since we use a bidirectional stream, we don't
|
||||
// want to send errors as gRPC statuses, since this would terminate the stream.
|
||||
enum GetPageStatus {
|
||||
// Unknown status. For forwards compatibility: used when the server sends a
|
||||
// status code that the client doesn't know about.
|
||||
GET_PAGE_STATUS_UNKNOWN = 0;
|
||||
// The request was successful.
|
||||
GET_PAGE_STATUS_OK = 1;
|
||||
// The page did not exist. The tenant/timeline/shard has already been
|
||||
// validated during stream setup.
|
||||
GET_PAGE_STATUS_NOT_FOUND = 2;
|
||||
// The request was invalid.
|
||||
GET_PAGE_STATUS_INVALID = 3;
|
||||
// The tenant is rate limited. Slow down and retry later.
|
||||
GET_PAGE_STATUS_SLOW_DOWN = 4;
|
||||
// TODO: consider adding a GET_PAGE_STATUS_LAYER_DOWNLOAD in the case of a
|
||||
// layer download. This could free up the server task to process other
|
||||
// requests while the layer download is in progress.
|
||||
}
|
||||
|
||||
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on
|
||||
// shard 0, other shards will error.
|
||||
message GetRelSizeRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
RelTag rel = 2;
|
||||
}
|
||||
|
||||
message GetRelSizeResponse {
|
||||
uint32 num_blocks = 1;
|
||||
}
|
||||
|
||||
// Requests an SLRU segment. Only valid on shard 0, other shards will error.
|
||||
message GetSlruSegmentRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
uint32 kind = 2;
|
||||
uint32 segno = 3;
|
||||
}
|
||||
|
||||
// Returns an SLRU segment.
|
||||
//
|
||||
// These are up 32 pages (256 KB), so we can send them as a single response.
|
||||
message GetSlruSegmentResponse {
|
||||
bytes segment = 1;
|
||||
}
|
||||
@@ -1,19 +0,0 @@
|
||||
//! This crate provides the Pageserver's page API. It contains:
|
||||
//!
|
||||
//! * proto/page_service.proto: the Protobuf schema for the page API.
|
||||
//! * proto: auto-generated Protobuf types for gRPC.
|
||||
//!
|
||||
//! This crate is used by both the client and the server. Try to keep it slim.
|
||||
|
||||
// Code generated by protobuf.
|
||||
pub mod proto {
|
||||
tonic::include_proto!("page_api");
|
||||
|
||||
/// File descriptor set for Protobuf schema reflection. This allows using
|
||||
/// e.g. grpcurl with the API.
|
||||
pub const FILE_DESCRIPTOR_SET: &[u8] =
|
||||
tonic::include_file_descriptor_set!("page_api_descriptor");
|
||||
|
||||
pub use page_service_client::PageServiceClient;
|
||||
pub use page_service_server::{PageService, PageServiceServer};
|
||||
}
|
||||
@@ -383,19 +383,12 @@ async fn handle_client(
|
||||
info!("performing the proxy pass...");
|
||||
|
||||
let res = match client {
|
||||
Connection::Raw(mut c) => {
|
||||
copy_bidirectional_client_compute(&mut tls_stream, &mut c, |_, _| {}).await
|
||||
}
|
||||
Connection::Tls(mut c) => {
|
||||
copy_bidirectional_client_compute(&mut tls_stream, &mut c, |_, _| {}).await
|
||||
}
|
||||
Connection::Raw(mut c) => copy_bidirectional_client_compute(&mut tls_stream, &mut c).await,
|
||||
Connection::Tls(mut c) => copy_bidirectional_client_compute(&mut tls_stream, &mut c).await,
|
||||
};
|
||||
|
||||
match res {
|
||||
Ok(()) => Ok(()),
|
||||
Err(ErrorSource::Timeout(_)) => Err(anyhow!(
|
||||
"timed out while gracefully shutting down the connection"
|
||||
)),
|
||||
Ok(_) => Ok(()),
|
||||
Err(ErrorSource::Client(err)) => Err(err).context("client"),
|
||||
Err(ErrorSource::Compute(err)) => Err(err).context("compute"),
|
||||
}
|
||||
|
||||
@@ -161,11 +161,8 @@ struct ProxyCliArgs {
|
||||
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_REDIS_SET)]
|
||||
redis_rps_limit: Vec<RateBucketInfo>,
|
||||
/// Cancellation channel size (max queue size for redis kv client)
|
||||
#[clap(long, default_value_t = 1024)]
|
||||
#[clap(long, default_value = "1024")]
|
||||
cancellation_ch_size: usize,
|
||||
/// Cancellation ops batch size for redis
|
||||
#[clap(long, default_value_t = 8)]
|
||||
cancellation_batch_size: usize,
|
||||
/// cache for `allowed_ips` (use `size=0` to disable)
|
||||
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
|
||||
allowed_ips_cache: String,
|
||||
@@ -545,12 +542,7 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
if let Some(mut redis_kv_client) = redis_kv_client {
|
||||
maintenance_tasks.spawn(async move {
|
||||
redis_kv_client.try_connect().await?;
|
||||
handle_cancel_messages(
|
||||
&mut redis_kv_client,
|
||||
rx_cancel,
|
||||
args.cancellation_batch_size,
|
||||
)
|
||||
.await?;
|
||||
handle_cancel_messages(&mut redis_kv_client, rx_cancel).await?;
|
||||
|
||||
drop(redis_kv_client);
|
||||
|
||||
|
||||
@@ -30,6 +30,8 @@ use crate::tls::postgres_rustls::MakeRustlsConnect;
|
||||
type IpSubnetKey = IpNet;
|
||||
|
||||
const CANCEL_KEY_TTL: i64 = 1_209_600; // 2 weeks cancellation key expire time
|
||||
const REDIS_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10);
|
||||
const BATCH_SIZE: usize = 8;
|
||||
|
||||
// Message types for sending through mpsc channel
|
||||
pub enum CancelKeyOp {
|
||||
@@ -229,13 +231,12 @@ impl CancelReplyOp {
|
||||
pub async fn handle_cancel_messages(
|
||||
client: &mut RedisKVClient,
|
||||
mut rx: mpsc::Receiver<CancelKeyOp>,
|
||||
batch_size: usize,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut batch = Vec::with_capacity(batch_size);
|
||||
let mut pipeline = Pipeline::with_capacity(batch_size);
|
||||
let mut batch = Vec::with_capacity(BATCH_SIZE);
|
||||
let mut pipeline = Pipeline::with_capacity(BATCH_SIZE);
|
||||
|
||||
loop {
|
||||
if rx.recv_many(&mut batch, batch_size).await == 0 {
|
||||
if rx.recv_many(&mut batch, BATCH_SIZE).await == 0 {
|
||||
warn!("shutting down cancellation queue");
|
||||
break Ok(());
|
||||
}
|
||||
@@ -366,7 +367,8 @@ impl CancellationHandler {
|
||||
return Err(CancelError::InternalError);
|
||||
};
|
||||
|
||||
tx.try_send(op)
|
||||
tx.send_timeout(op, REDIS_SEND_TIMEOUT)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::warn!("failed to send GetCancelData for {key}: {e}");
|
||||
})
|
||||
@@ -568,7 +570,7 @@ impl Session {
|
||||
}
|
||||
|
||||
// Send the store key op to the cancellation handler and set TTL for the key
|
||||
pub(crate) fn write_cancel_key(
|
||||
pub(crate) async fn write_cancel_key(
|
||||
&self,
|
||||
cancel_closure: CancelClosure,
|
||||
) -> Result<(), CancelError> {
|
||||
@@ -594,14 +596,14 @@ impl Session {
|
||||
expire: CANCEL_KEY_TTL,
|
||||
};
|
||||
|
||||
let _ = tx.try_send(op).map_err(|e| {
|
||||
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
|
||||
let key = self.key;
|
||||
tracing::warn!("failed to send StoreCancelKey for {key}: {e}");
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn remove_cancel_key(&self) -> Result<(), CancelError> {
|
||||
pub(crate) async fn remove_cancel_key(&self) -> Result<(), CancelError> {
|
||||
let Some(tx) = &self.cancellation_handler.tx else {
|
||||
tracing::warn!("cancellation handler is not available");
|
||||
return Err(CancelError::InternalError);
|
||||
@@ -617,7 +619,7 @@ impl Session {
|
||||
.guard(RedisMsgKind::HDel),
|
||||
};
|
||||
|
||||
let _ = tx.try_send(op).map_err(|e| {
|
||||
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
|
||||
let key = self.key;
|
||||
tracing::warn!("failed to send RemoveCancelKey for {key}: {e}");
|
||||
});
|
||||
|
||||
@@ -129,12 +129,6 @@ pub async fn task_main(
|
||||
let _disconnect = ctx.log_connect();
|
||||
match p.proxy_pass(&config.connect_to_compute).await {
|
||||
Ok(()) => {}
|
||||
Err(ErrorSource::Timeout(_)) => {
|
||||
info!(
|
||||
?session_id,
|
||||
"per-client task timed out while gracefully shutting down the connection"
|
||||
);
|
||||
}
|
||||
Err(ErrorSource::Client(e)) => {
|
||||
error!(
|
||||
?session_id,
|
||||
@@ -250,7 +244,9 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let session = cancellation_handler_clone.get_key();
|
||||
|
||||
session.write_cancel_key(node.cancel_closure.clone())?;
|
||||
session
|
||||
.write_cancel_key(node.cancel_closure.clone())
|
||||
.await?;
|
||||
|
||||
prepare_client_connection(&node, *session.key(), &mut stream).await?;
|
||||
|
||||
|
||||
@@ -200,10 +200,8 @@ pub enum HttpDirection {
|
||||
#[derive(FixedCardinalityLabel, Copy, Clone)]
|
||||
#[label(singleton = "direction")]
|
||||
pub enum Direction {
|
||||
#[label(rename = "tx")]
|
||||
ComputeToClient,
|
||||
#[label(rename = "rx")]
|
||||
ClientToCompute,
|
||||
Tx,
|
||||
Rx,
|
||||
}
|
||||
|
||||
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
|
||||
|
||||
@@ -1,394 +1,313 @@
|
||||
use std::future::poll_fn;
|
||||
use std::io;
|
||||
use std::ops::Range;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll, ready};
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
use tracing::info;
|
||||
|
||||
use crate::metrics::Direction;
|
||||
#[derive(Debug)]
|
||||
enum TransferState {
|
||||
Running(CopyBuffer),
|
||||
ShuttingDown(u64),
|
||||
Done(u64),
|
||||
}
|
||||
|
||||
const DISCONNECT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum ErrorDirection {
|
||||
Read(io::Error),
|
||||
Write(io::Error),
|
||||
}
|
||||
|
||||
/// Mark a value as being unlikely.
|
||||
#[cold]
|
||||
#[inline(always)]
|
||||
fn cold<I>(i: I) -> I {
|
||||
i
|
||||
impl ErrorSource {
|
||||
fn from_client(err: ErrorDirection) -> ErrorSource {
|
||||
match err {
|
||||
ErrorDirection::Read(client) => Self::Client(client),
|
||||
ErrorDirection::Write(compute) => Self::Compute(compute),
|
||||
}
|
||||
}
|
||||
fn from_compute(err: ErrorDirection) -> ErrorSource {
|
||||
match err {
|
||||
ErrorDirection::Write(client) => Self::Client(client),
|
||||
ErrorDirection::Read(compute) => Self::Compute(compute),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ErrorSource {
|
||||
Client(io::Error),
|
||||
Compute(io::Error),
|
||||
Timeout(tokio::time::error::Elapsed),
|
||||
}
|
||||
|
||||
impl ErrorSource {
|
||||
fn read(dir: Direction, err: io::Error) -> Self {
|
||||
match dir {
|
||||
Direction::ComputeToClient => ErrorSource::Compute(err),
|
||||
Direction::ClientToCompute => ErrorSource::Client(err),
|
||||
}
|
||||
}
|
||||
|
||||
fn write(dir: Direction, err: io::Error) -> Self {
|
||||
match dir {
|
||||
Direction::ComputeToClient => ErrorSource::Client(err),
|
||||
Direction::ClientToCompute => ErrorSource::Compute(err),
|
||||
fn transfer_one_direction<A, B>(
|
||||
cx: &mut Context<'_>,
|
||||
state: &mut TransferState,
|
||||
r: &mut A,
|
||||
w: &mut B,
|
||||
) -> Poll<Result<u64, ErrorDirection>>
|
||||
where
|
||||
A: AsyncRead + AsyncWrite + Unpin + ?Sized,
|
||||
B: AsyncRead + AsyncWrite + Unpin + ?Sized,
|
||||
{
|
||||
let mut r = Pin::new(r);
|
||||
let mut w = Pin::new(w);
|
||||
loop {
|
||||
match state {
|
||||
TransferState::Running(buf) => {
|
||||
let count = ready!(buf.poll_copy(cx, r.as_mut(), w.as_mut()))?;
|
||||
*state = TransferState::ShuttingDown(count);
|
||||
}
|
||||
TransferState::ShuttingDown(count) => {
|
||||
ready!(w.as_mut().poll_shutdown(cx)).map_err(ErrorDirection::Write)?;
|
||||
*state = TransferState::Done(*count);
|
||||
}
|
||||
TransferState::Done(count) => return Poll::Ready(Ok(*count)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn copy_bidirectional_client_compute<Client, Compute>(
|
||||
client: &mut Client,
|
||||
compute: &mut Compute,
|
||||
mut f: impl for<'a> FnMut(Direction, &'a [u8]),
|
||||
) -> Result<(), ErrorSource>
|
||||
) -> Result<(u64, u64), ErrorSource>
|
||||
where
|
||||
Client: AsyncRead + AsyncWrite + Unpin + ?Sized,
|
||||
Compute: AsyncRead + AsyncWrite + Unpin + ?Sized,
|
||||
{
|
||||
let f = &mut f;
|
||||
let mut client_to_compute = CopyBuffer::new(Direction::ClientToCompute);
|
||||
let mut compute_to_client = CopyBuffer::new(Direction::ComputeToClient);
|
||||
let mut client_to_compute = TransferState::Running(CopyBuffer::new());
|
||||
let mut compute_to_client = TransferState::Running(CopyBuffer::new());
|
||||
|
||||
let mut client = Pin::new(client);
|
||||
let mut compute = Pin::new(compute);
|
||||
poll_fn(|cx| {
|
||||
let mut client_to_compute_result =
|
||||
transfer_one_direction(cx, &mut client_to_compute, client, compute)
|
||||
.map_err(ErrorSource::from_client)?;
|
||||
let mut compute_to_client_result =
|
||||
transfer_one_direction(cx, &mut compute_to_client, compute, client)
|
||||
.map_err(ErrorSource::from_compute)?;
|
||||
|
||||
// Initial copy hot path
|
||||
let close_dir = poll_fn(|cx| -> Poll<Result<_, ErrorSource>> {
|
||||
let copy1 = client_to_compute.poll_copy(cx, f, client.as_mut(), compute.as_mut())?;
|
||||
let copy2 = compute_to_client.poll_copy(cx, f, compute.as_mut(), client.as_mut())?;
|
||||
// TODO: 1 info log, with a enum label for close direction.
|
||||
|
||||
match (copy1, copy2) {
|
||||
(Poll::Pending, Poll::Pending) => Poll::Pending,
|
||||
(Poll::Ready(()), _) => Poll::Ready(Ok(client_to_compute.dir)),
|
||||
(_, Poll::Ready(())) => Poll::Ready(Ok(compute_to_client.dir)),
|
||||
// Early termination checks from compute to client.
|
||||
if let TransferState::Done(_) = compute_to_client {
|
||||
if let TransferState::Running(buf) = &client_to_compute {
|
||||
info!("Compute is done, terminate client");
|
||||
// Initiate shutdown
|
||||
client_to_compute = TransferState::ShuttingDown(buf.amt);
|
||||
client_to_compute_result =
|
||||
transfer_one_direction(cx, &mut client_to_compute, client, compute)
|
||||
.map_err(ErrorSource::from_client)?;
|
||||
}
|
||||
}
|
||||
|
||||
// Early termination checks from client to compute.
|
||||
if let TransferState::Done(_) = client_to_compute {
|
||||
if let TransferState::Running(buf) = &compute_to_client {
|
||||
info!("Client is done, terminate compute");
|
||||
// Initiate shutdown
|
||||
compute_to_client = TransferState::ShuttingDown(buf.amt);
|
||||
compute_to_client_result =
|
||||
transfer_one_direction(cx, &mut compute_to_client, compute, client)
|
||||
.map_err(ErrorSource::from_compute)?;
|
||||
}
|
||||
}
|
||||
|
||||
// It is not a problem if ready! returns early ... (comment remains the same)
|
||||
let client_to_compute = ready!(client_to_compute_result);
|
||||
let compute_to_client = ready!(compute_to_client_result);
|
||||
|
||||
Poll::Ready(Ok((client_to_compute, compute_to_client)))
|
||||
})
|
||||
.await?;
|
||||
|
||||
// initiate shutdown.
|
||||
match close_dir {
|
||||
Direction::ClientToCompute => {
|
||||
info!("Client is done, terminate compute");
|
||||
|
||||
// we will never write anymore data to the client.
|
||||
compute_to_client.filled = 0..0;
|
||||
|
||||
// make sure to shutdown the client conn.
|
||||
compute_to_client.need_flush = true;
|
||||
}
|
||||
Direction::ComputeToClient => {
|
||||
info!("Compute is done, terminate client");
|
||||
|
||||
// we will never write anymore data to the compute.
|
||||
client_to_compute.filled = 0..0;
|
||||
|
||||
// make sure to shutdown the compute conn.
|
||||
client_to_compute.need_flush = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Finish sending the rest of the data to client/compute before shutting it down.
|
||||
//
|
||||
// Edge case:
|
||||
// * peer has filled the TCP buffers and is blocking on a `write()`,
|
||||
// * proxy has filled the TCP buffers and is waiting on a `write()`.
|
||||
// Since no side is reading from the buffers, no progress will be made.
|
||||
let shutdown = poll_fn(|cx| {
|
||||
let res1 = client_to_compute.poll_empty(cx, compute.as_mut())?;
|
||||
let res2 = compute_to_client.poll_empty(cx, client.as_mut())?;
|
||||
|
||||
if res1.is_ready() && res2.is_ready() {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
});
|
||||
|
||||
// We assume most peers will have enough buffer space so this issue doesn't arise, but we apply
|
||||
// a timeout just in case.
|
||||
//
|
||||
// We could also update `poll_empty` to try and read the data, but I think this is not an edge case
|
||||
// worth overcomplicating.
|
||||
let res = tokio::time::timeout(DISCONNECT_TIMEOUT, shutdown).await;
|
||||
|
||||
match res {
|
||||
Ok(res) => res,
|
||||
Err(timeout) => Err(ErrorSource::Timeout(timeout)),
|
||||
}
|
||||
.await
|
||||
}
|
||||
|
||||
const DEFAULT_BUF_SIZE: usize = 1024;
|
||||
#[derive(Debug)]
|
||||
pub(super) struct CopyBuffer {
|
||||
dir: Direction,
|
||||
read_done: bool,
|
||||
need_flush: bool,
|
||||
filled: Range<usize>,
|
||||
buf: [u8; DEFAULT_BUF_SIZE],
|
||||
pos: usize,
|
||||
cap: usize,
|
||||
amt: u64,
|
||||
buf: Box<[u8]>,
|
||||
}
|
||||
const DEFAULT_BUF_SIZE: usize = 1024;
|
||||
|
||||
impl CopyBuffer {
|
||||
pub(super) const fn new(dir: Direction) -> Self {
|
||||
pub(super) fn new() -> Self {
|
||||
Self {
|
||||
dir,
|
||||
read_done: false,
|
||||
need_flush: false,
|
||||
filled: 0..0,
|
||||
buf: [0; DEFAULT_BUF_SIZE],
|
||||
pos: 0,
|
||||
cap: 0,
|
||||
amt: 0,
|
||||
buf: vec![0; DEFAULT_BUF_SIZE].into_boxed_slice(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns Ready(Ok(())) when no more writes could progress, and the buffer has space to read.
|
||||
#[inline(always)]
|
||||
fn poll_write_loop<W>(
|
||||
fn poll_fill_buf<R>(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
mut writer: Pin<&mut W>,
|
||||
) -> Poll<Result<(), ErrorSource>>
|
||||
where
|
||||
W: AsyncWrite + ?Sized,
|
||||
{
|
||||
debug_assert!(!self.filled.is_empty());
|
||||
|
||||
loop {
|
||||
let filled_buf = &self.buf[self.filled.clone()];
|
||||
match writer.as_mut().poll_write(cx, filled_buf) {
|
||||
Poll::Ready(Err(err)) => {
|
||||
return Poll::Ready(Err(ErrorSource::write(self.dir, cold(err))));
|
||||
}
|
||||
Poll::Ready(Ok(0)) => {
|
||||
let err =
|
||||
io::Error::new(io::ErrorKind::WriteZero, "write zero byte into writer");
|
||||
return Poll::Ready(Err(ErrorSource::write(self.dir, cold(err))));
|
||||
}
|
||||
Poll::Ready(Ok(i)) => {
|
||||
// update the write head.
|
||||
self.filled.start += i;
|
||||
self.need_flush = true;
|
||||
|
||||
// we wrote some data, but the filled buffer might not be fully empty yet.
|
||||
if !self.filled.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// the buffer is definitely empty. reset positions.
|
||||
self.filled = 0..0;
|
||||
break;
|
||||
}
|
||||
// While we couldn't write, we might be able to read.
|
||||
Poll::Pending if self.filled.end < self.buf.len() => break,
|
||||
// We couldn't write, and have no space to read. Just exit.
|
||||
Poll::Pending => return Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
/// Returns Ready(Ok((true))) when read returns EOF.
|
||||
/// Returns Ready(Ok((false))) when read returns data.
|
||||
#[inline(always)]
|
||||
fn poll_read_once<R>(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
f: &mut impl for<'a> FnMut(Direction, &'a [u8]),
|
||||
reader: Pin<&mut R>,
|
||||
) -> Poll<Result<bool, ErrorSource>>
|
||||
) -> Poll<io::Result<()>>
|
||||
where
|
||||
R: AsyncRead + ?Sized,
|
||||
{
|
||||
debug_assert!(self.filled.end < self.buf.len());
|
||||
let mut buf = ReadBuf::new(&mut self.buf[self.filled.end..]);
|
||||
let me = &mut *self;
|
||||
let mut buf = ReadBuf::new(&mut me.buf);
|
||||
buf.set_filled(me.cap);
|
||||
|
||||
match reader.poll_read(cx, &mut buf) {
|
||||
Poll::Ready(Ok(())) => {
|
||||
let filled = buf.filled();
|
||||
// no more data to read, switch to shutdown mode.
|
||||
if filled.is_empty() {
|
||||
self.need_flush = true;
|
||||
return Poll::Ready(Ok(true));
|
||||
}
|
||||
|
||||
// run our inspection callback.
|
||||
f(self.dir, filled);
|
||||
|
||||
// update the read head.
|
||||
self.filled.end += filled.len();
|
||||
|
||||
// read more data
|
||||
Poll::Ready(Ok(false))
|
||||
}
|
||||
// cannot continue on error.
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Err(ErrorSource::read(self.dir, cold(e)))),
|
||||
// No more data to read, and no more data to write.
|
||||
Poll::Pending => Poll::Pending,
|
||||
let res = reader.poll_read(cx, &mut buf);
|
||||
if let Poll::Ready(Ok(())) = res {
|
||||
let filled_len = buf.filled().len();
|
||||
me.read_done = me.cap == filled_len;
|
||||
me.cap = filled_len;
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
/// Returns Ready(Ok(())) when read returns EOF.
|
||||
fn poll_copy<R, W>(
|
||||
fn poll_write_buf<R, W>(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
f: &mut impl for<'a> FnMut(Direction, &'a [u8]),
|
||||
mut reader: Pin<&mut R>,
|
||||
mut writer: Pin<&mut W>,
|
||||
) -> Poll<Result<(), ErrorSource>>
|
||||
) -> Poll<Result<usize, ErrorDirection>>
|
||||
where
|
||||
R: AsyncRead + ?Sized,
|
||||
W: AsyncWrite + ?Sized,
|
||||
{
|
||||
// this register eliminates a branch in the hot loop.
|
||||
let mut empty = self.filled.is_empty();
|
||||
|
||||
// write then read hot loop
|
||||
loop {
|
||||
if !empty {
|
||||
ready!(self.poll_write_loop(cx, writer.as_mut())?);
|
||||
}
|
||||
|
||||
// If empty is true, there is guaranteed space to read.
|
||||
// If empty is false, and the write loop returned ready, then we know there's space for more reads.
|
||||
match self.poll_read_once(cx, f, reader.as_mut())? {
|
||||
// EOF
|
||||
Poll::Ready(true) => return Poll::Ready(Ok(())),
|
||||
// Needs write.
|
||||
Poll::Ready(false) => empty = false,
|
||||
// Cannot read. The peer might not send us anything until
|
||||
// they receive data from us, so let's switch to flushing.
|
||||
Poll::Pending => break,
|
||||
let me = &mut *self;
|
||||
match writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap]) {
|
||||
Poll::Pending => {
|
||||
// Top up the buffer towards full if we can read a bit more
|
||||
// data - this should improve the chances of a large write
|
||||
if !me.read_done && me.cap < me.buf.len() {
|
||||
ready!(me.poll_fill_buf(cx, reader.as_mut())).map_err(ErrorDirection::Read)?;
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
res @ Poll::Ready(_) => res.map_err(ErrorDirection::Write),
|
||||
}
|
||||
|
||||
if self.need_flush {
|
||||
let flush = writer.as_mut().poll_flush(cx);
|
||||
ready!(flush.map_err(|e| ErrorSource::write(self.dir, e))?);
|
||||
self.need_flush = false;
|
||||
}
|
||||
|
||||
// there might be more data still to read.
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
/// Returns Ready(Ok(())) when the conn is fully shutdown.
|
||||
pub(super) fn poll_empty<W>(
|
||||
pub(super) fn poll_copy<R, W>(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
mut reader: Pin<&mut R>,
|
||||
mut writer: Pin<&mut W>,
|
||||
) -> Poll<Result<(), ErrorSource>>
|
||||
) -> Poll<Result<u64, ErrorDirection>>
|
||||
where
|
||||
R: AsyncRead + ?Sized,
|
||||
W: AsyncWrite + ?Sized,
|
||||
{
|
||||
if !self.filled.is_empty() {
|
||||
ready!(self.poll_write_loop(cx, writer.as_mut())?);
|
||||
loop {
|
||||
// If there is some space left in our buffer, then we try to read some
|
||||
// data to continue, thus maximizing the chances of a large write.
|
||||
if self.cap < self.buf.len() && !self.read_done {
|
||||
match self.poll_fill_buf(cx, reader.as_mut()) {
|
||||
Poll::Ready(Ok(())) => (),
|
||||
Poll::Ready(Err(err)) => return Poll::Ready(Err(ErrorDirection::Read(err))),
|
||||
Poll::Pending => {
|
||||
// Ignore pending reads when our buffer is not empty, because
|
||||
// we can try to write data immediately.
|
||||
if self.pos == self.cap {
|
||||
// Try flushing when the reader has no progress to avoid deadlock
|
||||
// when the reader depends on buffered writer.
|
||||
if self.need_flush {
|
||||
ready!(writer.as_mut().poll_flush(cx))
|
||||
.map_err(ErrorDirection::Write)?;
|
||||
self.need_flush = false;
|
||||
}
|
||||
|
||||
if !self.filled.is_empty() {
|
||||
// still some data to write
|
||||
return Poll::Pending;
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If our buffer has some data, let's write it out!
|
||||
while self.pos < self.cap {
|
||||
let i = ready!(self.poll_write_buf(cx, reader.as_mut(), writer.as_mut()))?;
|
||||
if i == 0 {
|
||||
return Poll::Ready(Err(ErrorDirection::Write(io::Error::new(
|
||||
io::ErrorKind::WriteZero,
|
||||
"write zero byte into writer",
|
||||
))));
|
||||
}
|
||||
self.pos += i;
|
||||
self.amt += i as u64;
|
||||
self.need_flush = true;
|
||||
}
|
||||
|
||||
// If pos larger than cap, this loop will never stop.
|
||||
// In particular, user's wrong poll_write implementation returning
|
||||
// incorrect written length may lead to thread blocking.
|
||||
debug_assert!(
|
||||
self.pos <= self.cap,
|
||||
"writer returned length larger than input slice"
|
||||
);
|
||||
|
||||
// All data has been written, the buffer can be considered empty again
|
||||
self.pos = 0;
|
||||
self.cap = 0;
|
||||
|
||||
// If we've written all the data and we've seen EOF, flush out the
|
||||
// data and finish the transfer.
|
||||
if self.read_done {
|
||||
ready!(writer.as_mut().poll_flush(cx)).map_err(ErrorDirection::Write)?;
|
||||
return Poll::Ready(Ok(self.amt));
|
||||
}
|
||||
}
|
||||
|
||||
if self.need_flush {
|
||||
let res = writer.poll_shutdown(cx);
|
||||
ready!(res.map_err(|e| ErrorSource::write(self.dir, e))?);
|
||||
self.need_flush = false;
|
||||
}
|
||||
|
||||
// no data to read, no data to write.
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_client_to_compute() {
|
||||
let (mut client, mut client_proxy) = tokio::io::duplex(32); // Create a mock duplex stream
|
||||
let (mut proxy_compute, mut compute) = tokio::io::duplex(16); // Create a mock duplex stream
|
||||
let (mut client_client, mut client_proxy) = tokio::io::duplex(8); // Create a mock duplex stream
|
||||
let (mut compute_proxy, mut compute_client) = tokio::io::duplex(32); // Create a mock duplex stream
|
||||
|
||||
client.write_all(b"Neon Serverless Postgres").await.unwrap();
|
||||
compute.write_all(b"is amazing").await.unwrap();
|
||||
// Simulate 'a' finishing while there's still data for 'b'
|
||||
client_client.write_all(b"hello").await.unwrap();
|
||||
client_client.shutdown().await.unwrap();
|
||||
compute_client.write_all(b"Neon").await.unwrap();
|
||||
compute_client.shutdown().await.unwrap();
|
||||
|
||||
client.shutdown().await.unwrap();
|
||||
|
||||
let copy = tokio::spawn(async move {
|
||||
copy_bidirectional_client_compute(&mut client_proxy, &mut proxy_compute, |_, _| {})
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
let result = copy_bidirectional_client_compute(&mut client_proxy, &mut compute_proxy)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Assert correct transferred amounts
|
||||
let mut client_recv = String::new();
|
||||
let mut compute_recv = String::new();
|
||||
|
||||
client.read_to_string(&mut client_recv).await.unwrap();
|
||||
compute.read_to_string(&mut compute_recv).await.unwrap();
|
||||
|
||||
assert_eq!(compute_recv, "Neon Serverless Postgres");
|
||||
assert_eq!(client_recv, "is amazing");
|
||||
|
||||
copy.await.unwrap();
|
||||
let (client_to_compute_count, compute_to_client_count) = result;
|
||||
assert_eq!(client_to_compute_count, 5); // 'hello' was transferred
|
||||
assert_eq!(compute_to_client_count, 4); // response only partially transferred or not at all
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compute_to_client() {
|
||||
let (mut client, mut client_proxy) = tokio::io::duplex(32); // Create a mock duplex stream
|
||||
let (mut proxy_compute, mut compute) = tokio::io::duplex(16); // Create a mock duplex stream
|
||||
let (mut client_client, mut client_proxy) = tokio::io::duplex(32); // Create a mock duplex stream
|
||||
let (mut compute_proxy, mut compute_client) = tokio::io::duplex(8); // Create a mock duplex stream
|
||||
|
||||
client.write_all(b"Neon Serverless Postgres").await.unwrap();
|
||||
compute.write_all(b"is amazing").await.unwrap();
|
||||
// Simulate 'a' finishing while there's still data for 'b'
|
||||
compute_client.write_all(b"hello").await.unwrap();
|
||||
compute_client.shutdown().await.unwrap();
|
||||
client_client
|
||||
.write_all(b"Neon Serverless Postgres")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
compute.shutdown().await.unwrap();
|
||||
|
||||
let copy = tokio::spawn(async move {
|
||||
copy_bidirectional_client_compute(&mut client_proxy, &mut proxy_compute, |_, _| {})
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
let result = copy_bidirectional_client_compute(&mut client_proxy, &mut compute_proxy)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Assert correct transferred amounts
|
||||
let mut client_recv = String::new();
|
||||
let mut compute_recv = String::new();
|
||||
|
||||
client.read_to_string(&mut client_recv).await.unwrap();
|
||||
compute.read_to_string(&mut compute_recv).await.unwrap();
|
||||
|
||||
assert_eq!(compute_recv, "Neon Serverless ");
|
||||
assert_eq!(client_recv, "is amazing");
|
||||
|
||||
copy.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn test_timeout() {
|
||||
let (mut client, mut client_proxy) = tokio::io::duplex(32); // Create a mock duplex stream
|
||||
let (mut proxy_compute, mut compute) = tokio::io::duplex(16); // Create a mock duplex stream
|
||||
|
||||
// Try to send 24 bytes to compute, but compute only has space for 16 bytes.
|
||||
// Writes will not succeed.
|
||||
client.write_all(b"Neon Serverless Postgres").await.unwrap();
|
||||
client.shutdown().await.unwrap();
|
||||
|
||||
let copy = tokio::spawn(async move {
|
||||
copy_bidirectional_client_compute(&mut client_proxy, &mut proxy_compute, |_, _| {})
|
||||
.await
|
||||
.unwrap_err()
|
||||
});
|
||||
|
||||
tokio::time::advance(DISCONNECT_TIMEOUT).await;
|
||||
|
||||
let res = copy.await.unwrap();
|
||||
assert!(matches!(res, ErrorSource::Timeout(_)));
|
||||
|
||||
// Assert correct transferred amounts
|
||||
let mut compute_recv = String::new();
|
||||
compute.read_to_string(&mut compute_recv).await.unwrap();
|
||||
assert_eq!(compute_recv, "Neon Serverless ");
|
||||
let (client_to_compute_count, compute_to_client_count) = result;
|
||||
assert_eq!(compute_to_client_count, 5); // 'hello' was transferred
|
||||
assert!(client_to_compute_count <= 8); // response only partially transferred or not at all
|
||||
}
|
||||
}
|
||||
|
||||
@@ -167,12 +167,6 @@ pub async fn task_main(
|
||||
let _disconnect = ctx.log_connect();
|
||||
match p.proxy_pass(&config.connect_to_compute).await {
|
||||
Ok(()) => {}
|
||||
Err(ErrorSource::Timeout(_)) => {
|
||||
info!(
|
||||
?session_id,
|
||||
"per-client task timed out while gracefully shutting down the connection"
|
||||
);
|
||||
}
|
||||
Err(ErrorSource::Client(e)) => {
|
||||
warn!(
|
||||
?session_id,
|
||||
@@ -389,7 +383,9 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let session = cancellation_handler_clone.get_key();
|
||||
|
||||
session.write_cancel_key(node.cancel_closure.clone())?;
|
||||
session
|
||||
.write_cancel_key(node.cancel_closure.clone())
|
||||
.await?;
|
||||
|
||||
prepare_client_connection(&node, *session.key(), &mut stream).await?;
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use smol_str::SmolStr;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::debug;
|
||||
use utils::measured_stream::MeasuredStream;
|
||||
|
||||
use super::copy_bidirectional::ErrorSource;
|
||||
use crate::cancellation;
|
||||
@@ -8,15 +9,14 @@ use crate::compute::PostgresConnection;
|
||||
use crate::config::ComputeConfig;
|
||||
use crate::control_plane::messages::MetricsAuxInfo;
|
||||
use crate::metrics::{Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard};
|
||||
use crate::proxy::copy_bidirectional_client_compute;
|
||||
use crate::stream::Stream;
|
||||
use crate::usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS};
|
||||
|
||||
/// Forward bytes in both directions (client <-> compute).
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) async fn proxy_pass(
|
||||
mut client: Stream<impl AsyncRead + AsyncWrite + Unpin>,
|
||||
mut compute: impl AsyncRead + AsyncWrite + Unpin,
|
||||
client: impl AsyncRead + AsyncWrite + Unpin,
|
||||
compute: impl AsyncRead + AsyncWrite + Unpin,
|
||||
aux: MetricsAuxInfo,
|
||||
private_link_id: Option<SmolStr>,
|
||||
) -> Result<(), ErrorSource> {
|
||||
@@ -28,30 +28,37 @@ pub(crate) async fn proxy_pass(
|
||||
});
|
||||
|
||||
let metrics = &Metrics::get().proxy.io_bytes;
|
||||
let m_sent = metrics.with_labels(Direction::ComputeToClient);
|
||||
let m_recv = metrics.with_labels(Direction::ClientToCompute);
|
||||
let m_sent = metrics.with_labels(Direction::Tx);
|
||||
let mut client = MeasuredStream::new(
|
||||
client,
|
||||
|_| {},
|
||||
|cnt| {
|
||||
// Number of bytes we sent to the client (outbound).
|
||||
metrics.get_metric(m_sent).inc_by(cnt as u64);
|
||||
usage_tx.record_egress(cnt as u64);
|
||||
},
|
||||
);
|
||||
|
||||
let inspect = |direction, bytes: &[u8]| match direction {
|
||||
Direction::ComputeToClient => {
|
||||
metrics.get_metric(m_sent).inc_by(bytes.len() as u64);
|
||||
usage_tx.record_egress(bytes.len() as u64);
|
||||
}
|
||||
Direction::ClientToCompute => {
|
||||
metrics.get_metric(m_recv).inc_by(bytes.len() as u64);
|
||||
usage_tx.record_ingress(bytes.len() as u64);
|
||||
}
|
||||
};
|
||||
let m_recv = metrics.with_labels(Direction::Rx);
|
||||
let mut compute = MeasuredStream::new(
|
||||
compute,
|
||||
|_| {},
|
||||
|cnt| {
|
||||
// Number of bytes the client sent to the compute node (inbound).
|
||||
metrics.get_metric(m_recv).inc_by(cnt as u64);
|
||||
usage_tx.record_ingress(cnt as u64);
|
||||
},
|
||||
);
|
||||
|
||||
// Starting from here we only proxy the client's traffic.
|
||||
debug!("performing the proxy pass...");
|
||||
let _ = crate::proxy::copy_bidirectional::copy_bidirectional_client_compute(
|
||||
&mut client,
|
||||
&mut compute,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// reduce branching internal to the hot path.
|
||||
match &mut client {
|
||||
Stream::Raw { raw } => copy_bidirectional_client_compute(raw, &mut compute, inspect).await,
|
||||
Stream::Tls { tls, .. } => {
|
||||
copy_bidirectional_client_compute(&mut *tls, &mut compute, inspect).await
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) struct ProxyPassthrough<S> {
|
||||
@@ -87,7 +94,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
|
||||
tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
|
||||
}
|
||||
|
||||
drop(self.cancel.remove_cancel_key()); // we don't need a result. If the queue is full, we just log the error
|
||||
drop(self.cancel.remove_cancel_key().await); // we don't need a result. If the queue is full, we just log the error
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll, ready};
|
||||
|
||||
use anyhow::{Context as _, anyhow};
|
||||
use anyhow::Context as _;
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use framed_websockets::{Frame, OpCode, WebSocketServer};
|
||||
use futures::{Sink, Stream};
|
||||
@@ -169,9 +169,6 @@ pub(crate) async fn serve_websocket(
|
||||
ctx.log_connect();
|
||||
match p.proxy_pass(&config.connect_to_compute).await {
|
||||
Ok(()) => Ok(()),
|
||||
Err(ErrorSource::Timeout(_)) => Err(anyhow!(
|
||||
"timed out while gracefully shutting down the connection"
|
||||
)),
|
||||
Err(ErrorSource::Client(err)) => Err(err).context("client"),
|
||||
Err(ErrorSource::Compute(err)) => Err(err).context("compute"),
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ if TYPE_CHECKING:
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
|
||||
@pytest.mark.timeout(7200)
|
||||
@pytest.mark.timeout(4*3600)
|
||||
@pytest.mark.remote_cluster
|
||||
def test_cloud_regress(
|
||||
remote_pg: RemotePostgres,
|
||||
|
||||
Reference in New Issue
Block a user