mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 23:50:39 +00:00
Compare commits
2 Commits
amasteerov
...
devin/1747
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
759962c5cc | ||
|
|
cdb6479c8a |
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -4434,6 +4434,16 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pageserver_page_api"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"prost 0.13.3",
|
||||
"tonic",
|
||||
"tonic-build",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "papaya"
|
||||
version = "0.2.1"
|
||||
|
||||
@@ -9,6 +9,7 @@ members = [
|
||||
"pageserver/ctl",
|
||||
"pageserver/client",
|
||||
"pageserver/pagebench",
|
||||
"pageserver/page_api",
|
||||
"proxy",
|
||||
"safekeeper",
|
||||
"safekeeper/client",
|
||||
@@ -252,6 +253,7 @@ pageserver = { path = "./pageserver" }
|
||||
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
|
||||
pageserver_client = { path = "./pageserver/client" }
|
||||
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
|
||||
pageserver_page_api = { path = "./pageserver/page_api" }
|
||||
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
|
||||
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
|
||||
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
|
||||
|
||||
13
pageserver/page_api/Cargo.toml
Normal file
13
pageserver/page_api/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "pageserver_page_api"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
prost.workspace = true
|
||||
tonic.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build.workspace = true
|
||||
7
pageserver/page_api/build.rs
Normal file
7
pageserver/page_api/build.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Generates Rust code from .proto Protobuf schemas.
|
||||
tonic_build::configure()
|
||||
.bytes(["."])
|
||||
.compile_protos(&["proto/page_service.proto"], &["proto"])
|
||||
.map_err(|err| err.into())
|
||||
}
|
||||
220
pageserver/page_api/proto/page_service.proto
Normal file
220
pageserver/page_api/proto/page_service.proto
Normal file
@@ -0,0 +1,220 @@
|
||||
// Page service, presented by pageservers for computes.
|
||||
//
|
||||
// This is the compute read path. It primarily serves page versions at given
|
||||
// LSNs, but also base backups, SLRU segments, and relation metadata.
|
||||
//
|
||||
// EXPERIMENTAL: this is still under development and subject to change.
|
||||
//
|
||||
// Request metadata headers:
|
||||
// - authorization: JWT token ("Bearer <token>"), if auth is enabled
|
||||
// - neon-tenant-id: tenant ID ("7c4a1f9e3bd6470c8f3e21a65bd2e980")
|
||||
// - neon-shard-id: shard ID, as <number><count> in hex ("0b10" = shard 11 of 16, 0-based)
|
||||
// - neon-timeline-id: timeline ID ("f08c4e9a2d5f76b1e3a7c2d8910f4b3e")
|
||||
//
|
||||
// TODO: consider adding neon-compute-mode ("primary", "static", "replica").
|
||||
// However, this will require reconnecting when changing modes.
|
||||
//
|
||||
// TODO: write implementation guidance on
|
||||
// - Health checks
|
||||
// - Tracing, OpenTelemetry
|
||||
// - Compression
|
||||
|
||||
syntax = "proto3";
|
||||
package page_service;
|
||||
|
||||
service PageService {
|
||||
// Returns whether a relation exists.
|
||||
rpc CheckRelExists(CheckRelExistsRequest) returns (CheckRelExistsResponse);
|
||||
|
||||
// Fetches a base backup.
|
||||
rpc GetBaseBackup (GetBaseBackupRequest) returns (stream GetBaseBackupResponseChunk);
|
||||
|
||||
// Returns the total size of a database, as # of bytes.
|
||||
rpc GetDbSize (GetDbSizeRequest) returns (GetDbSizeResponse);
|
||||
|
||||
// Fetches pages.
|
||||
//
|
||||
// This is implemented as a bidirectional streaming RPC for performance. Unary
|
||||
// requests incur costs for e.g. HTTP/2 stream setup, header parsing,
|
||||
// authentication, and so on -- with streaming, we only pay these costs during
|
||||
// the initial stream setup. This ~doubles throughput in benchmarks. Other
|
||||
// RPCs use regular unary requests, since they are not as frequent and
|
||||
// performance-critical, and this simplifies implementation.
|
||||
//
|
||||
// NB: a status response (e.g. errors) will terminate the stream. The stream
|
||||
// may be shared by e.g. multiple Postgres backends, so we should avoid this.
|
||||
// Most errors are therefore sent as GetPageResponse.status instead.
|
||||
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
|
||||
|
||||
// Returns the size of a relation, as # of blocks.
|
||||
rpc GetRelSize (GetRelSizeRequest) returns (GetRelSizeResponse);
|
||||
|
||||
// Fetches an SLRU segment.
|
||||
rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse);
|
||||
}
|
||||
|
||||
// The LSN a request should read at.
|
||||
message ReadLsn {
|
||||
// The request's read LSN. Required.
|
||||
uint64 request_lsn = 1;
|
||||
// If given, the caller guarantees that the page has not been modified since
|
||||
// this LSN. Must be smaller than or equal to request_lsn. This allows the
|
||||
// Pageserver to serve an old page without waiting for the request LSN to
|
||||
// arrive. Valid for all request types.
|
||||
//
|
||||
// It is undefined behaviour to make a request such that the page was, in
|
||||
// fact, modified between request_lsn and not_modified_since_lsn. The
|
||||
// Pageserver might detect it and return an error, or it might return the old
|
||||
// page version or the new page version. Setting not_modified_since_lsn equal
|
||||
// to request_lsn is always safe, but can lead to unnecessary waiting.
|
||||
uint64 not_modified_since_lsn = 2;
|
||||
}
|
||||
|
||||
// A relation identifier.
|
||||
message RelTag {
|
||||
uint32 spc_oid = 1;
|
||||
uint32 db_oid = 2;
|
||||
uint32 rel_number = 3;
|
||||
uint32 fork_number = 4;
|
||||
}
|
||||
|
||||
// Checks whether a relation exists, at the given LSN. Only valid on shard 0,
|
||||
// other shards will error.
|
||||
message CheckRelExistsRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
RelTag rel = 2;
|
||||
}
|
||||
|
||||
message CheckRelExistsResponse {
|
||||
bool exists = 1;
|
||||
}
|
||||
|
||||
// Requests a base backup at a given LSN.
|
||||
message GetBaseBackupRequest {
|
||||
// The LSN to fetch a base backup at.
|
||||
ReadLsn read_lsn = 1;
|
||||
// If true, logical replication slots will not be created.
|
||||
bool replica = 2;
|
||||
}
|
||||
|
||||
// Base backup response chunk, returned as an ordered stream.
|
||||
message GetBaseBackupResponseChunk {
|
||||
// A basebackup data chunk. The size is undefined, but bounded by the 4 MB
|
||||
// gRPC message size limit.
|
||||
bytes chunk = 1;
|
||||
}
|
||||
|
||||
// Requests the size of a database, as # of bytes. Only valid on shard 0, other
|
||||
// shards will error.
|
||||
message GetDbSizeRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
uint32 db_oid = 2;
|
||||
}
|
||||
|
||||
message GetDbSizeResponse {
|
||||
uint64 num_bytes = 1;
|
||||
}
|
||||
|
||||
// Requests one or more pages.
|
||||
message GetPageRequest {
|
||||
// A request ID. Will be included in the response. Should be unique for
|
||||
// in-flight requests on the stream.
|
||||
uint64 request_id = 1;
|
||||
// The request class.
|
||||
GetPageClass request_class = 2;
|
||||
// The LSN to read at.
|
||||
ReadLsn read_lsn = 3;
|
||||
// The relation to read from.
|
||||
RelTag rel = 4;
|
||||
// Page numbers to read. Must belong to the remote shard.
|
||||
//
|
||||
// Multiple pages will be executed as a single batch by the Pageserver,
|
||||
// amortizing layer access costs and parallelizing them. This may increase the
|
||||
// latency of any individual request, but improves the overall latency and
|
||||
// throughput of the batch as a whole.
|
||||
//
|
||||
// TODO: this causes an allocation in the common single-block case. The sender
|
||||
// can use a SmallVec to stack-allocate it, but Prost will always deserialize
|
||||
// into a heap-allocated Vec. Consider optimizing this.
|
||||
//
|
||||
// TODO: we might be able to avoid a sort or something if we mandate that these
|
||||
// are always in order. But we can't currenly rely on this on the server, because
|
||||
// of compatibility with the libpq protocol handler.
|
||||
repeated uint32 block_number = 5;
|
||||
}
|
||||
|
||||
// A GetPageRequest class. Primarily intended for observability, but may also be
|
||||
// used for prioritization in the future.
|
||||
enum GetPageClass {
|
||||
// Unknown class. For forwards compatibility: used when the client sends a
|
||||
// class that the server doesn't know about.
|
||||
GET_PAGE_CLASS_UNKNOWN = 0;
|
||||
// A normal request. This is the default.
|
||||
GET_PAGE_CLASS_NORMAL = 1;
|
||||
// A prefetch request. NB: can only be classified on pg < 18.
|
||||
GET_PAGE_CLASS_PREFETCH = 2;
|
||||
// A background request (e.g. vacuum).
|
||||
GET_PAGE_CLASS_BACKGROUND = 3;
|
||||
}
|
||||
|
||||
// A GetPage response.
|
||||
//
|
||||
// A batch response will contain all of the requested pages. We could eagerly
|
||||
// emit individual pages as soon as they are ready, but on a readv() Postgres
|
||||
// holds buffer pool locks on all pages in the batch and we'll only return once
|
||||
// the entire batch is ready, so no one can make use of the individual pages.
|
||||
message GetPageResponse {
|
||||
// The original request's ID.
|
||||
uint64 request_id = 1;
|
||||
// The response status code.
|
||||
GetPageStatus status = 2;
|
||||
// A string describing the status, if any.
|
||||
string reason = 3;
|
||||
// The 8KB page images, in the same order as the request. Empty if status != OK.
|
||||
repeated bytes page_image = 4;
|
||||
}
|
||||
|
||||
// A GetPageResponse status code. Since we use a bidirectional stream, we don't
|
||||
// want to send errors as gRPC statuses, since this would terminate the stream.
|
||||
enum GetPageStatus {
|
||||
// Unknown status. For forwards compatibility: used when the server sends a
|
||||
// status code that the client doesn't know about.
|
||||
GET_PAGE_STATUS_UNKNOWN = 0;
|
||||
// The request was successful.
|
||||
GET_PAGE_STATUS_OK = 1;
|
||||
// The page did not exist. The tenant/timeline/shard has already been
|
||||
// validated during stream setup.
|
||||
GET_PAGE_STATUS_NOT_FOUND = 2;
|
||||
// The request was invalid.
|
||||
GET_PAGE_STATUS_INVALID = 3;
|
||||
// The tenant is rate limited. Slow down and retry later.
|
||||
GET_PAGE_STATUS_SLOW_DOWN = 4;
|
||||
// TODO: consider adding a GET_PAGE_STATUS_LAYER_DOWNLOAD in the case of a
|
||||
// layer download. This could free up the server task to process other
|
||||
// requests while the layer download is in progress.
|
||||
}
|
||||
|
||||
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on
|
||||
// shard 0, other shards will error.
|
||||
message GetRelSizeRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
RelTag rel = 2;
|
||||
}
|
||||
|
||||
message GetRelSizeResponse {
|
||||
uint32 num_blocks = 1;
|
||||
}
|
||||
|
||||
// Requests an SLRU segment. Only valid on shard 0, other shards will error.
|
||||
message GetSlruSegmentRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
uint32 kind = 2;
|
||||
uint32 segno = 3;
|
||||
}
|
||||
|
||||
// Returns an SLRU segment.
|
||||
//
|
||||
// These are up 32 pages (256 KB), so we can send them as a single response.
|
||||
message GetSlruSegmentResponse {
|
||||
bytes segment = 1;
|
||||
}
|
||||
14
pageserver/page_api/src/lib.rs
Normal file
14
pageserver/page_api/src/lib.rs
Normal file
@@ -0,0 +1,14 @@
|
||||
//! This crate provides the Pageserver's page API. It contains:
|
||||
//!
|
||||
//! * proto/page_service.proto: the Protobuf schema for the page API.
|
||||
//! * proto: auto-generated Protobuf types for gRPC.
|
||||
//!
|
||||
//! This crate is used by both the client and the server. Try to keep it slim.
|
||||
|
||||
// Code generated by protobuf.
|
||||
pub mod proto {
|
||||
tonic::include_proto!("page_service");
|
||||
|
||||
pub use page_service_client::PageServiceClient;
|
||||
pub use page_service_server::{PageService, PageServiceServer};
|
||||
}
|
||||
@@ -70,9 +70,9 @@ def test_compare_child_and_root_write_perf(neon_compare: NeonCompare):
|
||||
endpoint_child = env.endpoints.create_start("child")
|
||||
|
||||
with neon_compare.record_duration("root_run_duration"):
|
||||
endpoint_root.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
|
||||
endpoint_root.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,10000000)") # 10x increase from 1000000
|
||||
with neon_compare.record_duration("child_run_duration"):
|
||||
endpoint_child.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
|
||||
endpoint_child.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,10000000)") # 10x increase from 1000000
|
||||
|
||||
|
||||
def test_compare_child_and_root_read_perf(neon_compare: NeonCompare):
|
||||
@@ -83,7 +83,7 @@ def test_compare_child_and_root_read_perf(neon_compare: NeonCompare):
|
||||
endpoint_root.safe_psql_many(
|
||||
[
|
||||
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
|
||||
"INSERT INTO foo SELECT FROM generate_series(1,1000000)",
|
||||
"INSERT INTO foo SELECT FROM generate_series(1,10000000)", # 10x increase from 1000000
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ def test_copy(neon_with_baseline: PgCompare):
|
||||
# Since there's no data in the table previously, this extends it.
|
||||
with env.record_pageserver_writes("copy_extend_pageserver_writes"):
|
||||
with env.record_duration("copy_extend"):
|
||||
cur.copy_from(copy_test_data(1000000), "copytest")
|
||||
cur.copy_from(copy_test_data(10000000), "copytest") # 10x increase from 1000000
|
||||
env.flush()
|
||||
|
||||
# Delete most rows, and VACUUM to make the space available for reuse.
|
||||
@@ -79,7 +79,7 @@ def test_copy(neon_with_baseline: PgCompare):
|
||||
# This will also clear all the VM bits.
|
||||
with env.record_pageserver_writes("copy_reuse_pageserver_writes"):
|
||||
with env.record_duration("copy_reuse"):
|
||||
cur.copy_from(copy_test_data(1000000), "copytest")
|
||||
cur.copy_from(copy_test_data(10000000), "copytest") # 10x increase from 1000000
|
||||
env.flush()
|
||||
|
||||
env.report_peak_memory_use()
|
||||
|
||||
@@ -101,8 +101,8 @@ def test_cumulative_statistics_persistence(
|
||||
try:
|
||||
connstr = project["connection_uris"][0]["connection_uri"]
|
||||
env = connection_parameters_to_env(project["connection_uris"][0]["connection_parameters"])
|
||||
# seed about 1 GiB of data into pgbench_accounts
|
||||
pg_bin.run_capture(["pgbench", "-i", "-s68"], env=env)
|
||||
# seed about 10 GiB of data into pgbench_accounts
|
||||
pg_bin.run_capture(["pgbench", "-i", "-s680"], env=env) # 10x increase from 68
|
||||
|
||||
# assert rows in pgbench_accounts is 6800000 rows
|
||||
conn = psycopg2.connect(connstr)
|
||||
|
||||
@@ -13,7 +13,7 @@ if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import PgProtocol
|
||||
|
||||
|
||||
def start_write_workload(pg: PgProtocol, scale: int = 10):
|
||||
def start_write_workload(pg: PgProtocol, scale: int = 100): # 10x increase from 10
|
||||
with pg.connect().cursor() as cur:
|
||||
cur.execute(f"create table big as select generate_series(1,{scale * 100_000})")
|
||||
|
||||
|
||||
@@ -17,13 +17,13 @@ async def repeat_bytes(buf, repetitions: int):
|
||||
|
||||
async def copy_test_data_to_table(endpoint: Endpoint, worker_id: int, table_name: str):
|
||||
buf = BytesIO()
|
||||
for i in range(1000):
|
||||
for i in range(10000): # 10x increase from 1000
|
||||
buf.write(
|
||||
f"{i}\tLoaded by worker {worker_id}. Long string to consume some space.\n".encode()
|
||||
)
|
||||
buf.seek(0)
|
||||
|
||||
copy_input = repeat_bytes(buf.read(), 5000)
|
||||
copy_input = repeat_bytes(buf.read(), 50000) # 10x increase from 5000
|
||||
|
||||
pg_conn = await endpoint.connect_async()
|
||||
|
||||
|
||||
@@ -16,13 +16,13 @@ async def repeat_bytes(buf, repetitions: int):
|
||||
|
||||
async def copy_test_data_to_table(pg: PgProtocol, worker_id: int, table_name: str):
|
||||
buf = BytesIO()
|
||||
for i in range(1000):
|
||||
for i in range(10000): # 10x increase from 1000
|
||||
buf.write(
|
||||
f"{i}\tLoaded by worker {worker_id}. Long string to consume some space.\n".encode()
|
||||
)
|
||||
buf.seek(0)
|
||||
|
||||
copy_input = repeat_bytes(buf.read(), 5000)
|
||||
copy_input = repeat_bytes(buf.read(), 50000) # 10x increase from 5000
|
||||
|
||||
pg_conn = await pg.connect_async()
|
||||
await pg_conn.copy_to_table(table_name, source=copy_input)
|
||||
|
||||
@@ -181,7 +181,7 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
|
||||
env.report_size()
|
||||
|
||||
|
||||
def get_durations_matrix(default: int = 45) -> list[int]:
|
||||
def get_durations_matrix(default: int = 450) -> list[int]: # 10x increase from 45
|
||||
durations = os.getenv("TEST_PG_BENCH_DURATIONS_MATRIX", default=str(default))
|
||||
rv = []
|
||||
for d in durations.split(","):
|
||||
@@ -197,7 +197,7 @@ def get_durations_matrix(default: int = 45) -> list[int]:
|
||||
return rv
|
||||
|
||||
|
||||
def get_scales_matrix(default: int = 10) -> list[int]:
|
||||
def get_scales_matrix(default: int = 100) -> list[int]: # 10x increase from 10
|
||||
scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX", default=str(default))
|
||||
rv = []
|
||||
for s in scales.split(","):
|
||||
|
||||
@@ -18,13 +18,13 @@ if TYPE_CHECKING:
|
||||
@pytest.mark.parametrize(
|
||||
"rows,iters,workers",
|
||||
[
|
||||
# The test table is large enough (3-4 MB) that it doesn't fit in the compute node
|
||||
# The test table is large enough (30-40 MB) that it doesn't fit in the compute node
|
||||
# cache, so the seqscans go to the page server. But small enough that it fits
|
||||
# into memory in the page server.
|
||||
pytest.param(100000, 100, 0),
|
||||
pytest.param(1000000, 100, 0), # 10x increase from 100000
|
||||
# Also test with a larger table, with and without parallelism
|
||||
pytest.param(10000000, 1, 0),
|
||||
pytest.param(10000000, 1, 4),
|
||||
pytest.param(100000000, 1, 0), # 10x increase from 10000000
|
||||
pytest.param(100000000, 1, 4), # 10x increase from 10000000
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
|
||||
@@ -69,9 +69,9 @@ def start_heavy_write_workload(env: PgCompare, n_tables: int, scale: int, num_it
|
||||
|
||||
## Single table workload:
|
||||
At each step, insert new `new_rows_each_update` rows.
|
||||
The variable `new_rows_each_update` is equal to `scale * 100_000`.
|
||||
The variable `new_rows_each_update` is equal to `scale * 1_000_000`.
|
||||
The number of steps is determined by `num_iters` variable."""
|
||||
new_rows_each_update = scale * 100_000
|
||||
new_rows_each_update = scale * 1_000_000 # 10x increase from 100_000
|
||||
|
||||
def start_single_table_workload(table_id: int):
|
||||
for _ in range(num_iters):
|
||||
|
||||
Reference in New Issue
Block a user