Compare commits

..

28 Commits

Author SHA1 Message Date
Elizabeth Murray
04440343f8 Pagebench with grpc option. Note that grpc is on port 51050, so requires a connstring to be set. 2025-05-28 14:44:28 -07:00
Elizabeth Murray
578b7f1668 Remove "pub" for module module in pageserver_page_api. 2025-05-28 13:10:09 -07:00
Elizabeth Murray
97f18dd013 Remove unnecessary whitespace. 2025-05-28 12:54:53 -07:00
Elizabeth Murray
c8abe7e90f Remove unnecessary model changes. 2025-05-28 12:53:28 -07:00
Elizabeth Murray
7160fd16cd Response to review comments, code cleanup. 2025-05-28 12:40:21 -07:00
Elizabeth Murray
13b9d4cb67 Merge branch 'main' into elizabeth/communicator-grpc-minimal-domain-client 2025-05-28 09:40:47 -07:00
Elizabeth Murray
f0982f9a0a Clean up dependencies. 2025-05-28 08:51:01 -07:00
Elizabeth Murray
1634af6d10 Move conversion from string out of the auth interceptor. 2025-05-28 08:45:20 -07:00
Vlad Lazar
eadabeddb8 pageserver: use the same job size throughout the import lifetime (#12026)
## Problem

Import planning takes a job size limit as its input. Previously, the job
size came from a pageserver config field. This field may change while
imports are in progress. If this happens, plans will no longer be
identical and the import would fail permanently.

## Summary of Changes

Bake the job size into the import progress reported to the storage
controller. For new imports, use the value from the pagesever config,
and, for existing imports, use the value present in the shard progress.

This value is identical for all shards, but we want it to be versioned
since future versions of the planner might split the jobs up
differently. Hence, it ends up in `ShardImportProgress`.

Closes https://github.com/neondatabase/neon/issues/11983
2025-05-28 15:19:41 +00:00
Elizabeth Murray
53c1a7ca7f Add minimal GRPC client code that will be used for pagebench. 2025-05-28 08:09:45 -07:00
Alex Chi Z.
67ddf1de28 feat(pageserver): create image layers at L0-L1 boundary (#12023)
## Problem

Previous attempt https://github.com/neondatabase/neon/pull/10548 caused
some issues in staging and we reverted it. This is a re-attempt to
address https://github.com/neondatabase/neon/issues/11063.

Currently we create image layers at latest record LSN. We would create
"future image layers" (i.e., image layers with LSN larger than disk
consistent LSN) that need special handling at startup. We also waste a
lot of read operations to reconstruct from L0 layers while we could have
compacted all of the L0 layers and operate on a flat level of historic
layers.

## Summary of changes

* Run repartition at L0-L1 boundary.
* Roll out with feature flags.
* Piggyback a change that downgrades "image layer creating below
gc_cutoff" to debug level.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-28 07:00:52 +00:00
Nikita Kalyanov
541fcd8d2f chore: expose new mark_invisible API in openAPI spec for use in cplane (#12032)
## Problem
There is a new API that I plan to use. We generate client from the spec
so it should be in the spec
## Summary of changes
Document the existing API in openAPI format
2025-05-28 03:39:59 +00:00
Suhas Thalanki
e77961c1c6 background worker that collects installed extensions (#11939)
## Problem

Currently, we collect metrics of what extensions are installed on
computes at start up time. We do not have a mechanism that does this at
runtime.

## Summary of changes

Added a background thread that queries all DBs at regular intervals and
collects a list of installed extensions.
2025-05-27 19:40:51 +00:00
Tristan Partin
cdfa06caad Remove test-images compatibility hack for confirming library load paths (#11927)
This hack was needed for compatiblity tests, but after the compute
release is no longer needed.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-05-27 17:33:16 +00:00
Alex Chi Z.
f0bb93a9c9 feat(pageserver): support evaluate boolean flags (#12024)
## Problem

Part of https://github.com/neondatabase/neon/issues/11813

## Summary of changes

* Support evaluate boolean flags.
* Add docs on how to handle errors.
* Add test cases based on real PostHog config.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-27 14:29:15 +00:00
Vlad Lazar
30adf8e2bd pageserver: add tracing spans for time spent in batch and flushing (#12012)
## Problem

We have some gaps in our traces. This indicates missing spans.

## Summary of changes

This PR adds two new spans:
* WAIT_EXECUTOR: time a batched request spends in the batch waiting to
be picked up
* FLUSH_RESPONSE: time a get page request spends flushing the response
to the compute


![image](https://github.com/user-attachments/assets/41b3ddb8-438d-4375-9da3-da341fc0916a)
2025-05-27 13:57:53 +00:00
Erik Grinaker
5d538a9503 page_api: tweak errors (#12019)
## Problem

The page API gRPC errors need a few tweaks to integrate better with the
GetPage machinery.

Touches https://github.com/neondatabase/neon/issues/11728.

## Summary of changes

* Add `GetPageStatus::InternalError` for internal server errors.
* Rename `GetPageStatus::Invalid` to `InvalidRequest` for clarity.
* Rename `status` and `GetPageStatus` to `status_code` and
`GetPageStatusCode`.
* Add an `Into<tonic::Status>` implementation for `ProtocolError`.
2025-05-27 12:06:51 +00:00
Arpad Müller
f3976e5c60 remove safekeeper_proto_version = 3 from tests (#12020)
Some tests still explicitly specify version 3 of the safekeeper
walproposer protocol. Remove the explicit opt in from the tests as v3 is
the default now since #11518.

We don't touch the places where a test exercises both v2 and v3. Those
we leave for #12021.

Part of https://github.com/neondatabase/neon/issues/10326
2025-05-27 11:32:15 +00:00
Vlad Lazar
9657fbc194 pageserver: add and stabilize import chaos test (#11982)
## Problem

Test coverage of timeline imports is lacking.

## Summary of changes

This PR adds a chaos import test. It runs an import while injecting
various chaos events
in the environment. All the commits that follow the test fix various
issues that were surfaced by it.

Closes https://github.com/neondatabase/neon/issues/10191
2025-05-27 09:52:59 +00:00
a-masterov
dd501554c9 add a script to run the test for online-advisor as a regular user. (#12017)
## Problem
The regression test for the extension online_advisor fails on the
staging instance due to a lack of permission to alter the database.
## Summary of changes
A script was added to work around this problem.

---------

Co-authored-by: Alexander Lakhin <alexander.lakhin@neon.tech>
2025-05-27 08:54:59 +00:00
Tristan Partin
fe1513ca57 Add neon.safekeeper_conninfo_options GUC (#11901)
In order to enable TLS connections between computes and safekeepers, we
need to provide the control plane with a way to configure the various
libpq keyword parameters, sslmode and sslrootcert. neon.safekeepers is a
comma separated list of safekeepers formatted as host:port, so isn't
available for extension in the same way that neon.pageserver_connstring
is. This could be remedied in a future PR.

Part-of: https://github.com/neondatabase/cloud/issues/25823
Link:
https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-05-27 02:21:24 +00:00
Arpad Müller
3e86008e66 read-only timelines (#12015)
Support timeline creations on the storage controller to opt out from
their creation on the safekeepers, introducing the read-only timelines
concept. Read only timelines:

* will never receive WAL of their own, so it's fine to not create them
on the safekeepers
* the property is non-transitive. children of read-only timelines aren't
neccessarily read-only themselves.

This feature can be used for snapshots, to prevent the safekeepers from
being overloaded by empty timelines that won't ever get written to. In
the current world, this is not a problem, because timelines are created
implicitly by the compute connecting to a safekeeper that doesn't have
the timeline yet. In the future however, where the storage controller
creates timelines eagerly, we should watch out for that.

We represent read-only timelines in the storage controller database so
that we ensure that they never touch the safekeepers at all. Especially
we don't want them to cause a mess during the importing process of the
timelines from the cplane to the storcon database.

In a hypothetical future where we have a feature to detach timelines
from safekeepers, we'll either need to find a way to distinguish the
two, or if not, asking safekeepers to list the (empty) timeline prefix
and delete everything from it isn't a big issue either.

This patch will unconditionally hit the new safekeeper timeline creation
path for read-only timelines, without them needing the
`--timelines-onto-safekeepers` flag enabled. This is done because it's
lower risk (no safekeepers or computes involved at all) and gives us
some initial way to verify at least some parts of that code in prod.

https://github.com/neondatabase/cloud/issues/29435
https://github.com/neondatabase/neon/issues/11670
2025-05-26 23:23:58 +00:00
Lassi Pölönen
23fc611461 Add metadata to pgaudit log logline (#11933)
Previously we were using project-id/endpoint-id as SYSLOGTAG, which has
a
limit of 32 characters, so the endpoint-id got truncated.

The output is now in RFC5424 format, where the message is json encoded
with additional metadata `endpoint_id` and `project_id`

Also as pgaudit logs multiline messages, we now detect this by parsing
the timestamp in the specific format, and consider non-matching lines to
belong in the previous log message.

Using syslog structured-data would be an alternative, but leaning
towards json
due to being somewhat more generic.
2025-05-26 14:57:09 +00:00
Alex Chi Z.
dc953de85d feat(pageserver): integrate PostHog with gc-compaction rollout (#11917)
## Problem

part of https://github.com/neondatabase/neon/issues/11813

## Summary of changes

* Integrate feature store with tenant structure.
* gc-compaction picks up the current strategy from the feature store.
* We only log them for now for testing purpose. They will not be used
until we have more patches to support different strategies defined in
PostHog.
* We don't support property-based evaulation for now; it will be
implemented later.
* Evaluating result of the feature flag is not cached -- it's not
efficient and cannot be used on hot path right now.
* We don't report the evaluation result back to PostHog right now.

I plan to enable it in staging once we get the patch merged.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-26 13:09:37 +00:00
Alex Chi Z.
841517ee37 fix(pageserver): do not increase basebackup err counter when reconnect (#12016)
## Problem

We see unexpected basebackup error alerts in the alert channel.

https://github.com/neondatabase/neon/pull/11778 only fixed the alerts
for shutdown errors. However, another path is that tenant shutting down
while waiting LSN -> WaitLsnError::BadState -> QueryError::Reconnect.
Therefore, the reconnect error should also be discarded from the
ok/error counter.

## Summary of changes

Do not increase ok/err counter for reconnect errors.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-26 11:31:27 +00:00
a-masterov
1369d73dcd Add h3 to neon-extensions-test (#11946)
## Problem
We didn't test the h3 extension in our test suite.

## Summary of changes
Added tests for h3 and h3-postgis extensions
Includes upgrade test for h3

---------

Co-authored-by: Tristan Partin <tristan@neon.tech>
2025-05-26 11:29:39 +00:00
Erik Grinaker
7cd0defaf0 page_api: add Rust domain types (#11999)
## Problem

For the gRPC Pageserver API, we should convert the Protobuf types to
stricter, canonical Rust types.

Touches https://github.com/neondatabase/neon/issues/11728.

## Summary of changes

Adds Rust domain types that mirror the Protobuf types, with conversion
and validation.
2025-05-26 11:01:36 +00:00
Erik Grinaker
a082f9814a pageserver: add gRPC authentication (#12010)
## Problem

We need authentication for the gRPC server.

Requires #11972.
Touches #11728.

## Summary of changes

Add two request interceptors that decode the tenant/timeline/shard
metadata and authenticate the JWT token against them.
2025-05-26 10:24:45 +00:00
58 changed files with 2968 additions and 732 deletions

105
Cargo.lock generated
View File

@@ -701,7 +701,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"itoa",
"matchit",
@@ -718,7 +718,7 @@ dependencies = [
"sync_wrapper 1.0.1",
"tokio",
"tokio-tungstenite 0.26.1",
"tower 0.5.2",
"tower",
"tower-layer",
"tower-service",
"tracing",
@@ -761,7 +761,7 @@ dependencies = [
"mime",
"pin-project-lite",
"serde",
"tower 0.5.2",
"tower",
"tower-layer",
"tower-service",
]
@@ -1337,7 +1337,7 @@ dependencies = [
"tokio-postgres",
"tokio-stream",
"tokio-util",
"tower 0.5.2",
"tower",
"tower-http",
"tower-otel",
"tracing",
@@ -2066,7 +2066,7 @@ dependencies = [
"test-log",
"tokio",
"tokio-util",
"tower 0.5.2",
"tower",
"tracing",
"utils",
"workspace_hack",
@@ -2330,7 +2330,7 @@ dependencies = [
"futures-core",
"futures-sink",
"http-body-util",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"pin-project",
"rand 0.8.5",
@@ -2883,9 +2883,9 @@ dependencies = [
[[package]]
name = "httparse"
version = "1.8.0"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
[[package]]
name = "httpdate"
@@ -2935,9 +2935,9 @@ dependencies = [
[[package]]
name = "hyper"
version = "1.4.1"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05"
checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80"
dependencies = [
"bytes",
"futures-channel",
@@ -2977,7 +2977,7 @@ checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
dependencies = [
"futures-util",
"http 1.1.0",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"rustls 0.22.4",
"rustls-pki-types",
@@ -2992,7 +2992,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793"
dependencies = [
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"pin-project-lite",
"tokio",
@@ -3001,20 +3001,20 @@ dependencies = [
[[package]]
name = "hyper-util"
version = "0.1.7"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9"
checksum = "cf9f1e950e0d9d1d3c47184416723cf29c0d1f93bd8cccf37e4beb6b44f31710"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"hyper 1.4.1",
"hyper 1.6.0",
"libc",
"pin-project-lite",
"socket2",
"tokio",
"tower 0.4.13",
"tower-service",
"tracing",
]
@@ -4236,6 +4236,7 @@ name = "pagebench"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"camino",
"clap",
"futures",
@@ -4244,12 +4245,15 @@ dependencies = [
"humantime-serde",
"pageserver_api",
"pageserver_client",
"pageserver_client_grpc",
"pageserver_page_api",
"rand 0.8.5",
"reqwest",
"serde",
"serde_json",
"tokio",
"tokio-util",
"tonic 0.13.1",
"tracing",
"utils",
"workspace_hack",
@@ -4330,6 +4334,7 @@ dependencies = [
"postgres_connection",
"postgres_ffi",
"postgres_initdb",
"posthog_client_lite",
"pprof",
"pq_proto",
"procfs",
@@ -4431,6 +4436,21 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_client_grpc"
version = "0.1.0"
dependencies = [
"bytes",
"futures",
"http 1.1.0",
"pageserver_page_api",
"thiserror 1.0.69",
"tokio",
"tonic 0.13.1",
"tracing",
"utils",
]
[[package]]
name = "pageserver_compaction"
version = "0.1.0"
@@ -4458,9 +4478,15 @@ dependencies = [
name = "pageserver_page_api"
version = "0.1.0"
dependencies = [
"bytes",
"pageserver_api",
"postgres_ffi",
"prost 0.13.5",
"smallvec",
"thiserror 1.0.69",
"tonic 0.13.1",
"tonic-build",
"utils",
"workspace_hack",
]
@@ -4901,11 +4927,16 @@ name = "posthog_client_lite"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"reqwest",
"serde",
"serde_json",
"sha2",
"thiserror 1.0.69",
"tokio",
"tokio-util",
"tracing",
"tracing-utils",
"workspace_hack",
]
@@ -5196,7 +5227,7 @@ dependencies = [
"humantime",
"humantime-serde",
"hyper 0.14.30",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"indexmap 2.9.0",
"ipnet",
@@ -5592,7 +5623,7 @@ dependencies = [
"http-body-util",
"http-types",
"humantime-serde",
"hyper 1.4.1",
"hyper 1.6.0",
"itertools 0.10.5",
"metrics",
"once_cell",
@@ -5632,7 +5663,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-rustls 0.26.0",
"hyper-util",
"ipnet",
@@ -5689,7 +5720,7 @@ dependencies = [
"futures",
"getrandom 0.2.11",
"http 1.1.0",
"hyper 1.4.1",
"hyper 1.6.0",
"parking_lot 0.11.2",
"reqwest",
"reqwest-middleware",
@@ -6630,12 +6661,12 @@ dependencies = [
[[package]]
name = "socket2"
version = "0.5.5"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
dependencies = [
"libc",
"windows-sys 0.48.0",
"windows-sys 0.52.0",
]
[[package]]
@@ -6701,7 +6732,7 @@ dependencies = [
"http-body-util",
"http-utils",
"humantime",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"metrics",
"once_cell",
@@ -7530,7 +7561,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-timeout",
"hyper-util",
"percent-encoding",
@@ -7541,7 +7572,7 @@ dependencies = [
"tokio",
"tokio-rustls 0.26.2",
"tokio-stream",
"tower 0.5.2",
"tower",
"tower-layer",
"tower-service",
"tracing",
@@ -7574,21 +7605,6 @@ dependencies = [
"tonic 0.13.1",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"pin-project-lite",
"tokio",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower"
version = "0.5.2"
@@ -8569,10 +8585,8 @@ dependencies = [
"fail",
"form_urlencoded",
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-task",
"futures-util",
"generic-array",
"getrandom 0.2.11",
@@ -8581,7 +8595,7 @@ dependencies = [
"hex",
"hmac",
"hyper 0.14.30",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"indexmap 2.9.0",
"itertools 0.12.1",
@@ -8602,7 +8616,6 @@ dependencies = [
"once_cell",
"p256 0.13.2",
"parquet",
"percent-encoding",
"prettyplease",
"proc-macro2",
"prost 0.13.5",
@@ -8636,7 +8649,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"toml_edit",
"tower 0.5.2",
"tower",
"tracing",
"tracing-core",
"tracing-log",

View File

@@ -8,6 +8,7 @@ members = [
"pageserver/compaction",
"pageserver/ctl",
"pageserver/client",
"pageserver/client_grpc",
"pageserver/pagebench",
"pageserver/page_api",
"proxy",
@@ -247,31 +248,33 @@ azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rus
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
desim = { version = "0.1", path = "./libs/desim" }
endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" }
http-utils = { version = "0.1", path = "./libs/http-utils/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_client_grpc = { path = "./pageserver/client_grpc" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
pageserver_page_api = { path = "./pageserver/page_api" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
postgres_initdb = { path = "./libs/postgres_initdb" }
posthog_client_lite = { version = "0.1", path = "./libs/posthog_client_lite" }
pq_proto = { version = "0.1", path = "./libs/pq_proto/" }
remote_storage = { version = "0.1", path = "./libs/remote_storage/" }
safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }
safekeeper_client = { path = "./safekeeper/client" }
desim = { version = "0.1", path = "./libs/desim" }
storage_broker = { version = "0.1", path = "./storage_broker/" } # Note: main broker code is inside the binary crate, so linking with the library shouldn't be heavy.
storage_controller_client = { path = "./storage_controller/client" }
tenant_size_model = { version = "0.1", path = "./libs/tenant_size_model/" }
tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" }
utils = { version = "0.1", path = "./libs/utils/" }
vm_monitor = { version = "0.1", path = "./libs/vm_monitor/" }
walproposer = { version = "0.1", path = "./libs/walproposer/" }
wal_decoder = { version = "0.1", path = "./libs/wal_decoder" }
walproposer = { version = "0.1", path = "./libs/walproposer/" }
## Common library dependency
workspace_hack = { version = "0.1", path = "./workspace_hack/" }

View File

@@ -1847,7 +1847,7 @@ COPY docker-compose/ext-src/ /ext-src/
COPY --from=pg-build /postgres /postgres
#COPY --from=postgis-src /ext-src/ /ext-src/
COPY --from=plv8-src /ext-src/ /ext-src/
#COPY --from=h3-pg-src /ext-src/ /ext-src/
COPY --from=h3-pg-src /ext-src/h3-pg-src /ext-src/h3-pg-src
COPY --from=postgresql-unit-src /ext-src/ /ext-src/
COPY --from=pgvector-src /ext-src/ /ext-src/
COPY --from=pgjwt-src /ext-src/ /ext-src/

View File

@@ -136,6 +136,10 @@ struct Cli {
requires = "compute-id"
)]
pub control_plane_uri: Option<String>,
/// Interval in seconds for collecting installed extensions statistics
#[arg(long, default_value = "3600")]
pub installed_extensions_collection_interval: u64,
}
fn main() -> Result<()> {
@@ -179,6 +183,7 @@ fn main() -> Result<()> {
cgroup: cli.cgroup,
#[cfg(target_os = "linux")]
vm_monitor_addr: cli.vm_monitor_addr,
installed_extensions_collection_interval: cli.installed_extensions_collection_interval,
},
config,
)?;

View File

@@ -97,6 +97,9 @@ pub struct ComputeNodeParams {
/// the address of extension storage proxy gateway
pub remote_ext_base_url: Option<String>,
/// Interval for installed extensions collection
pub installed_extensions_collection_interval: u64,
}
/// Compute node info shared across several `compute_ctl` threads.
@@ -695,25 +698,18 @@ impl ComputeNode {
let log_directory_path = Path::new(&self.params.pgdata).join("log");
let log_directory_path = log_directory_path.to_string_lossy().to_string();
// Add project_id,endpoint_id tag to identify the logs.
// Add project_id,endpoint_id to identify the logs.
//
// These ids are passed from cplane,
// for backwards compatibility (old computes that don't have them),
// we set them to None.
// TODO: Clean up this code when all computes have them.
let tag: Option<String> = match (
pspec.spec.project_id.as_deref(),
pspec.spec.endpoint_id.as_deref(),
) {
(Some(project_id), Some(endpoint_id)) => {
Some(format!("{project_id}/{endpoint_id}"))
}
(Some(project_id), None) => Some(format!("{project_id}/None")),
(None, Some(endpoint_id)) => Some(format!("None,{endpoint_id}")),
(None, None) => None,
};
let endpoint_id = pspec.spec.endpoint_id.as_deref().unwrap_or("");
let project_id = pspec.spec.project_id.as_deref().unwrap_or("");
configure_audit_rsyslog(log_directory_path.clone(), tag, &remote_endpoint)?;
configure_audit_rsyslog(
log_directory_path.clone(),
endpoint_id,
project_id,
&remote_endpoint,
)?;
// Launch a background task to clean up the audit logs
launch_pgaudit_gc(log_directory_path);
@@ -749,17 +745,7 @@ impl ComputeNode {
let conf = self.get_tokio_conn_conf(None);
tokio::task::spawn(async {
let res = get_installed_extensions(conf).await;
match res {
Ok(extensions) => {
info!(
"[NEON_EXT_STAT] {}",
serde_json::to_string(&extensions)
.expect("failed to serialize extensions list")
);
}
Err(err) => error!("could not get installed extensions: {err:?}"),
}
let _ = installed_extensions(conf).await;
});
}
@@ -789,6 +775,9 @@ impl ComputeNode {
// Log metrics so that we can search for slow operations in logs
info!(?metrics, postmaster_pid = %postmaster_pid, "compute start finished");
// Spawn the extension stats background task
self.spawn_extension_stats_task();
if pspec.spec.prewarm_lfc_on_startup {
self.prewarm_lfc();
}
@@ -2199,6 +2188,41 @@ LIMIT 100",
info!("Pageserver config changed");
}
}
pub fn spawn_extension_stats_task(&self) {
let conf = self.tokio_conn_conf.clone();
let installed_extensions_collection_interval =
self.params.installed_extensions_collection_interval;
tokio::spawn(async move {
// An initial sleep is added to ensure that two collections don't happen at the same time.
// The first collection happens during compute startup.
tokio::time::sleep(tokio::time::Duration::from_secs(
installed_extensions_collection_interval,
))
.await;
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
installed_extensions_collection_interval,
));
loop {
interval.tick().await;
let _ = installed_extensions(conf.clone()).await;
}
});
}
}
pub async fn installed_extensions(conf: tokio_postgres::Config) -> Result<()> {
let res = get_installed_extensions(conf).await;
match res {
Ok(extensions) => {
info!(
"[NEON_EXT_STAT] {}",
serde_json::to_string(&extensions).expect("failed to serialize extensions list")
);
}
Err(err) => error!("could not get installed extensions: {err:?}"),
}
Ok(())
}
pub fn forward_termination_signal() {

View File

@@ -2,10 +2,24 @@
module(load="imfile")
# Input configuration for log files in the specified directory
# Replace {log_directory} with the directory containing the log files
input(type="imfile" File="{log_directory}/*.log" Tag="{tag}" Severity="info" Facility="local0")
# The messages can be multiline. The start of the message is a timestamp
# in "%Y-%m-%d %H:%M:%S.%3N GMT" (so timezone hardcoded).
# Replace log_directory with the directory containing the log files
input(type="imfile" File="{log_directory}/*.log"
Tag="pgaudit_log" Severity="info" Facility="local5"
startmsg.regex="^[[:digit:]]{{4}}-[[:digit:]]{{2}}-[[:digit:]]{{2}} [[:digit:]]{{2}}:[[:digit:]]{{2}}:[[:digit:]]{{2}}.[[:digit:]]{{3}} GMT,")
# the directory to store rsyslog state files
global(workDirectory="/var/log/rsyslog")
# Forward logs to remote syslog server
*.* @@{remote_endpoint}
# Construct json, endpoint_id and project_id as additional metadata
set $.json_log!endpoint_id = "{endpoint_id}";
set $.json_log!project_id = "{project_id}";
set $.json_log!msg = $msg;
# Template suitable for rfc5424 syslog format
template(name="PgAuditLog" type="string"
string="<%PRI%>1 %TIMESTAMP:::date-rfc3339% %HOSTNAME% - - - - %$.json_log%")
# Forward to remote syslog receiver (@@<hostname>:<port>;format
local5.info @@{remote_endpoint};PgAuditLog

View File

@@ -84,13 +84,15 @@ fn restart_rsyslog() -> Result<()> {
pub fn configure_audit_rsyslog(
log_directory: String,
tag: Option<String>,
endpoint_id: &str,
project_id: &str,
remote_endpoint: &str,
) -> Result<()> {
let config_content: String = format!(
include_str!("config_template/compute_audit_rsyslog_template.conf"),
log_directory = log_directory,
tag = tag.unwrap_or("".to_string()),
endpoint_id = endpoint_id,
project_id = project_id,
remote_endpoint = remote_endpoint
);

View File

@@ -1279,6 +1279,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
mode: pageserver_api::models::TimelineCreateRequestMode::Branch {
ancestor_timeline_id,
ancestor_start_lsn: start_lsn,
read_only: false,
pg_version: None,
},
};

View File

@@ -20,7 +20,7 @@ first_path="$(ldconfig --verbose 2>/dev/null \
| grep --invert-match ^$'\t' \
| cut --delimiter=: --fields=1 \
| head --lines=1)"
test "$first_path" == '/usr/local/lib' || true # Remove the || true in a follow-up PR. Needed for backwards compat.
test "$first_path" == '/usr/local/lib'
echo "Waiting pageserver become ready."
while ! nc -z pageserver 6400; do

View File

@@ -0,0 +1,16 @@
#!/usr/bin/env bash
set -ex
cd "$(dirname "${0}")"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
dropdb --if-exists contrib_regression
createdb contrib_regression
cd h3_postgis/test
psql -d contrib_regression -c "CREATE EXTENSION postgis" -c "CREATE EXTENSION postgis_raster" -c "CREATE EXTENSION h3" -c "CREATE EXTENSION h3_postgis"
TESTS=$(echo sql/* | sed 's|sql/||g; s|\.sql||g')
${PG_REGRESS} --use-existing --dbname contrib_regression ${TESTS}
cd ../../h3/test
TESTS=$(echo sql/* | sed 's|sql/||g; s|\.sql||g')
dropdb --if-exists contrib_regression
createdb contrib_regression
psql -d contrib_regression -c "CREATE EXTENSION h3"
${PG_REGRESS} --use-existing --dbname contrib_regression ${TESTS}

View File

@@ -0,0 +1,7 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
cd h3/test
TESTS=$(echo sql/* | sed 's|sql/||g; s|\.sql||g')
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression ${TESTS}

View File

@@ -0,0 +1,6 @@
#!/bin/sh
set -ex
cd "$(dirname "${0}")"
if [ -f Makefile ]; then
make installcheck
fi

View File

@@ -0,0 +1,9 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
[ -f Makefile ] || exit 0
dropdb --if-exist contrib_regression
createdb contrib_regression
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
TESTS=$(echo sql/* | sed 's|sql/||g; s|\.sql||g')
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression ${TESTS}

View File

@@ -82,7 +82,8 @@ EXTENSIONS='[
{"extname": "pg_ivm", "extdir": "pg_ivm-src"},
{"extname": "pgjwt", "extdir": "pgjwt-src"},
{"extname": "pgtap", "extdir": "pgtap-src"},
{"extname": "pg_repack", "extdir": "pg_repack-src"}
{"extname": "pg_repack", "extdir": "pg_repack-src"},
{"extname": "h3", "extdir": "h3-pg-src"}
]'
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
COMPUTE_TAG=${NEW_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d

View File

@@ -45,6 +45,21 @@ pub struct NodeMetadata {
pub other: HashMap<String, serde_json::Value>,
}
/// PostHog integration config.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PostHogConfig {
/// PostHog project ID
pub project_id: String,
/// Server-side (private) API key
pub server_api_key: String,
/// Client-side (public) API key
pub client_api_key: String,
/// Private API URL
pub private_api_url: String,
/// Public API URL
pub public_api_url: String,
}
/// `pageserver.toml`
///
/// We use serde derive with `#[serde(default)]` to generate a deserializer
@@ -186,6 +201,8 @@ pub struct ConfigToml {
pub tracing: Option<Tracing>,
pub enable_tls_page_service_api: bool,
pub dev_mode: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub posthog_config: Option<PostHogConfig>,
pub timeline_import_config: TimelineImportConfig,
#[serde(skip_serializing_if = "Option::is_none")]
pub basebackup_cache_config: Option<BasebackupCacheConfig>,
@@ -701,6 +718,7 @@ impl Default for ConfigToml {
import_job_checkpoint_threshold: NonZeroUsize::new(128).unwrap(),
},
basebackup_cache_config: None,
posthog_config: None,
}
}
}

View File

@@ -354,6 +354,9 @@ pub struct ShardImportProgressV1 {
pub completed: usize,
/// Hash of the plan
pub import_plan_hash: u64,
/// Soft limit for the job size
/// This needs to remain constant throughout the import
pub job_soft_size_limit: usize,
}
impl ShardImportStatus {
@@ -402,6 +405,8 @@ pub enum TimelineCreateRequestMode {
// using a flattened enum, so, it was an accepted field, and
// we continue to accept it by having it here.
pg_version: Option<u32>,
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
read_only: bool,
},
ImportPgdata {
import_pgdata: TimelineCreateRequestModeImportPgdata,

View File

@@ -6,9 +6,14 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
arc-swap.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
serde.workspace = true
sha2.workspace = true
workspace_hack.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
tokio-util.workspace = true
tracing-utils.workspace = true
tracing.workspace = true
workspace_hack.workspace = true

View File

@@ -0,0 +1,59 @@
//! A background loop that fetches feature flags from PostHog and updates the feature store.
use std::{sync::Arc, time::Duration};
use arc_swap::ArcSwap;
use tokio_util::sync::CancellationToken;
use crate::{FeatureStore, PostHogClient, PostHogClientConfig};
/// A background loop that fetches feature flags from PostHog and updates the feature store.
pub struct FeatureResolverBackgroundLoop {
posthog_client: PostHogClient,
feature_store: ArcSwap<FeatureStore>,
cancel: CancellationToken,
}
impl FeatureResolverBackgroundLoop {
pub fn new(config: PostHogClientConfig, shutdown_pageserver: CancellationToken) -> Self {
Self {
posthog_client: PostHogClient::new(config),
feature_store: ArcSwap::new(Arc::new(FeatureStore::new())),
cancel: shutdown_pageserver,
}
}
pub fn spawn(self: Arc<Self>, handle: &tokio::runtime::Handle, refresh_period: Duration) {
let this = self.clone();
let cancel = self.cancel.clone();
handle.spawn(async move {
tracing::info!("Starting PostHog feature resolver");
let mut ticker = tokio::time::interval(refresh_period);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = ticker.tick() => {}
_ = cancel.cancelled() => break
}
let resp = match this
.posthog_client
.get_feature_flags_local_evaluation()
.await
{
Ok(resp) => resp,
Err(e) => {
tracing::warn!("Cannot get feature flags: {}", e);
continue;
}
};
let feature_store = FeatureStore::new_with_flags(resp.flags);
this.feature_store.store(Arc::new(feature_store));
}
tracing::info!("PostHog feature resolver stopped");
});
}
pub fn feature_store(&self) -> Arc<FeatureStore> {
self.feature_store.load_full()
}
}

View File

@@ -1,5 +1,9 @@
//! A lite version of the PostHog client that only supports local evaluation of feature flags.
mod background_loop;
pub use background_loop::FeatureResolverBackgroundLoop;
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
@@ -20,8 +24,7 @@ pub enum PostHogEvaluationError {
#[derive(Deserialize)]
pub struct LocalEvaluationResponse {
#[allow(dead_code)]
flags: Vec<LocalEvaluationFlag>,
pub flags: Vec<LocalEvaluationFlag>,
}
#[derive(Deserialize)]
@@ -34,7 +37,7 @@ pub struct LocalEvaluationFlag {
#[derive(Deserialize)]
pub struct LocalEvaluationFlagFilters {
groups: Vec<LocalEvaluationFlagFilterGroup>,
multivariate: LocalEvaluationFlagMultivariate,
multivariate: Option<LocalEvaluationFlagMultivariate>,
}
#[derive(Deserialize)]
@@ -94,6 +97,12 @@ impl FeatureStore {
}
}
pub fn new_with_flags(flags: Vec<LocalEvaluationFlag>) -> Self {
let mut store = Self::new();
store.set_flags(flags);
store
}
pub fn set_flags(&mut self, flags: Vec<LocalEvaluationFlag>) {
self.flags.clear();
for flag in flags {
@@ -245,7 +254,7 @@ impl FeatureStore {
}
}
/// Evaluate a multivariate feature flag. Returns `None` if the flag is not available or if there are errors
/// Evaluate a multivariate feature flag. Returns an error if the flag is not available or if there are errors
/// during the evaluation.
///
/// The parsing logic is as follows:
@@ -263,10 +272,15 @@ impl FeatureStore {
/// Example: we have a multivariate flag with 3 groups of the configured global rollout percentage: A (10%), B (20%), C (70%).
/// There is a single group with a condition that has a rollout percentage of 10% and it does not have a variant override.
/// Then, we will have 1% of the users evaluated to A, 2% to B, and 7% to C.
///
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
/// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
/// propagated beyond where the feature flag gets resolved.
pub fn evaluate_multivariate(
&self,
flag_key: &str,
user_id: &str,
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> Result<String, PostHogEvaluationError> {
let hash_on_global_rollout_percentage =
Self::consistent_hash(user_id, flag_key, "multivariate");
@@ -276,10 +290,39 @@ impl FeatureStore {
flag_key,
hash_on_global_rollout_percentage,
hash_on_group_rollout_percentage,
&HashMap::new(),
properties,
)
}
/// Evaluate a boolean feature flag. Returns an error if the flag is not available or if there are errors
/// during the evaluation.
///
/// The parsing logic is as follows:
///
/// * Generate a consistent hash for the tenant-feature.
/// * Match each filter group.
/// - If a group is matched, it will first determine whether the user is in the range of the rollout
/// percentage.
/// - If the hash falls within the group's rollout percentage, return true.
/// * Otherwise, continue with the next group until all groups are evaluated and no group is within the
/// rollout percentage.
/// * If there are no matching groups, return an error.
///
/// Returns `Ok(())` if the feature flag evaluates to true. In the future, it will return a payload.
///
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
/// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
/// propagated beyond where the feature flag gets resolved.
pub fn evaluate_boolean(
&self,
flag_key: &str,
user_id: &str,
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> Result<(), PostHogEvaluationError> {
let hash_on_global_rollout_percentage = Self::consistent_hash(user_id, flag_key, "boolean");
self.evaluate_boolean_inner(flag_key, hash_on_global_rollout_percentage, properties)
}
/// Evaluate a multivariate feature flag. Note that we directly take the mapped user ID
/// (a consistent hash ranging from 0 to 1) so that it is easier to use it in the tests
/// and avoid duplicate computations.
@@ -306,6 +349,11 @@ impl FeatureStore {
flag_key
)));
}
let Some(ref multivariate) = flag_config.filters.multivariate else {
return Err(PostHogEvaluationError::Internal(format!(
"No multivariate available, should use evaluate_boolean?: {flag_key}"
)));
};
// TODO: sort the groups so that variant overrides always get evaluated first and it follows the PostHog
// Python SDK behavior; for now we do not configure conditions without variant overrides in Neon so it
// does not matter.
@@ -314,7 +362,7 @@ impl FeatureStore {
GroupEvaluationResult::MatchedAndOverride(variant) => return Ok(variant),
GroupEvaluationResult::MatchedAndEvaluate => {
let mut percentage = 0;
for variant in &flag_config.filters.multivariate.variants {
for variant in &multivariate.variants {
percentage += variant.rollout_percentage;
if self
.evaluate_percentage(hash_on_global_rollout_percentage, percentage)
@@ -342,6 +390,77 @@ impl FeatureStore {
)))
}
}
/// Evaluate a multivariate feature flag. Note that we directly take the mapped user ID
/// (a consistent hash ranging from 0 to 1) so that it is easier to use it in the tests
/// and avoid duplicate computations.
///
/// Use a different consistent hash for evaluating the group rollout percentage.
/// The behavior: if the condition is set to rolling out to 10% of the users, and
/// we set the variant A to 20% in the global config, then 2% of the total users will
/// be evaluated to variant A.
///
/// Note that the hash to determine group rollout percentage is shared across all groups. So if we have two
/// exactly-the-same conditions with 10% and 20% rollout percentage respectively, a total of 20% of the users
/// will be evaluated (versus 30% if group evaluation is done independently).
pub(crate) fn evaluate_boolean_inner(
&self,
flag_key: &str,
hash_on_global_rollout_percentage: f64,
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> Result<(), PostHogEvaluationError> {
if let Some(flag_config) = self.flags.get(flag_key) {
if !flag_config.active {
return Err(PostHogEvaluationError::NotAvailable(format!(
"The feature flag is not active: {}",
flag_key
)));
}
if flag_config.filters.multivariate.is_some() {
return Err(PostHogEvaluationError::Internal(format!(
"This looks like a multivariate flag, should use evaluate_multivariate?: {flag_key}"
)));
};
// TODO: sort the groups so that variant overrides always get evaluated first and it follows the PostHog
// Python SDK behavior; for now we do not configure conditions without variant overrides in Neon so it
// does not matter.
for group in &flag_config.filters.groups {
match self.evaluate_group(group, hash_on_global_rollout_percentage, properties)? {
GroupEvaluationResult::MatchedAndOverride(_) => {
return Err(PostHogEvaluationError::Internal(format!(
"Boolean flag cannot have overrides: {}",
flag_key
)));
}
GroupEvaluationResult::MatchedAndEvaluate => {
return Ok(());
}
GroupEvaluationResult::Unmatched => continue,
}
}
// If no group is matched, the feature is not available, and up to the caller to decide what to do.
Err(PostHogEvaluationError::NoConditionGroupMatched)
} else {
// The feature flag is not available yet
Err(PostHogEvaluationError::NotAvailable(format!(
"Not found in the local evaluation spec: {}",
flag_key
)))
}
}
}
pub struct PostHogClientConfig {
/// The server API key.
pub server_api_key: String,
/// The client API key.
pub client_api_key: String,
/// The project ID.
pub project_id: String,
/// The private API URL.
pub private_api_url: String,
/// The public API URL.
pub public_api_url: String,
}
/// A lite PostHog client.
@@ -360,37 +479,16 @@ impl FeatureStore {
/// want to report the feature flag usage back to PostHog. The current plan is to use PostHog only as an UI to
/// configure feature flags so it is very likely that the client API will not be used.
pub struct PostHogClient {
/// The server API key.
server_api_key: String,
/// The client API key.
client_api_key: String,
/// The project ID.
project_id: String,
/// The private API URL.
private_api_url: String,
/// The public API URL.
public_api_url: String,
/// The config.
config: PostHogClientConfig,
/// The HTTP client.
client: reqwest::Client,
}
impl PostHogClient {
pub fn new(
server_api_key: String,
client_api_key: String,
project_id: String,
private_api_url: String,
public_api_url: String,
) -> Self {
pub fn new(config: PostHogClientConfig) -> Self {
let client = reqwest::Client::new();
Self {
server_api_key,
client_api_key,
project_id,
private_api_url,
public_api_url,
client,
}
Self { config, client }
}
pub fn new_with_us_region(
@@ -398,13 +496,13 @@ impl PostHogClient {
client_api_key: String,
project_id: String,
) -> Self {
Self::new(
Self::new(PostHogClientConfig {
server_api_key,
client_api_key,
project_id,
"https://us.posthog.com".to_string(),
"https://us.i.posthog.com".to_string(),
)
private_api_url: "https://us.posthog.com".to_string(),
public_api_url: "https://us.i.posthog.com".to_string(),
})
}
/// Fetch the feature flag specs from the server.
@@ -422,12 +520,12 @@ impl PostHogClient {
// with bearer token of self.server_api_key
let url = format!(
"{}/api/projects/{}/feature_flags/local_evaluation",
self.private_api_url, self.project_id
self.config.private_api_url, self.config.project_id
);
let response = self
.client
.get(url)
.bearer_auth(&self.server_api_key)
.bearer_auth(&self.config.server_api_key)
.send()
.await?;
let body = response.text().await?;
@@ -446,11 +544,11 @@ impl PostHogClient {
) -> anyhow::Result<()> {
// PUBLIC_URL/capture/
// with bearer token of self.client_api_key
let url = format!("{}/capture/", self.public_api_url);
let url = format!("{}/capture/", self.config.public_api_url);
self.client
.post(url)
.body(serde_json::to_string(&json!({
"api_key": self.client_api_key,
"api_key": self.config.client_api_key,
"distinct_id": distinct_id,
"event": event,
"properties": properties,
@@ -467,95 +565,162 @@ mod tests {
fn data() -> &'static str {
r#"{
"flags": [
{
"id": 132794,
"team_id": 152860,
"name": "",
"key": "gc-compaction",
"filters": {
"groups": [
{
"variant": "enabled-stage-2",
"properties": [
{
"key": "plan_type",
"type": "person",
"value": [
"free"
],
"operator": "exact"
},
{
"key": "pageserver_remote_size",
"type": "person",
"value": "10000000",
"operator": "lt"
}
],
"rollout_percentage": 50
},
{
"properties": [
{
"key": "plan_type",
"type": "person",
"value": [
"free"
],
"operator": "exact"
},
{
"key": "pageserver_remote_size",
"type": "person",
"value": "10000000",
"operator": "lt"
}
],
"rollout_percentage": 80
}
],
"payloads": {},
"multivariate": {
"variants": [
{
"key": "disabled",
"name": "",
"rollout_percentage": 90
},
{
"key": "enabled-stage-1",
"name": "",
"rollout_percentage": 10
},
{
"key": "enabled-stage-2",
"name": "",
"rollout_percentage": 0
},
{
"key": "enabled-stage-3",
"name": "",
"rollout_percentage": 0
},
{
"key": "enabled",
"name": "",
"rollout_percentage": 0
}
]
}
},
"deleted": false,
"active": true,
"ensure_experience_continuity": false,
"has_encrypted_payloads": false,
"version": 6
}
"flags": [
{
"id": 141807,
"team_id": 152860,
"name": "",
"key": "image-compaction-boundary",
"filters": {
"groups": [
{
"variant": null,
"properties": [
{
"key": "plan_type",
"type": "person",
"value": [
"free"
],
"operator": "exact"
}
],
"group_type_mapping": {},
"cohorts": {}
}"#
"rollout_percentage": 40
},
{
"variant": null,
"properties": [],
"rollout_percentage": 10
}
],
"payloads": {},
"multivariate": null
},
"deleted": false,
"active": true,
"ensure_experience_continuity": false,
"has_encrypted_payloads": false,
"version": 1
},
{
"id": 135586,
"team_id": 152860,
"name": "",
"key": "boolean-flag",
"filters": {
"groups": [
{
"variant": null,
"properties": [
{
"key": "plan_type",
"type": "person",
"value": [
"free"
],
"operator": "exact"
}
],
"rollout_percentage": 47
}
],
"payloads": {},
"multivariate": null
},
"deleted": false,
"active": true,
"ensure_experience_continuity": false,
"has_encrypted_payloads": false,
"version": 1
},
{
"id": 132794,
"team_id": 152860,
"name": "",
"key": "gc-compaction",
"filters": {
"groups": [
{
"variant": "enabled-stage-2",
"properties": [
{
"key": "plan_type",
"type": "person",
"value": [
"free"
],
"operator": "exact"
},
{
"key": "pageserver_remote_size",
"type": "person",
"value": "10000000",
"operator": "lt"
}
],
"rollout_percentage": 50
},
{
"properties": [
{
"key": "plan_type",
"type": "person",
"value": [
"free"
],
"operator": "exact"
},
{
"key": "pageserver_remote_size",
"type": "person",
"value": "10000000",
"operator": "lt"
}
],
"rollout_percentage": 80
}
],
"payloads": {},
"multivariate": {
"variants": [
{
"key": "disabled",
"name": "",
"rollout_percentage": 90
},
{
"key": "enabled-stage-1",
"name": "",
"rollout_percentage": 10
},
{
"key": "enabled-stage-2",
"name": "",
"rollout_percentage": 0
},
{
"key": "enabled-stage-3",
"name": "",
"rollout_percentage": 0
},
{
"key": "enabled",
"name": "",
"rollout_percentage": 0
}
]
}
},
"deleted": false,
"active": true,
"ensure_experience_continuity": false,
"has_encrypted_payloads": false,
"version": 7
}
],
"group_type_mapping": {},
"cohorts": {}
}"#
}
#[test]
@@ -631,4 +796,125 @@ mod tests {
Err(PostHogEvaluationError::NoConditionGroupMatched)
),);
}
#[test]
fn evaluate_boolean_1() {
// The `boolean-flag` feature flag only has one group that matches on the free user.
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags);
// This lacks the required properties and cannot be evaluated.
let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &HashMap::new());
assert!(matches!(
variant,
Err(PostHogEvaluationError::NotAvailable(_))
),);
let properties_unmatched = HashMap::from([
(
"plan_type".to_string(),
PostHogFlagFilterPropertyValue::String("paid".to_string()),
),
(
"pageserver_remote_size".to_string(),
PostHogFlagFilterPropertyValue::Number(1000.0),
),
]);
// This does not match any group so there will be an error.
let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &properties_unmatched);
assert!(matches!(
variant,
Err(PostHogEvaluationError::NoConditionGroupMatched)
),);
let properties = HashMap::from([
(
"plan_type".to_string(),
PostHogFlagFilterPropertyValue::String("free".to_string()),
),
(
"pageserver_remote_size".to_string(),
PostHogFlagFilterPropertyValue::Number(1000.0),
),
]);
// It matches the first group as 0.10 <= 0.50 and the properties are matched. Then it gets evaluated to the variant override.
let variant = store.evaluate_boolean_inner("boolean-flag", 0.10, &properties);
assert!(variant.is_ok());
// It matches the group conditions but not the group rollout percentage.
let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &properties);
assert!(matches!(
variant,
Err(PostHogEvaluationError::NoConditionGroupMatched)
),);
}
#[test]
fn evaluate_boolean_2() {
// The `image-compaction-boundary` feature flag has one group that matches on the free user and a group that matches on all users.
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags);
// This lacks the required properties and cannot be evaluated.
let variant =
store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &HashMap::new());
assert!(matches!(
variant,
Err(PostHogEvaluationError::NotAvailable(_))
),);
let properties_unmatched = HashMap::from([
(
"plan_type".to_string(),
PostHogFlagFilterPropertyValue::String("paid".to_string()),
),
(
"pageserver_remote_size".to_string(),
PostHogFlagFilterPropertyValue::Number(1000.0),
),
]);
// This does not match the filtered group but the all user group.
let variant =
store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &properties_unmatched);
assert!(matches!(
variant,
Err(PostHogEvaluationError::NoConditionGroupMatched)
),);
let variant =
store.evaluate_boolean_inner("image-compaction-boundary", 0.05, &properties_unmatched);
assert!(variant.is_ok());
let properties = HashMap::from([
(
"plan_type".to_string(),
PostHogFlagFilterPropertyValue::String("free".to_string()),
),
(
"pageserver_remote_size".to_string(),
PostHogFlagFilterPropertyValue::Number(1000.0),
),
]);
// It matches the first group as 0.30 <= 0.40 and the properties are matched. Then it gets evaluated to the variant override.
let variant = store.evaluate_boolean_inner("image-compaction-boundary", 0.30, &properties);
assert!(variant.is_ok());
// It matches the group conditions but not the group rollout percentage.
let variant = store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &properties);
assert!(matches!(
variant,
Err(PostHogEvaluationError::NoConditionGroupMatched)
),);
// It matches the second "all" group conditions.
let variant = store.evaluate_boolean_inner("image-compaction-boundary", 0.09, &properties);
assert!(variant.is_ok());
}
}

View File

@@ -1,6 +1,7 @@
#![allow(clippy::todo)]
use std::ffi::CString;
use std::str::FromStr;
use postgres_ffi::WAL_SEGMENT_SIZE;
use utils::id::TenantTimelineId;
@@ -173,6 +174,8 @@ pub struct Config {
pub ttid: TenantTimelineId,
/// List of safekeepers in format `host:port`
pub safekeepers_list: Vec<String>,
/// libpq connection info options
pub safekeeper_conninfo_options: String,
/// Safekeeper reconnect timeout in milliseconds
pub safekeeper_reconnect_timeout: i32,
/// Safekeeper connection timeout in milliseconds
@@ -202,6 +205,9 @@ impl Wrapper {
.into_bytes_with_nul();
assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
let safekeeper_conninfo_options = CString::from_str(&config.safekeeper_conninfo_options)
.unwrap()
.into_raw();
let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
@@ -209,6 +215,7 @@ impl Wrapper {
neon_tenant,
neon_timeline,
safekeepers_list,
safekeeper_conninfo_options,
safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
safekeeper_connection_timeout: config.safekeeper_connection_timeout,
wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
@@ -576,6 +583,7 @@ mod tests {
let config = crate::walproposer::Config {
ttid,
safekeepers_list: vec!["localhost:5000".to_string()],
safekeeper_conninfo_options: String::new(),
safekeeper_reconnect_timeout: 1000,
safekeeper_connection_timeout: 10000,
sync_safekeepers: true,

View File

@@ -17,51 +17,69 @@ anyhow.workspace = true
arc-swap.workspace = true
async-compression.workspace = true
async-stream.workspace = true
bit_field.workspace = true
bincode.workspace = true
bit_field.workspace = true
byteorder.workspace = true
bytes.workspace = true
camino.workspace = true
camino-tempfile.workspace = true
camino.workspace = true
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true, features = ["string"] }
consumption_metrics.workspace = true
crc32c.workspace = true
either.workspace = true
enum-map.workspace = true
enumset = { workspace = true, features = ["serde"]}
fail.workspace = true
futures.workspace = true
hashlink.workspace = true
hex.workspace = true
humantime.workspace = true
http-utils.workspace = true
humantime-serde.workspace = true
humantime.workspace = true
hyper0.workspace = true
itertools.workspace = true
jsonwebtoken.workspace = true
md5.workspace = true
metrics.workspace = true
nix.workspace = true
# hack to get the number of worker threads tokio uses
num_cpus.workspace = true
num_cpus.workspace = true # hack to get the number of worker threads tokio uses
num-traits.workspace = true
once_cell.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true # for ResponseErrorMessageExt TOOD refactor that
pageserver_compaction.workspace = true
pageserver_page_api.workspace = true
pem.workspace = true
pin-project-lite.workspace = true
postgres_backend.workspace = true
postgres_connection.workspace = true
postgres_ffi.workspace = true
postgres_initdb.workspace = true
postgres-protocol.workspace = true
postgres-types.workspace = true
postgres_initdb.workspace = true
posthog_client_lite.workspace = true
pprof.workspace = true
pq_proto.workspace = true
rand.workspace = true
range-set-blaze = { version = "0.1.16", features = ["alloc"] }
regex.workspace = true
remote_storage.workspace = true
reqwest.workspace = true
rpds.workspace = true
rustls.workspace = true
scopeguard.workspace = true
send-future.workspace = true
serde.workspace = true
serde_json = { workspace = true, features = ["raw_value"] }
serde_path_to_error.workspace = true
serde_with.workspace = true
serde.workspace = true
smallvec.workspace = true
storage_broker.workspace = true
strum_macros.workspace = true
strum.workspace = true
sysinfo.workspace = true
tokio-tar.workspace = true
tenant_size_model.workspace = true
thiserror.workspace = true
tikv-jemallocator.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
@@ -70,6 +88,7 @@ tokio-io-timeout.workspace = true
tokio-postgres.workspace = true
tokio-rustls.workspace = true
tokio-stream.workspace = true
tokio-tar.workspace = true
tokio-util.workspace = true
toml_edit = { workspace = true, features = [ "serde" ] }
tonic.workspace = true
@@ -77,29 +96,10 @@ tonic-reflection.workspace = true
tracing.workspace = true
tracing-utils.workspace = true
url.workspace = true
walkdir.workspace = true
metrics.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true # for ResponseErrorMessageExt TOOD refactor that
pageserver_compaction.workspace = true
pem.workspace = true
postgres_connection.workspace = true
postgres_ffi.workspace = true
pq_proto.workspace = true
remote_storage.workspace = true
storage_broker.workspace = true
tenant_size_model.workspace = true
http-utils.workspace = true
utils.workspace = true
workspace_hack.workspace = true
reqwest.workspace = true
rpds.workspace = true
enum-map.workspace = true
enumset = { workspace = true, features = ["serde"]}
strum.workspace = true
strum_macros.workspace = true
wal_decoder.workspace = true
smallvec.workspace = true
walkdir.workspace = true
workspace_hack.workspace = true
twox-hash.workspace = true
[target.'cfg(target_os = "linux")'.dependencies]

View File

@@ -0,0 +1,16 @@
[package]
name = "pageserver_client_grpc"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
bytes.workspace = true
futures.workspace = true
http.workspace = true
thiserror.workspace = true
tonic.workspace = true
tracing.workspace = true
pageserver_page_api.workspace = true
utils.workspace = true
tokio.workspace = true

View File

@@ -0,0 +1,192 @@
//!
//! Pageserver gRPC client library
//!
//! This library provides a gRPC client for the pageserver for the
//! communicator project.
//!
//! This library is a work in progress.
//!
//!
use std::collections::HashMap;
use bytes::Bytes;
use futures::{StreamExt};
use thiserror::Error;
use tonic::metadata::AsciiMetadataValue;
use pageserver_page_api::proto;
use pageserver_page_api::proto::PageServiceClient;
use utils::shard::ShardIndex;
use std::fmt::Debug;
use tracing::error;
use tokio::sync::RwLock;
use tonic::transport::{Channel, Endpoint};
#[derive(Error, Debug)]
pub enum PageserverClientError {
#[error("could not connect to service: {0}")]
ConnectError(#[from] tonic::transport::Error),
#[error("could not perform request: {0}`")]
RequestError(#[from] tonic::Status),
#[error("protocol error: {0}")]
ProtocolError(#[from] pageserver_page_api::ProtocolError),
#[error("could not perform request: {0}`")]
InvalidUri(#[from] http::uri::InvalidUri),
#[error("could not perform request: {0}`")]
Other(String),
}
pub struct PageserverClient {
endpoint_map: HashMap<ShardIndex, Endpoint>,
channels: tokio::sync::RwLock<HashMap<ShardIndex, Channel>>,
auth_interceptor: AuthInterceptor,
}
impl PageserverClient {
/// TODO: this doesn't currently react to changes in the shard map.
pub fn new(
tenant_id: AsciiMetadataValue,
timeline_id: AsciiMetadataValue,
auth_token: Option<String>,
shard_map: HashMap<ShardIndex, String>,
) -> Result<Self, PageserverClientError> {
let endpoint_map: HashMap<ShardIndex, Endpoint> = shard_map
.into_iter()
.map(|(shard, url)| {
let endpoint = Endpoint::from_shared(url)
.map_err(|_e| PageserverClientError::Other("Unable to parse endpoint {url}".to_string()))?;
Ok::<(ShardIndex, Endpoint), PageserverClientError>((shard, endpoint))
})
.collect::<Result<_, _>>()?;
Ok(Self {
endpoint_map,
channels: RwLock::new(HashMap::new()),
auth_interceptor: AuthInterceptor::new(
tenant_id,
timeline_id,
auth_token,
),
})
}
//
// TODO: This opens a new gRPC stream for every request, which is extremely inefficient
pub async fn get_page(
&self,
shard: ShardIndex,
request: pageserver_page_api::GetPageRequest,
) -> Result<Vec<Bytes>, PageserverClientError> {
// FIXME: calculate the shard number correctly
let chan = self.get_client(shard).await?;
let mut client =
PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard));
let request = proto::GetPageRequest::try_from(request)?;
let request_stream = futures::stream::once(std::future::ready(request));
let mut response_stream = client
.get_pages(tonic::Request::new(request_stream))
.await?
.into_inner();
let Some(response) = response_stream.next().await else {
return Err(PageserverClientError::Other(
"no response received for getpage request".to_string(),
));
};
match response {
Err(status) => {
return Err(PageserverClientError::RequestError(status));
}
Ok(resp) => {
let response: pageserver_page_api::GetPageResponse = resp.try_into().unwrap();
return Ok(response.page_images.to_vec());
}
}
}
//
// TODO: this should use a connection pool with concurrency limits,
// not a single connection to the shard.
//
async fn get_client(&self, shard: ShardIndex) -> Result<Channel, PageserverClientError> {
// Get channel from the hashmap
let mut channels = self.channels.write();
if let Some(channel) = channels.await.get(&shard) {
return Ok(channel.clone());
}
// Create a new channel if it doesn't exist
let shard_endpoint = self
.endpoint_map
.get(&shard);
let endpoint = match shard_endpoint{
Some(_endpoint) => _endpoint,
None => {
error!("Shard {shard} not found in shard map");
return Err(PageserverClientError::Other(format!(
"Shard {shard} not found in shard map"
)));
}
};
let channel = endpoint.connect().await?;
channels = self.channels.write();
channels.await.insert(shard, channel.clone());
Ok(channel.clone())
}
}
/// Inject tenant_id, timeline_id and authentication token to all pageserver requests.
#[derive(Clone)]
struct AuthInterceptor {
tenant_id: AsciiMetadataValue,
shard_id: Option<AsciiMetadataValue>,
timeline_id: AsciiMetadataValue,
auth_header: Option<AsciiMetadataValue>, // including "Bearer " prefix
}
impl AuthInterceptor {
fn new(tenant_id: AsciiMetadataValue,
timeline_id: AsciiMetadataValue,
auth_token: Option<String>) -> Self {
Self {
tenant_id: tenant_id,
shard_id: None,
timeline_id: timeline_id,
auth_header: auth_token
.map(|t| format!("Bearer {t}"))
.map(|t| t.parse().expect("could not parse auth token")),
}
}
fn for_shard(&self, shard_id: ShardIndex) -> Self {
let mut with_shard = self.clone();
with_shard.shard_id = Some(
shard_id
.to_string()
.parse()
.expect("could not parse shard id"),
);
with_shard
}
}
impl tonic::service::Interceptor for AuthInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
req.metadata_mut()
.insert("neon-tenant-id", self.tenant_id.clone());
if let Some(shard_id) = &self.shard_id {
req.metadata_mut().insert("neon-shard-id", shard_id.clone());
}
req.metadata_mut()
.insert("neon-timeline-id", self.timeline_id.clone());
if let Some(auth_header) = &self.auth_header {
req.metadata_mut()
.insert("authorization", auth_header.clone());
}
Ok(req)
}
}

View File

@@ -5,8 +5,14 @@ edition.workspace = true
license.workspace = true
[dependencies]
bytes.workspace = true
pageserver_api.workspace = true
postgres_ffi.workspace = true
prost.workspace = true
smallvec.workspace = true
thiserror.workspace = true
tonic.workspace = true
utils.workspace = true
workspace_hack.workspace = true
[build-dependencies]

View File

@@ -54,9 +54,9 @@ service PageService {
// RPCs use regular unary requests, since they are not as frequent and
// performance-critical, and this simplifies implementation.
//
// NB: a status response (e.g. errors) will terminate the stream. The stream
// may be shared by e.g. multiple Postgres backends, so we should avoid this.
// Most errors are therefore sent as GetPageResponse.status instead.
// NB: a gRPC status response (e.g. errors) will terminate the stream. The
// stream may be shared by multiple Postgres backends, so we avoid this by
// sending them as GetPageResponse.status_code instead.
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
// Returns the size of a relation, as # of blocks.
@@ -159,8 +159,8 @@ message GetPageRequest {
// A GetPageRequest class. Primarily intended for observability, but may also be
// used for prioritization in the future.
enum GetPageClass {
// Unknown class. For forwards compatibility: used when the client sends a
// class that the server doesn't know about.
// Unknown class. For backwards compatibility: used when an older client version sends a class
// that a newer server version has removed.
GET_PAGE_CLASS_UNKNOWN = 0;
// A normal request. This is the default.
GET_PAGE_CLASS_NORMAL = 1;
@@ -180,31 +180,37 @@ message GetPageResponse {
// The original request's ID.
uint64 request_id = 1;
// The response status code.
GetPageStatus status = 2;
GetPageStatusCode status_code = 2;
// A string describing the status, if any.
string reason = 3;
// The 8KB page images, in the same order as the request. Empty if status != OK.
// The 8KB page images, in the same order as the request. Empty if status_code != OK.
repeated bytes page_image = 4;
}
// A GetPageResponse status code. Since we use a bidirectional stream, we don't
// want to send errors as gRPC statuses, since this would terminate the stream.
enum GetPageStatus {
// Unknown status. For forwards compatibility: used when the server sends a
// status code that the client doesn't know about.
GET_PAGE_STATUS_UNKNOWN = 0;
// A GetPageResponse status code.
//
// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
// (potentially shared by many backends), and a gRPC status response would terminate the stream so
// we send GetPageResponse messages with these codes instead.
enum GetPageStatusCode {
// Unknown status. For forwards compatibility: used when an older client version receives a new
// status code from a newer server version.
GET_PAGE_STATUS_CODE_UNKNOWN = 0;
// The request was successful.
GET_PAGE_STATUS_OK = 1;
GET_PAGE_STATUS_CODE_OK = 1;
// The page did not exist. The tenant/timeline/shard has already been
// validated during stream setup.
GET_PAGE_STATUS_NOT_FOUND = 2;
GET_PAGE_STATUS_CODE_NOT_FOUND = 2;
// The request was invalid.
GET_PAGE_STATUS_INVALID = 3;
GET_PAGE_STATUS_CODE_INVALID_REQUEST = 3;
// The request failed due to an internal server error.
GET_PAGE_STATUS_CODE_INTERNAL_ERROR = 4;
// The tenant is rate limited. Slow down and retry later.
GET_PAGE_STATUS_SLOW_DOWN = 4;
// TODO: consider adding a GET_PAGE_STATUS_LAYER_DOWNLOAD in the case of a
// layer download. This could free up the server task to process other
// requests while the layer download is in progress.
GET_PAGE_STATUS_CODE_SLOW_DOWN = 5;
// NB: shutdown errors are emitted as a gRPC Unavailable status.
//
// TODO: consider adding a GET_PAGE_STATUS_CODE_LAYER_DOWNLOAD in the case of a layer download.
// This could free up the server task to process other requests while the download is in progress.
}
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on

View File

@@ -17,3 +17,7 @@ pub mod proto {
pub use page_service_client::PageServiceClient;
pub use page_service_server::{PageService, PageServiceServer};
}
mod model;
pub use model::*;

View File

@@ -0,0 +1,595 @@
//! Structs representing the canonical page service API.
//!
//! These mirror the autogenerated Protobuf types. The differences are:
//!
//! - Types that are in fact required by the API are not Options. The protobuf "required"
//! attribute is deprecated and 'prost' marks a lot of members as optional because of that.
//! (See <https://github.com/tokio-rs/prost/issues/800> for a gripe on this)
//!
//! - Use more precise datatypes, e.g. Lsn and uints shorter than 32 bits.
//!
//! - Validate protocol invariants, via try_from() and try_into().
use bytes::Bytes;
use postgres_ffi::Oid;
use smallvec::SmallVec;
// TODO: split out Lsn, RelTag, SlruKind, Oid and other basic types to a separate crate, to avoid
// pulling in all of their other crate dependencies when building the client.
use utils::lsn::Lsn;
use crate::proto;
/// A protocol error. Typically returned via try_from() or try_into().
#[derive(thiserror::Error, Debug)]
pub enum ProtocolError {
#[error("field '{0}' has invalid value '{1}'")]
Invalid(&'static str, String),
#[error("required field '{0}' is missing")]
Missing(&'static str),
}
impl ProtocolError {
/// Helper to generate a new ProtocolError::Invalid for the given field and value.
pub fn invalid(field: &'static str, value: impl std::fmt::Debug) -> Self {
Self::Invalid(field, format!("{value:?}"))
}
}
impl From<ProtocolError> for tonic::Status {
fn from(err: ProtocolError) -> Self {
tonic::Status::invalid_argument(format!("{err}"))
}
}
/// The LSN a request should read at.
#[derive(Clone, Copy, Debug)]
pub struct ReadLsn {
/// The request's read LSN.
pub request_lsn: Lsn,
/// If given, the caller guarantees that the page has not been modified since this LSN. Must be
/// smaller than or equal to request_lsn. This allows the Pageserver to serve an old page
/// without waiting for the request LSN to arrive. Valid for all request types.
///
/// It is undefined behaviour to make a request such that the page was, in fact, modified
/// between request_lsn and not_modified_since_lsn. The Pageserver might detect it and return an
/// error, or it might return the old page version or the new page version. Setting
/// not_modified_since_lsn equal to request_lsn is always safe, but can lead to unnecessary
/// waiting.
pub not_modified_since_lsn: Option<Lsn>,
}
impl ReadLsn {
/// Validates the ReadLsn.
pub fn validate(&self) -> Result<(), ProtocolError> {
if self.request_lsn == Lsn::INVALID {
return Err(ProtocolError::invalid("request_lsn", self.request_lsn));
}
if self.not_modified_since_lsn > Some(self.request_lsn) {
return Err(ProtocolError::invalid(
"not_modified_since_lsn",
self.not_modified_since_lsn,
));
}
Ok(())
}
}
impl TryFrom<proto::ReadLsn> for ReadLsn {
type Error = ProtocolError;
fn try_from(pb: proto::ReadLsn) -> Result<Self, Self::Error> {
let read_lsn = Self {
request_lsn: Lsn(pb.request_lsn),
not_modified_since_lsn: match pb.not_modified_since_lsn {
0 => None,
lsn => Some(Lsn(lsn)),
},
};
read_lsn.validate()?;
Ok(read_lsn)
}
}
impl TryFrom<ReadLsn> for proto::ReadLsn {
type Error = ProtocolError;
fn try_from(read_lsn: ReadLsn) -> Result<Self, Self::Error> {
read_lsn.validate()?;
Ok(Self {
request_lsn: read_lsn.request_lsn.0,
not_modified_since_lsn: read_lsn.not_modified_since_lsn.unwrap_or_default().0,
})
}
}
// RelTag is defined in pageserver_api::reltag.
pub type RelTag = pageserver_api::reltag::RelTag;
impl TryFrom<proto::RelTag> for RelTag {
type Error = ProtocolError;
fn try_from(pb: proto::RelTag) -> Result<Self, Self::Error> {
Ok(Self {
spcnode: pb.spc_oid,
dbnode: pb.db_oid,
relnode: pb.rel_number,
forknum: pb
.fork_number
.try_into()
.map_err(|_| ProtocolError::invalid("fork_number", pb.fork_number))?,
})
}
}
impl From<RelTag> for proto::RelTag {
fn from(rel_tag: RelTag) -> Self {
Self {
spc_oid: rel_tag.spcnode,
db_oid: rel_tag.dbnode,
rel_number: rel_tag.relnode,
fork_number: rel_tag.forknum as u32,
}
}
}
/// Checks whether a relation exists, at the given LSN. Only valid on shard 0, other shards error.
#[derive(Clone, Copy, Debug)]
pub struct CheckRelExistsRequest {
pub read_lsn: ReadLsn,
pub rel: RelTag,
}
impl TryFrom<proto::CheckRelExistsRequest> for CheckRelExistsRequest {
type Error = ProtocolError;
fn try_from(pb: proto::CheckRelExistsRequest) -> Result<Self, Self::Error> {
Ok(Self {
read_lsn: pb
.read_lsn
.ok_or(ProtocolError::Missing("read_lsn"))?
.try_into()?,
rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
})
}
}
pub type CheckRelExistsResponse = bool;
impl From<proto::CheckRelExistsResponse> for CheckRelExistsResponse {
fn from(pb: proto::CheckRelExistsResponse) -> Self {
pb.exists
}
}
impl From<CheckRelExistsResponse> for proto::CheckRelExistsResponse {
fn from(exists: CheckRelExistsResponse) -> Self {
Self { exists }
}
}
/// Requests a base backup at a given LSN.
#[derive(Clone, Copy, Debug)]
pub struct GetBaseBackupRequest {
/// The LSN to fetch a base backup at.
pub read_lsn: ReadLsn,
/// If true, logical replication slots will not be created.
pub replica: bool,
}
impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
type Error = ProtocolError;
fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
Ok(Self {
read_lsn: pb
.read_lsn
.ok_or(ProtocolError::Missing("read_lsn"))?
.try_into()?,
replica: pb.replica,
})
}
}
impl TryFrom<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
type Error = ProtocolError;
fn try_from(request: GetBaseBackupRequest) -> Result<Self, Self::Error> {
Ok(Self {
read_lsn: Some(request.read_lsn.try_into()?),
replica: request.replica,
})
}
}
pub type GetBaseBackupResponseChunk = Bytes;
impl TryFrom<proto::GetBaseBackupResponseChunk> for GetBaseBackupResponseChunk {
type Error = ProtocolError;
fn try_from(pb: proto::GetBaseBackupResponseChunk) -> Result<Self, Self::Error> {
if pb.chunk.is_empty() {
return Err(ProtocolError::Missing("chunk"));
}
Ok(pb.chunk)
}
}
impl TryFrom<GetBaseBackupResponseChunk> for proto::GetBaseBackupResponseChunk {
type Error = ProtocolError;
fn try_from(chunk: GetBaseBackupResponseChunk) -> Result<Self, Self::Error> {
if chunk.is_empty() {
return Err(ProtocolError::Missing("chunk"));
}
Ok(Self { chunk })
}
}
/// Requests the size of a database, as # of bytes. Only valid on shard 0, other shards will error.
#[derive(Clone, Copy, Debug)]
pub struct GetDbSizeRequest {
pub read_lsn: ReadLsn,
pub db_oid: Oid,
}
impl TryFrom<proto::GetDbSizeRequest> for GetDbSizeRequest {
type Error = ProtocolError;
fn try_from(pb: proto::GetDbSizeRequest) -> Result<Self, Self::Error> {
Ok(Self {
read_lsn: pb
.read_lsn
.ok_or(ProtocolError::Missing("read_lsn"))?
.try_into()?,
db_oid: pb.db_oid,
})
}
}
impl TryFrom<GetDbSizeRequest> for proto::GetDbSizeRequest {
type Error = ProtocolError;
fn try_from(request: GetDbSizeRequest) -> Result<Self, Self::Error> {
Ok(Self {
read_lsn: Some(request.read_lsn.try_into()?),
db_oid: request.db_oid,
})
}
}
pub type GetDbSizeResponse = u64;
impl From<proto::GetDbSizeResponse> for GetDbSizeResponse {
fn from(pb: proto::GetDbSizeResponse) -> Self {
pb.num_bytes
}
}
impl From<GetDbSizeResponse> for proto::GetDbSizeResponse {
fn from(num_bytes: GetDbSizeResponse) -> Self {
Self { num_bytes }
}
}
/// Requests one or more pages.
#[derive(Clone, Debug)]
pub struct GetPageRequest {
/// A request ID. Will be included in the response. Should be unique for in-flight requests on
/// the stream.
pub request_id: RequestID,
/// The request class.
pub request_class: GetPageClass,
/// The LSN to read at.
pub read_lsn: ReadLsn,
/// The relation to read from.
pub rel: RelTag,
/// Page numbers to read. Must belong to the remote shard.
///
/// Multiple pages will be executed as a single batch by the Pageserver, amortizing layer access
/// costs and parallelizing them. This may increase the latency of any individual request, but
/// improves the overall latency and throughput of the batch as a whole.
pub block_numbers: SmallVec<[u32; 1]>,
}
impl TryFrom<proto::GetPageRequest> for GetPageRequest {
type Error = ProtocolError;
fn try_from(pb: proto::GetPageRequest) -> Result<Self, Self::Error> {
if pb.block_number.is_empty() {
return Err(ProtocolError::Missing("block_number"));
}
Ok(Self {
request_id: pb.request_id,
request_class: pb.request_class.into(),
read_lsn: pb
.read_lsn
.ok_or(ProtocolError::Missing("read_lsn"))?
.try_into()?,
rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
block_numbers: pb.block_number.into(),
})
}
}
impl TryFrom<GetPageRequest> for proto::GetPageRequest {
type Error = ProtocolError;
fn try_from(request: GetPageRequest) -> Result<Self, Self::Error> {
if request.block_numbers.is_empty() {
return Err(ProtocolError::Missing("block_number"));
}
Ok(Self {
request_id: request.request_id,
request_class: request.request_class.into(),
read_lsn: Some(request.read_lsn.try_into()?),
rel: Some(request.rel.into()),
block_number: request.block_numbers.into_vec(),
})
}
}
/// A GetPage request ID.
pub type RequestID = u64;
/// A GetPage request class.
#[derive(Clone, Copy, Debug)]
pub enum GetPageClass {
/// Unknown class. For backwards compatibility: used when an older client version sends a class
/// that a newer server version has removed.
Unknown,
/// A normal request. This is the default.
Normal,
/// A prefetch request. NB: can only be classified on pg < 18.
Prefetch,
/// A background request (e.g. vacuum).
Background,
}
impl From<proto::GetPageClass> for GetPageClass {
fn from(pb: proto::GetPageClass) -> Self {
match pb {
proto::GetPageClass::Unknown => Self::Unknown,
proto::GetPageClass::Normal => Self::Normal,
proto::GetPageClass::Prefetch => Self::Prefetch,
proto::GetPageClass::Background => Self::Background,
}
}
}
impl From<i32> for GetPageClass {
fn from(class: i32) -> Self {
proto::GetPageClass::try_from(class)
.unwrap_or(proto::GetPageClass::Unknown)
.into()
}
}
impl From<GetPageClass> for proto::GetPageClass {
fn from(class: GetPageClass) -> Self {
match class {
GetPageClass::Unknown => Self::Unknown,
GetPageClass::Normal => Self::Normal,
GetPageClass::Prefetch => Self::Prefetch,
GetPageClass::Background => Self::Background,
}
}
}
impl From<GetPageClass> for i32 {
fn from(class: GetPageClass) -> Self {
proto::GetPageClass::from(class).into()
}
}
/// A GetPage response.
///
/// A batch response will contain all of the requested pages. We could eagerly emit individual pages
/// as soon as they are ready, but on a readv() Postgres holds buffer pool locks on all pages in the
/// batch and we'll only return once the entire batch is ready, so no one can make use of the
/// individual pages.
#[derive(Clone, Debug)]
pub struct GetPageResponse {
/// The original request's ID.
pub request_id: RequestID,
/// The response status code.
pub status_code: GetPageStatusCode,
/// A string describing the status, if any.
pub reason: Option<String>,
/// The 8KB page images, in the same order as the request. Empty if status != OK.
pub page_images: SmallVec<[Bytes; 1]>,
}
impl From<proto::GetPageResponse> for GetPageResponse {
fn from(pb: proto::GetPageResponse) -> Self {
Self {
request_id: pb.request_id,
status_code: pb.status_code.into(),
reason: Some(pb.reason).filter(|r| !r.is_empty()),
page_images: pb.page_image.into(),
}
}
}
impl From<GetPageResponse> for proto::GetPageResponse {
fn from(response: GetPageResponse) -> Self {
Self {
request_id: response.request_id,
status_code: response.status_code.into(),
reason: response.reason.unwrap_or_default(),
page_image: response.page_images.into_vec(),
}
}
}
/// A GetPage response status code.
///
/// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
/// (potentially shared by many backends), and a gRPC status response would terminate the stream so
/// we send GetPageResponse messages with these codes instead.
#[derive(Clone, Copy, Debug)]
pub enum GetPageStatusCode {
/// Unknown status. For forwards compatibility: used when an older client version receives a new
/// status code from a newer server version.
Unknown,
/// The request was successful.
Ok,
/// The page did not exist. The tenant/timeline/shard has already been validated during stream
/// setup.
NotFound,
/// The request was invalid.
InvalidRequest,
/// The request failed due to an internal server error.
InternalError,
/// The tenant is rate limited. Slow down and retry later.
SlowDown,
}
impl From<proto::GetPageStatusCode> for GetPageStatusCode {
fn from(pb: proto::GetPageStatusCode) -> Self {
match pb {
proto::GetPageStatusCode::Unknown => Self::Unknown,
proto::GetPageStatusCode::Ok => Self::Ok,
proto::GetPageStatusCode::NotFound => Self::NotFound,
proto::GetPageStatusCode::InvalidRequest => Self::InvalidRequest,
proto::GetPageStatusCode::InternalError => Self::InternalError,
proto::GetPageStatusCode::SlowDown => Self::SlowDown,
}
}
}
impl From<i32> for GetPageStatusCode {
fn from(status_code: i32) -> Self {
proto::GetPageStatusCode::try_from(status_code)
.unwrap_or(proto::GetPageStatusCode::Unknown)
.into()
}
}
impl From<GetPageStatusCode> for proto::GetPageStatusCode {
fn from(status_code: GetPageStatusCode) -> Self {
match status_code {
GetPageStatusCode::Unknown => Self::Unknown,
GetPageStatusCode::Ok => Self::Ok,
GetPageStatusCode::NotFound => Self::NotFound,
GetPageStatusCode::InvalidRequest => Self::InvalidRequest,
GetPageStatusCode::InternalError => Self::InternalError,
GetPageStatusCode::SlowDown => Self::SlowDown,
}
}
}
impl From<GetPageStatusCode> for i32 {
fn from(status_code: GetPageStatusCode) -> Self {
proto::GetPageStatusCode::from(status_code).into()
}
}
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other
// shards will error.
pub struct GetRelSizeRequest {
pub read_lsn: ReadLsn,
pub rel: RelTag,
}
impl TryFrom<proto::GetRelSizeRequest> for GetRelSizeRequest {
type Error = ProtocolError;
fn try_from(proto: proto::GetRelSizeRequest) -> Result<Self, Self::Error> {
Ok(Self {
read_lsn: proto
.read_lsn
.ok_or(ProtocolError::Missing("read_lsn"))?
.try_into()?,
rel: proto.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
})
}
}
impl TryFrom<GetRelSizeRequest> for proto::GetRelSizeRequest {
type Error = ProtocolError;
fn try_from(request: GetRelSizeRequest) -> Result<Self, Self::Error> {
Ok(Self {
read_lsn: Some(request.read_lsn.try_into()?),
rel: Some(request.rel.into()),
})
}
}
pub type GetRelSizeResponse = u32;
impl From<proto::GetRelSizeResponse> for GetRelSizeResponse {
fn from(proto: proto::GetRelSizeResponse) -> Self {
proto.num_blocks
}
}
impl From<GetRelSizeResponse> for proto::GetRelSizeResponse {
fn from(num_blocks: GetRelSizeResponse) -> Self {
Self { num_blocks }
}
}
/// Requests an SLRU segment. Only valid on shard 0, other shards will error.
pub struct GetSlruSegmentRequest {
pub read_lsn: ReadLsn,
pub kind: SlruKind,
pub segno: u32,
}
impl TryFrom<proto::GetSlruSegmentRequest> for GetSlruSegmentRequest {
type Error = ProtocolError;
fn try_from(pb: proto::GetSlruSegmentRequest) -> Result<Self, Self::Error> {
Ok(Self {
read_lsn: pb
.read_lsn
.ok_or(ProtocolError::Missing("read_lsn"))?
.try_into()?,
kind: u8::try_from(pb.kind)
.ok()
.and_then(SlruKind::from_repr)
.ok_or_else(|| ProtocolError::invalid("slru_kind", pb.kind))?,
segno: pb.segno,
})
}
}
impl TryFrom<GetSlruSegmentRequest> for proto::GetSlruSegmentRequest {
type Error = ProtocolError;
fn try_from(request: GetSlruSegmentRequest) -> Result<Self, Self::Error> {
Ok(Self {
read_lsn: Some(request.read_lsn.try_into()?),
kind: request.kind as u32,
segno: request.segno,
})
}
}
pub type GetSlruSegmentResponse = Bytes;
impl TryFrom<proto::GetSlruSegmentResponse> for GetSlruSegmentResponse {
type Error = ProtocolError;
fn try_from(pb: proto::GetSlruSegmentResponse) -> Result<Self, Self::Error> {
if pb.segment.is_empty() {
return Err(ProtocolError::Missing("segment"));
}
Ok(pb.segment)
}
}
impl TryFrom<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
type Error = ProtocolError;
fn try_from(segment: GetSlruSegmentResponse) -> Result<Self, Self::Error> {
if segment.is_empty() {
return Err(ProtocolError::Missing("segment"));
}
Ok(Self { segment })
}
}
// SlruKind is defined in pageserver_api::reltag.
pub type SlruKind = pageserver_api::reltag::SlruKind;

View File

@@ -20,9 +20,13 @@ serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
tokio.workspace = true
tonic.workspace = true
tokio-util.workspace = true
async-trait = "0.1"
pageserver_client.workspace = true
pageserver_api.workspace = true
pageserver_client_grpc.workspace = true
pageserver_page_api.workspace = true
utils = { path = "../../libs/utils/" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -6,25 +6,40 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tonic::metadata::AsciiMetadataValue;
use anyhow::Context;
use camino::Utf8PathBuf;
use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
use pageserver_api::shard::TenantShardId;
use pageserver_client::page_service::PagestreamClient;
use rand::prelude::*;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::info;
use utils::id::TenantTimelineId;
use utils::id::TenantId;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use futures::{
future::BoxFuture,
stream::FuturesOrdered,
FutureExt, StreamExt,
};
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
use crate::util::{request_stats, tokio_thread_local_stats};
use async_trait::async_trait;
use rand::distributions::weighted::WeightedIndex;
use utils::shard::ShardIndex;
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
#[derive(clap::Parser)]
pub(crate) struct Args {
#[clap(long, default_value = "false")]
grpc: bool,
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
@@ -303,7 +318,19 @@ async fn main_impl(
.unwrap();
Box::pin(async move {
client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
if args.grpc {
let grpc = GrpcProtocol::new(
args.page_service_connstring.clone(),
worker_id.timeline.tenant_id,
worker_id.timeline.timeline_id).await;
client_proto(args, grpc, worker_id, ss, cancel, rps_period, ranges, weights).await
} else {
let pg = PgProtocol::new(
args.page_service_connstring.clone(),
worker_id.timeline.tenant_id,
worker_id.timeline.timeline_id).await;
client_proto(args, pg, worker_id, ss, cancel, rps_period, ranges, weights).await
}
})
};
@@ -354,9 +381,208 @@ async fn main_impl(
anyhow::Ok(())
}
/// Common interface for both Pg and Grpc versions.
#[async_trait]
trait Protocol: Send {
/// Constructor/factory.
async fn new(
conn_string: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Self
where
Self: Sized;
async fn client_libpq(
/// Fire off a “get page” request and store the start time.
async fn add_to_inflight(
&mut self,
start: Instant,
args: &Args,
ranges: Vec<KeyRange>,
weights: WeightedIndex<i128>,
);
/// Wait for the next response and return its start time.
async fn get_start_time(&mut self) -> Instant;
/// How many in-flight requests do we have?
fn len(&self) -> usize;
}
///////////////////////////////////////////////////////////////////////////////
// PgProtocol
///////////////////////////////////////////////////////////////////////////////
struct PgProtocol {
libpq_pagestream: PagestreamClient,
libpq_vector: VecDeque<Instant>,
}
#[async_trait]
impl Protocol for PgProtocol {
async fn new(
conn_string: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Self {
let client = pageserver_client::page_service::Client::new(conn_string)
.await
.unwrap()
.pagestream(tenant_id, timeline_id)
.await
.unwrap();
Self {
libpq_pagestream: client,
libpq_vector: VecDeque::new(),
}
}
async fn add_to_inflight(
&mut self,
start: Instant,
args: &Args,
ranges: Vec<KeyRange>,
weights: WeightedIndex<i128>,
) {
// build your PagestreamGetPageRequest exactly as before…
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key.to_rel_block().unwrap();
PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
},
rel: rel_tag,
blkno: block_no,
}
};
let _ = self.libpq_pagestream.getpage_send(req).await;
self.libpq_vector.push_back(start);
}
async fn get_start_time(&mut self) -> Instant {
let start = self.libpq_vector.pop_front().unwrap();
let _ = self.libpq_pagestream.getpage_recv().await;
start
}
fn len(&self) -> usize {
self.libpq_vector.len()
}
}
///////////////////////////////////////////////////////////////////////////////
// GrpcProtocol
///////////////////////////////////////////////////////////////////////////////
type GetPageFut = BoxFuture<'static, (Instant, Option<pageserver_client_grpc::PageserverClientError>)>;
struct GrpcProtocol {
grpc_page_client: Arc<pageserver_client_grpc::PageserverClient>,
grpc_vector: FuturesOrdered<GetPageFut>,
}
#[async_trait]
impl Protocol for GrpcProtocol {
async fn new(
conn_string: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Self {
let shard_map = std::collections::HashMap::from([(
ShardIndex::unsharded(),
conn_string.clone(),
)]);
let tenant_ascii : AsciiMetadataValue = tenant_id.to_string().parse().unwrap();
let timeline_ascii : AsciiMetadataValue = timeline_id.to_string().parse().unwrap();
let client = pageserver_client_grpc::PageserverClient::new(
tenant_ascii,
timeline_ascii,
None,
shard_map,
).unwrap();
Self {
grpc_page_client: Arc::new(client),
grpc_vector: FuturesOrdered::new(),
}
}
async fn add_to_inflight(
&mut self,
start: Instant,
args: &Args,
ranges: Vec<KeyRange>,
weights: WeightedIndex<i128>,
) {
// build your GetPageRequest exactly as before…
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key.to_rel_block().unwrap();
pageserver_page_api::GetPageRequest {
request_id: 0,
request_class: pageserver_page_api::GetPageClass::Normal,
read_lsn: pageserver_page_api::ReadLsn {
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since_lsn: Some(r.timeline_lsn),
},
rel: pageserver_page_api::RelTag {
spcnode: rel_tag.spcnode,
dbnode: rel_tag.dbnode,
relnode: rel_tag.relnode,
forknum: rel_tag.forknum,
},
block_numbers: vec![block_no].into(),
}
};
let client_clone = self.grpc_page_client.clone();
let getpage_fut : GetPageFut = async move {
let result = client_clone.get_page(ShardIndex::unsharded(), req).await;
match result {
Ok(_) => {
(start, None)
}
Err(e) => {
(start, Some(e))
}
}
}.boxed();
self.grpc_vector.push_back(getpage_fut);
}
async fn get_start_time(&mut self) -> Instant {
let (start, err) = self.grpc_vector.next().await.unwrap();
if let Some(e) = err {
tracing::error!("getpage request failed: {e}");
}
start
}
fn len(&self) -> usize {
self.grpc_vector.len()
}
}
async fn client_proto(
args: &Args,
mut protocol: impl Protocol,
worker_id: WorkerId,
shared_state: Arc<SharedState>,
cancel: CancellationToken,
@@ -364,18 +590,11 @@ async fn client_libpq(
ranges: Vec<KeyRange>,
weights: rand::distributions::weighted::WeightedIndex<i128>,
) {
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
.await
.unwrap();
let mut client = client
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
.await
.unwrap();
shared_state.start_work_barrier.wait().await;
let client_start = Instant::now();
let mut ticks_processed = 0;
let mut inflight = VecDeque::new();
while !cancel.is_cancelled() {
// Detect if a request took longer than the RPS rate
if let Some(period) = &rps_period {
@@ -390,37 +609,12 @@ async fn client_libpq(
ticks_processed = periods_passed_until_now;
}
while inflight.len() < args.queue_depth.get() {
while protocol.len() < args.queue_depth.get() {
let start = Instant::now();
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key
.to_rel_block()
.expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
},
rel: rel_tag,
blkno: block_no,
}
};
client.getpage_send(req).await.unwrap();
inflight.push_back(start);
protocol.add_to_inflight(start, args, ranges.clone(), weights.clone()).await;
}
let start = inflight.pop_front().unwrap();
client.getpage_recv().await.unwrap();
let start = protocol.get_start_time().await;
let end = Instant::now();
shared_state.live_stats.request_done();
ticks_processed += 1;
@@ -436,9 +630,11 @@ async fn client_libpq(
if let Some(period) = &rps_period {
let next_at = client_start
+ Duration::from_micros(
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
);
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
);
tokio::time::sleep_until(next_at.into()).await;
}
}
}

View File

@@ -21,6 +21,7 @@ use pageserver::config::{PageServerConf, PageserverIdentity, ignored_fields};
use pageserver::controller_upcall_client::StorageControllerUpcallClient;
use pageserver::deletion_queue::DeletionQueue;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::feature_resolver::FeatureResolver;
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::{
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
@@ -522,6 +523,12 @@ fn start_pageserver(
// Set up remote storage client
let remote_storage = BACKGROUND_RUNTIME.block_on(create_remote_storage_client(conf))?;
let feature_resolver = create_feature_resolver(
conf,
shutdown_pageserver.clone(),
BACKGROUND_RUNTIME.handle(),
)?;
// Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new(
remote_storage.clone(),
@@ -575,6 +582,7 @@ fn start_pageserver(
deletion_queue_client,
l0_flush_global_state,
basebackup_prepare_sender,
feature_resolver,
},
order,
shutdown_pageserver.clone(),
@@ -849,6 +857,14 @@ fn start_pageserver(
})
}
fn create_feature_resolver(
conf: &'static PageServerConf,
shutdown_pageserver: CancellationToken,
handle: &tokio::runtime::Handle,
) -> anyhow::Result<FeatureResolver> {
FeatureResolver::spawn(conf, shutdown_pageserver, handle)
}
async fn create_remote_storage_client(
conf: &'static PageServerConf,
) -> anyhow::Result<GenericRemoteStorage> {

View File

@@ -14,7 +14,7 @@ use std::time::Duration;
use anyhow::{Context, bail, ensure};
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes};
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes, PostHogConfig};
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use pem::Pem;
@@ -238,6 +238,9 @@ pub struct PageServerConf {
/// This is insecure and should only be used in development environments.
pub dev_mode: bool,
/// PostHog integration config.
pub posthog_config: Option<PostHogConfig>,
pub timeline_import_config: pageserver_api::config::TimelineImportConfig,
pub basebackup_cache_config: Option<pageserver_api::config::BasebackupCacheConfig>,
@@ -421,6 +424,7 @@ impl PageServerConf {
tracing,
enable_tls_page_service_api,
dev_mode,
posthog_config,
timeline_import_config,
basebackup_cache_config,
} = config_toml;
@@ -536,6 +540,7 @@ impl PageServerConf {
}
None => Vec::new(),
},
posthog_config,
};
// ------------------------------------------------------------

View File

@@ -0,0 +1,94 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use posthog_client_lite::{
FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError,
};
use tokio_util::sync::CancellationToken;
use utils::id::TenantId;
use crate::config::PageServerConf;
#[derive(Clone)]
pub struct FeatureResolver {
inner: Option<Arc<FeatureResolverBackgroundLoop>>,
}
impl FeatureResolver {
pub fn new_disabled() -> Self {
Self { inner: None }
}
pub fn spawn(
conf: &PageServerConf,
shutdown_pageserver: CancellationToken,
handle: &tokio::runtime::Handle,
) -> anyhow::Result<Self> {
// DO NOT block in this function: make it return as fast as possible to avoid startup delays.
if let Some(posthog_config) = &conf.posthog_config {
let inner = FeatureResolverBackgroundLoop::new(
PostHogClientConfig {
server_api_key: posthog_config.server_api_key.clone(),
client_api_key: posthog_config.client_api_key.clone(),
project_id: posthog_config.project_id.clone(),
private_api_url: posthog_config.private_api_url.clone(),
public_api_url: posthog_config.public_api_url.clone(),
},
shutdown_pageserver,
);
let inner = Arc::new(inner);
// TODO: make this configurable
inner.clone().spawn(handle, Duration::from_secs(60));
Ok(FeatureResolver { inner: Some(inner) })
} else {
Ok(FeatureResolver { inner: None })
}
}
/// Evaluate a multivariate feature flag. Currently, we do not support any properties.
///
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
/// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
/// propagated beyond where the feature flag gets resolved.
pub fn evaluate_multivariate(
&self,
flag_key: &str,
tenant_id: TenantId,
) -> Result<String, PostHogEvaluationError> {
if let Some(inner) = &self.inner {
inner.feature_store().evaluate_multivariate(
flag_key,
&tenant_id.to_string(),
&HashMap::new(),
)
} else {
Err(PostHogEvaluationError::NotAvailable(
"PostHog integration is not enabled".to_string(),
))
}
}
/// Evaluate a boolean feature flag. Currently, we do not support any properties.
///
/// Returns `Ok(())` if the flag is evaluated to true, otherwise returns an error.
///
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
/// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
/// propagated beyond where the feature flag gets resolved.
pub fn evaluate_boolean(
&self,
flag_key: &str,
tenant_id: TenantId,
) -> Result<(), PostHogEvaluationError> {
if let Some(inner) = &self.inner {
inner.feature_store().evaluate_boolean(
flag_key,
&tenant_id.to_string(),
&HashMap::new(),
)
} else {
Err(PostHogEvaluationError::NotAvailable(
"PostHog integration is not enabled".to_string(),
))
}
}
}

View File

@@ -353,6 +353,33 @@ paths:
"200":
description: OK
/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/mark_invisible:
parameters:
- name: tenant_shard_id
in: path
required: true
schema:
type: string
- name: timeline_id
in: path
required: true
schema:
type: string
format: hex
put:
requestBody:
content:
application/json:
schema:
type: object
properties:
is_visible:
type: boolean
default: false
responses:
"200":
description: OK
/v1/tenant/{tenant_shard_id}/location_config:
parameters:
- name: tenant_shard_id
@@ -626,6 +653,8 @@ paths:
format: hex
pg_version:
type: integer
read_only:
type: boolean
existing_initdb_timeline_id:
type: string
format: hex

View File

@@ -370,6 +370,18 @@ impl From<crate::tenant::secondary::SecondaryTenantError> for ApiError {
}
}
impl From<crate::tenant::FinalizeTimelineImportError> for ApiError {
fn from(err: crate::tenant::FinalizeTimelineImportError) -> ApiError {
use crate::tenant::FinalizeTimelineImportError::*;
match err {
ImportTaskStillRunning => {
ApiError::ResourceUnavailable("Import task still running".into())
}
ShuttingDown => ApiError::ShuttingDown,
}
}
}
// Helper function to construct a TimelineInfo struct for a timeline
async fn build_timeline_info(
timeline: &Arc<Timeline>,
@@ -572,6 +584,7 @@ async fn timeline_create_handler(
TimelineCreateRequestMode::Branch {
ancestor_timeline_id,
ancestor_start_lsn,
read_only: _,
pg_version: _,
} => tenant::CreateTimelineParams::Branch(tenant::CreateTimelineParamsBranch {
new_timeline_id,
@@ -3532,10 +3545,7 @@ async fn activate_post_import_handler(
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
tenant
.finalize_importing_timeline(timeline_id)
.await
.map_err(ApiError::InternalServerError)?;
tenant.finalize_importing_timeline(timeline_id).await?;
match tenant.get_timeline(timeline_id, false) {
Ok(_timeline) => {

View File

@@ -10,6 +10,7 @@ pub mod context;
pub mod controller_upcall_client;
pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod feature_resolver;
pub mod http;
pub mod import_datadir;
pub mod l0_flush;

View File

@@ -2234,8 +2234,10 @@ impl BasebackupQueryTimeOngoingRecording<'_> {
// If you want to change categorize of a specific error, also change it in `log_query_error`.
let metric = match res {
Ok(_) => &self.parent.ok,
Err(QueryError::Shutdown) => {
// Do not observe ok/err for shutdown
Err(QueryError::Shutdown) | Err(QueryError::Reconnect) => {
// Do not observe ok/err for shutdown/reconnect.
// Reconnect error might be raised when the operation is waiting for LSN and the tenant shutdown interrupts
// the operation. A reconnect error will be issued and the client will retry.
return;
}
Err(QueryError::Disconnected(ConnectionError::Io(io_error)))

View File

@@ -43,12 +43,14 @@ use strum_macros::IntoStaticStr;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tonic::service::Interceptor as _;
use tracing::*;
use utils::auth::{Claims, Scope, SwappableJwtAuth};
use utils::failpoint_support;
use utils::id::{TenantId, TimelineId};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::logging::log_slow;
use utils::lsn::Lsn;
use utils::shard::ShardIndex;
use utils::simple_rcu::RcuReadGuard;
use utils::sync::gate::{Gate, GateGuard};
use utils::sync::spsc_fold;
@@ -200,9 +202,9 @@ pub fn spawn_grpc(
.max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
// Main page service.
let page_service = proto::PageServiceServer::new(PageServerHandler::new(
let page_service_handler = PageServerHandler::new(
tenant_manager,
auth,
auth.clone(),
PageServicePipeliningConfig::Serial, // TODO: unused with gRPC
conf.get_vectored_concurrent_io,
ConnectionPerfSpanFields::default(),
@@ -210,7 +212,18 @@ pub fn spawn_grpc(
ctx,
cancel.clone(),
gate.enter().expect("just created"),
));
);
let mut tenant_interceptor = TenantMetadataInterceptor;
let mut auth_interceptor = TenantAuthInterceptor::new(auth);
let interceptors = move |mut req: tonic::Request<()>| {
req = tenant_interceptor.call(req)?;
req = auth_interceptor.call(req)?;
Ok(req)
};
let page_service =
proto::PageServiceServer::with_interceptor(page_service_handler, interceptors);
let server = server.add_service(page_service);
// Reflection service for use with e.g. grpcurl.
@@ -756,6 +769,9 @@ struct BatchedGetPageRequest {
timer: SmgrOpTimer,
lsn_range: LsnRange,
ctx: RequestContext,
// If the request is perf enabled, this contains a context
// with a perf span tracking the time spent waiting for the executor.
batch_wait_ctx: Option<RequestContext>,
}
#[cfg(feature = "testing")]
@@ -768,6 +784,7 @@ struct BatchedTestRequest {
/// so that we don't keep the [`Timeline::gate`] open while the batch
/// is being built up inside the [`spsc_fold`] (pagestream pipelining).
#[derive(IntoStaticStr)]
#[allow(clippy::large_enum_variant)]
enum BatchedFeMessage {
Exists {
span: Span,
@@ -1285,6 +1302,22 @@ impl PageServerHandler {
}
};
let batch_wait_ctx = if ctx.has_perf_span() {
Some(
RequestContextBuilder::from(&ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"WAIT_EXECUTOR",
)
})
.attached_child(),
)
} else {
None
};
BatchedFeMessage::GetPage {
span,
shard: shard.downgrade(),
@@ -1296,6 +1329,7 @@ impl PageServerHandler {
request_lsn: req.hdr.request_lsn
},
ctx,
batch_wait_ctx,
}],
// The executor grabs the batch when it becomes idle.
// Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the
@@ -1451,7 +1485,7 @@ impl PageServerHandler {
let mut flush_timers = Vec::with_capacity(handler_results.len());
for handler_result in &mut handler_results {
let flush_timer = match handler_result {
Ok((_, timer)) => Some(
Ok((_response, timer, _ctx)) => Some(
timer
.observe_execution_end(flushing_start_time)
.expect("we are the first caller"),
@@ -1471,7 +1505,7 @@ impl PageServerHandler {
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
let response_msg = match handler_result {
let (response_msg, ctx) = match handler_result {
Err(e) => match &e.err {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
@@ -1496,15 +1530,30 @@ impl PageServerHandler {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
req: e.req,
message: e.err.to_string(),
})
(
PagestreamBeMessage::Error(PagestreamErrorResponse {
req: e.req,
message: e.err.to_string(),
}),
None,
)
}
},
Ok((response_msg, _op_timer_already_observed)) => response_msg,
Ok((response_msg, _op_timer_already_observed, ctx)) => (response_msg, Some(ctx)),
};
let ctx = ctx.map(|req_ctx| {
RequestContextBuilder::from(&req_ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"FLUSH_RESPONSE",
)
})
.attached_child()
});
//
// marshal & transmit response message
//
@@ -1527,6 +1576,17 @@ impl PageServerHandler {
)),
None => futures::future::Either::Right(flush_fut),
};
let flush_fut = if let Some(req_ctx) = ctx.as_ref() {
futures::future::Either::Left(
flush_fut.maybe_perf_instrument(req_ctx, |current_perf_span| {
current_perf_span.clone()
}),
)
} else {
futures::future::Either::Right(flush_fut)
};
// do it while respecting cancellation
let _: () = async move {
tokio::select! {
@@ -1556,7 +1616,7 @@ impl PageServerHandler {
ctx: &RequestContext,
) -> Result<
(
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>,
Span,
),
QueryError,
@@ -1583,7 +1643,7 @@ impl PageServerHandler {
self.handle_get_rel_exists_request(&shard, &req, &ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))
.map(|msg| (msg, timer, ctx))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
span,
@@ -1602,7 +1662,7 @@ impl PageServerHandler {
self.handle_get_nblocks_request(&shard, &req, &ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))
.map(|msg| (msg, timer, ctx))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
span,
@@ -1649,7 +1709,7 @@ impl PageServerHandler {
self.handle_db_size_request(&shard, &req, &ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))
.map(|msg| (msg, timer, ctx))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
span,
@@ -1668,7 +1728,7 @@ impl PageServerHandler {
self.handle_get_slru_segment_request(&shard, &req, &ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))
.map(|msg| (msg, timer, ctx))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
span,
@@ -2020,12 +2080,25 @@ impl PageServerHandler {
return Ok(());
}
};
let batch = match batch {
let mut batch = match batch {
Ok(batch) => batch,
Err(e) => {
return Err(e);
}
};
if let BatchedFeMessage::GetPage {
pages,
span: _,
shard: _,
batch_break_reason: _,
} = &mut batch
{
for req in pages {
req.batch_wait_ctx.take();
}
}
self.pagestream_handle_batched_message(
pgb_writer,
batch,
@@ -2338,7 +2411,8 @@ impl PageServerHandler {
io_concurrency: IoConcurrency,
batch_break_reason: GetPageBatchBreakReason,
ctx: &RequestContext,
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
{
debug_assert_current_span_has_tenant_and_timeline_id();
timeline
@@ -2445,6 +2519,7 @@ impl PageServerHandler {
page,
}),
req.timer,
req.ctx,
)
})
.map_err(|e| BatchedPageStreamError {
@@ -2489,7 +2564,8 @@ impl PageServerHandler {
timeline: &Timeline,
requests: Vec<BatchedTestRequest>,
_ctx: &RequestContext,
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
{
// real requests would do something with the timeline
let mut results = Vec::with_capacity(requests.len());
for _req in requests.iter() {
@@ -2516,6 +2592,10 @@ impl PageServerHandler {
req: req.req.clone(),
}),
req.timer,
RequestContext::new(
TaskKind::PageRequestHandler,
DownloadBehavior::Warn,
),
)
})
.map_err(|e| BatchedPageStreamError {
@@ -3290,6 +3370,104 @@ impl From<GetActiveTenantError> for QueryError {
}
}
/// gRPC interceptor that decodes tenant metadata and stores it as request extensions of type
/// TenantTimelineId and ShardIndex.
///
/// TODO: consider looking up the timeline handle here and storing it.
#[derive(Clone)]
struct TenantMetadataInterceptor;
impl tonic::service::Interceptor for TenantMetadataInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
// Decode the tenant ID.
let tenant_id = req
.metadata()
.get("neon-tenant-id")
.ok_or_else(|| tonic::Status::invalid_argument("missing neon-tenant-id"))?
.to_str()
.map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?;
let tenant_id = TenantId::from_str(tenant_id)
.map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?;
// Decode the timeline ID.
let timeline_id = req
.metadata()
.get("neon-timeline-id")
.ok_or_else(|| tonic::Status::invalid_argument("missing neon-timeline-id"))?
.to_str()
.map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
let timeline_id = TimelineId::from_str(timeline_id)
.map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
// Decode the shard ID.
let shard_index = req
.metadata()
.get("neon-shard-id")
.ok_or_else(|| tonic::Status::invalid_argument("missing neon-shard-id"))?
.to_str()
.map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
let shard_index = ShardIndex::from_str(shard_index)
.map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
// Stash them in the request.
let extensions = req.extensions_mut();
extensions.insert(TenantTimelineId::new(tenant_id, timeline_id));
extensions.insert(shard_index);
Ok(req)
}
}
/// Authenticates gRPC page service requests. Must run after TenantMetadataInterceptor.
#[derive(Clone)]
struct TenantAuthInterceptor {
auth: Option<Arc<SwappableJwtAuth>>,
}
impl TenantAuthInterceptor {
fn new(auth: Option<Arc<SwappableJwtAuth>>) -> Self {
Self { auth }
}
}
impl tonic::service::Interceptor for TenantAuthInterceptor {
fn call(&mut self, req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
// Do nothing if auth is disabled.
let Some(auth) = self.auth.as_ref() else {
return Ok(req);
};
// Fetch the tenant ID that's been set by TenantMetadataInterceptor.
let ttid = req
.extensions()
.get::<TenantTimelineId>()
.expect("TenantMetadataInterceptor must run before TenantAuthInterceptor");
// Fetch and decode the JWT token.
let jwt = req
.metadata()
.get("authorization")
.ok_or_else(|| tonic::Status::unauthenticated("no authorization header"))?
.to_str()
.map_err(|_| tonic::Status::invalid_argument("invalid authorization header"))?
.strip_prefix("Bearer ")
.ok_or_else(|| tonic::Status::invalid_argument("invalid authorization header"))?
.trim();
let jwtdata: TokenData<Claims> = auth
.decode(jwt)
.map_err(|err| tonic::Status::invalid_argument(format!("invalid JWT token: {err}")))?;
let claims = jwtdata.claims;
// Check if the token is valid for this tenant.
check_permission(&claims, Some(ttid.tenant_id))
.map_err(|err| tonic::Status::permission_denied(err.to_string()))?;
// TODO: consider stashing the claims in the request extensions, if needed.
Ok(req)
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum GetActiveTimelineError {
#[error(transparent)]

View File

@@ -84,6 +84,7 @@ use crate::context;
use crate::context::RequestContextBuilder;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
use crate::feature_resolver::FeatureResolver;
use crate::l0_flush::L0FlushGlobalState;
use crate::metrics::{
BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, CONCURRENT_INITDBS,
@@ -159,6 +160,7 @@ pub struct TenantSharedResources {
pub deletion_queue_client: DeletionQueueClient,
pub l0_flush_global_state: L0FlushGlobalState,
pub basebackup_prepare_sender: BasebackupPrepareSender,
pub feature_resolver: FeatureResolver,
}
/// A [`TenantShard`] is really an _attached_ tenant. The configuration
@@ -380,6 +382,8 @@ pub struct TenantShard {
pub(crate) gc_block: gc_block::GcBlock,
l0_flush_global_state: L0FlushGlobalState,
feature_resolver: FeatureResolver,
}
impl std::fmt::Debug for TenantShard {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -860,6 +864,14 @@ impl Debug for SetStoppingError {
}
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum FinalizeTimelineImportError {
#[error("Import task not done yet")]
ImportTaskStillRunning,
#[error("Shutting down")]
ShuttingDown,
}
/// Arguments to [`TenantShard::create_timeline`].
///
/// Not usable as an idempotency key for timeline creation because if [`CreateTimelineParamsBranch::ancestor_start_lsn`]
@@ -1146,10 +1158,20 @@ impl TenantShard {
ctx,
)?;
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
anyhow::ensure!(
disk_consistent_lsn.is_valid(),
"Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
);
if !disk_consistent_lsn.is_valid() {
// As opposed to normal timelines which get initialised with a disk consitent LSN
// via initdb, imported timelines start from 0. If the import task stops before
// it advances disk consitent LSN, allow it to resume.
let in_progress_import = import_pgdata
.as_ref()
.map(|import| !import.is_done())
.unwrap_or(false);
if !in_progress_import {
anyhow::bail!("Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn");
}
}
assert_eq!(
disk_consistent_lsn,
metadata.disk_consistent_lsn(),
@@ -1243,20 +1265,25 @@ impl TenantShard {
}
}
// Sanity check: a timeline should have some content.
anyhow::ensure!(
ancestor.is_some()
|| timeline
.layers
.read()
.await
.layer_map()
.expect("currently loading, layer manager cannot be shutdown already")
.iter_historic_layers()
.next()
.is_some(),
"Timeline has no ancestor and no layer files"
);
if disk_consistent_lsn.is_valid() {
// Sanity check: a timeline should have some content.
// Exception: importing timelines might not yet have any
anyhow::ensure!(
ancestor.is_some()
|| timeline
.layers
.read()
.await
.layer_map()
.expect(
"currently loading, layer manager cannot be shutdown already"
)
.iter_historic_layers()
.next()
.is_some(),
"Timeline has no ancestor and no layer files"
);
}
Ok(TimelineInitAndSyncResult::ReadyToActivate)
}
@@ -1292,6 +1319,7 @@ impl TenantShard {
deletion_queue_client,
l0_flush_global_state,
basebackup_prepare_sender,
feature_resolver,
} = resources;
let attach_mode = attached_conf.location.attach_mode;
@@ -1308,6 +1336,7 @@ impl TenantShard {
deletion_queue_client,
l0_flush_global_state,
basebackup_prepare_sender,
feature_resolver,
));
// The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
@@ -2854,13 +2883,13 @@ impl TenantShard {
pub(crate) async fn finalize_importing_timeline(
&self,
timeline_id: TimelineId,
) -> anyhow::Result<()> {
) -> Result<(), FinalizeTimelineImportError> {
let timeline = {
let locked = self.timelines_importing.lock().unwrap();
match locked.get(&timeline_id) {
Some(importing_timeline) => {
if !importing_timeline.import_task_handle.is_finished() {
return Err(anyhow::anyhow!("Import task not done yet"));
return Err(FinalizeTimelineImportError::ImportTaskStillRunning);
}
importing_timeline.timeline.clone()
@@ -2873,8 +2902,13 @@ impl TenantShard {
timeline
.remote_client
.schedule_index_upload_for_import_pgdata_finalize()?;
timeline.remote_client.wait_completion().await?;
.schedule_index_upload_for_import_pgdata_finalize()
.map_err(|_err| FinalizeTimelineImportError::ShuttingDown)?;
timeline
.remote_client
.wait_completion()
.await
.map_err(|_err| FinalizeTimelineImportError::ShuttingDown)?;
self.timelines_importing
.lock()
@@ -3135,11 +3169,18 @@ impl TenantShard {
.or_insert_with(|| Arc::new(GcCompactionQueue::new()))
.clone()
};
let gc_compaction_strategy = self
.feature_resolver
.evaluate_multivariate("gc-comapction-strategy", self.tenant_shard_id.tenant_id)
.ok();
let span = if let Some(gc_compaction_strategy) = gc_compaction_strategy {
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id, strategy = %gc_compaction_strategy)
} else {
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id)
};
outcome = queue
.iteration(cancel, ctx, &self.gc_block, &timeline)
.instrument(
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id),
)
.instrument(span)
.await?;
}
@@ -3471,8 +3512,9 @@ impl TenantShard {
let mut timelines_importing = self.timelines_importing.lock().unwrap();
timelines_importing
.drain()
.for_each(|(_timeline_id, importing_timeline)| {
importing_timeline.shutdown();
.for_each(|(timeline_id, importing_timeline)| {
let span = tracing::info_span!("importing_timeline_shutdown", %timeline_id);
js.spawn(async move { importing_timeline.shutdown().instrument(span).await });
});
}
// test_long_timeline_create_then_tenant_delete is leaning on this message
@@ -4247,6 +4289,7 @@ impl TenantShard {
deletion_queue_client: DeletionQueueClient,
l0_flush_global_state: L0FlushGlobalState,
basebackup_prepare_sender: BasebackupPrepareSender,
feature_resolver: FeatureResolver,
) -> TenantShard {
assert!(!attached_conf.location.generation.is_none());
@@ -4351,6 +4394,7 @@ impl TenantShard {
gc_block: Default::default(),
l0_flush_global_state,
basebackup_prepare_sender,
feature_resolver,
}
}
@@ -5271,6 +5315,7 @@ impl TenantShard {
l0_compaction_trigger: self.l0_compaction_trigger.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
basebackup_prepare_sender: self.basebackup_prepare_sender.clone(),
feature_resolver: self.feature_resolver.clone(),
}
}
@@ -5873,6 +5918,7 @@ pub(crate) mod harness {
// TODO: ideally we should run all unit tests with both configs
L0FlushGlobalState::new(L0FlushConfig::default()),
basebackup_requst_sender,
FeatureResolver::new_disabled(),
));
let preload = tenant
@@ -8314,10 +8360,24 @@ mod tests {
}
tline.freeze_and_flush().await?;
// Force layers to L1
tline
.compact(
&cancel,
{
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceL0Compaction);
flags
},
&ctx,
)
.await?;
if iter % 5 == 0 {
let scan_lsn = Lsn(lsn.0 + 1);
info!("scanning at {}", scan_lsn);
let (_, before_delta_file_accessed) =
scan_with_statistics(&tline, &keyspace, lsn, &ctx, io_concurrency.clone())
scan_with_statistics(&tline, &keyspace, scan_lsn, &ctx, io_concurrency.clone())
.await?;
tline
.compact(
@@ -8326,13 +8386,14 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::ForceL0Compaction);
flags
},
&ctx,
)
.await?;
let (_, after_delta_file_accessed) =
scan_with_statistics(&tline, &keyspace, lsn, &ctx, io_concurrency.clone())
scan_with_statistics(&tline, &keyspace, scan_lsn, &ctx, io_concurrency.clone())
.await?;
assert!(
after_delta_file_accessed < before_delta_file_accessed,
@@ -8773,6 +8834,8 @@ mod tests {
let cancel = CancellationToken::new();
// Image layer creation happens on the disk_consistent_lsn so we need to force set it now.
tline.force_set_disk_consistent_lsn(Lsn(0x40));
tline
.compact(
&cancel,
@@ -8786,8 +8849,7 @@ mod tests {
)
.await
.unwrap();
// Image layers are created at last_record_lsn
// Image layers are created at repartition LSN
let images = tline
.inspect_image_layers(Lsn(0x40), &ctx, io_concurrency.clone())
.await

View File

@@ -103,6 +103,7 @@ use crate::context::{
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
};
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
use crate::feature_resolver::FeatureResolver;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::l0_flush::{self, L0FlushGlobalState};
use crate::metrics::{
@@ -198,6 +199,7 @@ pub struct TimelineResources {
pub l0_compaction_trigger: Arc<Notify>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
pub basebackup_prepare_sender: BasebackupPrepareSender,
pub feature_resolver: FeatureResolver,
}
pub struct Timeline {
@@ -444,6 +446,8 @@ pub struct Timeline {
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
basebackup_prepare_sender: BasebackupPrepareSender,
feature_resolver: FeatureResolver,
}
pub(crate) enum PreviousHeatmap {
@@ -3072,6 +3076,8 @@ impl Timeline {
wait_lsn_log_slow: tokio::sync::Semaphore::new(1),
basebackup_prepare_sender: resources.basebackup_prepare_sender,
feature_resolver: resources.feature_resolver,
};
result.repartition_threshold =
@@ -4906,6 +4912,7 @@ impl Timeline {
LastImageLayerCreationStatus::Initial,
false, // don't yield for L0, we're flushing L0
)
.instrument(info_span!("create_image_layers", mode = %ImageLayerCreationMode::Initial, partition_mode = "initial", lsn = %self.initdb_lsn))
.await?;
debug_assert!(
matches!(is_complete, LastImageLayerCreationStatus::Complete),
@@ -5462,7 +5469,8 @@ impl Timeline {
/// Returns the image layers generated and an enum indicating whether the process is fully completed.
/// true = we have generate all image layers, false = we preempt the process for L0 compaction.
#[tracing::instrument(skip_all, fields(%lsn, %mode))]
///
/// `partition_mode` is only for logging purpose and is not used anywhere in this function.
async fn create_image_layers(
self: &Arc<Timeline>,
partitioning: &KeyPartitioning,

View File

@@ -1278,11 +1278,55 @@ impl Timeline {
}
let gc_cutoff = *self.applied_gc_cutoff_lsn.read();
let l0_l1_boundary_lsn = {
// We do the repartition on the L0-L1 boundary. All data below the boundary
// are compacted by L0 with low read amplification, thus making the `repartition`
// function run fast.
let guard = self.layers.read().await;
guard
.all_persistent_layers()
.iter()
.map(|x| {
// Use the end LSN of delta layers OR the start LSN of image layers.
if x.is_delta {
x.lsn_range.end
} else {
x.lsn_range.start
}
})
.max()
};
let (partition_mode, partition_lsn) = if cfg!(test)
|| cfg!(feature = "testing")
|| self
.feature_resolver
.evaluate_boolean("image-compaction-boundary", self.tenant_shard_id.tenant_id)
.is_ok()
{
let last_repartition_lsn = self.partitioning.read().1;
let lsn = match l0_l1_boundary_lsn {
Some(boundary) => gc_cutoff
.max(boundary)
.max(last_repartition_lsn)
.max(self.initdb_lsn)
.max(self.ancestor_lsn),
None => self.get_last_record_lsn(),
};
if lsn <= self.initdb_lsn || lsn <= self.ancestor_lsn {
// Do not attempt to create image layers below the initdb or ancestor LSN -- no data below it
("l0_l1_boundary", self.get_last_record_lsn())
} else {
("l0_l1_boundary", lsn)
}
} else {
("latest_record", self.get_last_record_lsn())
};
// 2. Repartition and create image layers if necessary
match self
.repartition(
self.get_last_record_lsn(),
partition_lsn,
self.get_compaction_target_size(),
options.flags,
ctx,
@@ -1301,18 +1345,19 @@ impl Timeline {
.extend(sparse_partitioning.into_dense().parts);
// 3. Create new image layers for partitions that have been modified "enough".
let mode = if options
.flags
.contains(CompactFlags::ForceImageLayerCreation)
{
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
};
let (image_layers, outcome) = self
.create_image_layers(
&partitioning,
lsn,
if options
.flags
.contains(CompactFlags::ForceImageLayerCreation)
{
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
},
mode,
&image_ctx,
self.last_image_layer_creation_status
.load()
@@ -1320,6 +1365,7 @@ impl Timeline {
.clone(),
options.flags.contains(CompactFlags::YieldForL0),
)
.instrument(info_span!("create_image_layers", mode = %mode, partition_mode = %partition_mode, lsn = %lsn))
.await
.inspect_err(|err| {
if let CreateImageLayersError::GetVectoredError(
@@ -1344,7 +1390,8 @@ impl Timeline {
}
Ok(_) => {
info!("skipping repartitioning due to image compaction LSN being below GC cutoff");
// This happens very frequently so we don't want to log it.
debug!("skipping repartitioning due to image compaction LSN being below GC cutoff");
}
// Suppress errors when cancelled.

View File

@@ -25,8 +25,11 @@ pub(crate) struct ImportingTimeline {
}
impl ImportingTimeline {
pub(crate) fn shutdown(self) {
pub(crate) async fn shutdown(self) {
self.import_task_handle.abort();
let _ = self.import_task_handle.await;
self.timeline.remote_client.shutdown().await;
}
}
@@ -93,6 +96,11 @@ pub async fn doit(
);
}
timeline
.remote_client
.schedule_index_upload_for_file_changes()?;
timeline.remote_client.wait_completion().await?;
// Communicate that shard is done.
// Ensure at-least-once delivery of the upcall to storage controller
// before we mark the task as done and never come here again.

View File

@@ -30,6 +30,7 @@
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::num::NonZeroUsize;
use std::ops::Range;
use std::sync::Arc;
@@ -100,8 +101,24 @@ async fn run_v1(
tasks: Vec::default(),
};
let import_config = &timeline.conf.timeline_import_config;
let plan = planner.plan(import_config).await?;
// Use the job size limit encoded in the progress if we are resuming an import.
// This ensures that imports have stable plans even if the pageserver config changes.
let import_config = {
match &import_progress {
Some(progress) => {
let base = &timeline.conf.timeline_import_config;
TimelineImportConfig {
import_job_soft_size_limit: NonZeroUsize::new(progress.job_soft_size_limit)
.unwrap(),
import_job_concurrency: base.import_job_concurrency,
import_job_checkpoint_threshold: base.import_job_checkpoint_threshold,
}
}
None => timeline.conf.timeline_import_config.clone(),
}
};
let plan = planner.plan(&import_config).await?;
// Hash the plan and compare with the hash of the plan we got back from the storage controller.
// If the two match, it means that the planning stage had the same output.
@@ -113,20 +130,20 @@ async fn run_v1(
let plan_hash = hasher.finish();
if let Some(progress) = &import_progress {
if plan_hash != progress.import_plan_hash {
anyhow::bail!("Import plan does not match storcon metadata");
}
// Handle collisions on jobs of unequal length
if progress.jobs != plan.jobs.len() {
anyhow::bail!("Import plan job length does not match storcon metadata")
}
if plan_hash != progress.import_plan_hash {
anyhow::bail!("Import plan does not match storcon metadata");
}
}
pausable_failpoint!("import-timeline-pre-execute-pausable");
let start_from_job_idx = import_progress.map(|progress| progress.completed);
plan.execute(timeline, start_from_job_idx, plan_hash, import_config, ctx)
plan.execute(timeline, start_from_job_idx, plan_hash, &import_config, ctx)
.await
}
@@ -218,6 +235,19 @@ impl Planner {
checkpoint_buf,
)));
// Sort the tasks by the key ranges they handle.
// The plan being generated here needs to be stable across invocations
// of this method.
self.tasks.sort_by_key(|task| match task {
AnyImportTask::SingleKey(key) => (key.key, key.key.next()),
AnyImportTask::RelBlocks(rel_blocks) => {
(rel_blocks.key_range.start, rel_blocks.key_range.end)
}
AnyImportTask::SlruBlocks(slru_blocks) => {
(slru_blocks.key_range.start, slru_blocks.key_range.end)
}
});
// Assigns parts of key space to later parallel jobs
let mut last_end_key = Key::MIN;
let mut current_chunk = Vec::new();
@@ -426,6 +456,8 @@ impl Plan {
}));
},
maybe_complete_job_idx = work.next() => {
pausable_failpoint!("import-task-complete-pausable");
match maybe_complete_job_idx {
Some(Ok((job_idx, res))) => {
assert!(last_completed_job_idx.checked_add(1).unwrap() == job_idx);
@@ -438,8 +470,12 @@ impl Plan {
jobs: jobs_in_plan,
completed: last_completed_job_idx,
import_plan_hash,
job_soft_size_limit: import_config.import_job_soft_size_limit.into(),
};
timeline.remote_client.schedule_index_upload_for_file_changes()?;
timeline.remote_client.wait_completion().await?;
storcon_client.put_timeline_import_status(
timeline.tenant_shard_id,
timeline.timeline_id,
@@ -640,7 +676,11 @@ impl Hash for ImportSingleKeyTask {
let ImportSingleKeyTask { key, buf } = self;
key.hash(state);
buf.hash(state);
// The key value might not have a stable binary representation.
// For instance, the db directory uses an unstable hash-map.
// To work around this we are a bit lax here and only hash the
// size of the buffer which must be consistent.
buf.len().hash(state);
}
}
@@ -915,7 +955,7 @@ impl ChunkProcessingJob {
let guard = timeline.layers.read().await;
let existing_layer = guard.try_get_from_key(&desc.key());
if let Some(layer) = existing_layer {
if layer.metadata().generation != timeline.generation {
if layer.metadata().generation == timeline.generation {
return Err(anyhow::anyhow!(
"Import attempted to rewrite layer file in the same generation: {}",
layer.local_path()

View File

@@ -155,8 +155,9 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
int written = 0;
written = snprintf((char *) &sk->conninfo, MAXCONNINFO,
"host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'",
sk->host, sk->port, wp->config->neon_timeline, wp->config->neon_tenant);
"%s host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'",
wp->config->safekeeper_conninfo_options, sk->host, sk->port,
wp->config->neon_timeline, wp->config->neon_tenant);
if (written > MAXCONNINFO || written < 0)
wp_log(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port);
}

View File

@@ -714,6 +714,9 @@ typedef struct WalProposerConfig
*/
char *safekeepers_list;
/* libpq connection info options. */
char *safekeeper_conninfo_options;
/*
* WalProposer reconnects to offline safekeepers once in this interval.
* Time is in milliseconds.

View File

@@ -64,6 +64,7 @@ char *wal_acceptors_list = "";
int wal_acceptor_reconnect_timeout = 1000;
int wal_acceptor_connection_timeout = 10000;
int safekeeper_proto_version = 3;
char *safekeeper_conninfo_options = "";
/* Set to true in the walproposer bgw. */
static bool am_walproposer;
@@ -119,6 +120,7 @@ init_walprop_config(bool syncSafekeepers)
walprop_config.neon_timeline = neon_timeline;
/* WalProposerCreate scribbles directly on it, so pstrdup */
walprop_config.safekeepers_list = pstrdup(wal_acceptors_list);
walprop_config.safekeeper_conninfo_options = pstrdup(safekeeper_conninfo_options);
walprop_config.safekeeper_reconnect_timeout = wal_acceptor_reconnect_timeout;
walprop_config.safekeeper_connection_timeout = wal_acceptor_connection_timeout;
walprop_config.wal_segment_size = wal_segment_size;
@@ -203,6 +205,16 @@ nwp_register_gucs(void)
* GUC_LIST_QUOTE */
NULL, assign_neon_safekeepers, NULL);
DefineCustomStringVariable(
"neon.safekeeper_conninfo_options",
"libpq keyword parameters and values to apply to safekeeper connections",
NULL,
&safekeeper_conninfo_options,
"",
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomIntVariable(
"neon.safekeeper_reconnect_timeout",
"Walproposer reconnects to offline safekeepers once in this interval.",

View File

@@ -70,34 +70,6 @@ pub(crate) enum JsonConversionError {
ParseJsonError(#[from] serde_json::Error),
#[error("unbalanced array")]
UnbalancedArray,
#[error("unbalanced quoted string")]
UnbalancedString,
}
enum OutputMode {
Array(Vec<Value>),
Object(Map<String, Value>),
}
impl OutputMode {
fn key(&mut self, key: &str) -> &mut Value {
match self {
OutputMode::Array(values) => push_entry(values, Value::Null),
OutputMode::Object(map) => map.entry(key.to_string()).or_insert(Value::Null),
}
}
fn finish(self) -> Value {
match self {
OutputMode::Array(values) => Value::Array(values),
OutputMode::Object(map) => Value::Object(map),
}
}
}
fn push_entry<T>(arr: &mut Vec<T>, t: T) -> &mut T {
arr.push(t);
arr.last_mut().expect("a value was just inserted")
}
//
@@ -105,276 +77,182 @@ fn push_entry<T>(arr: &mut Vec<T>, t: T) -> &mut T {
//
pub(crate) fn pg_text_row_to_json(
row: &Row,
columns: &[Type],
raw_output: bool,
array_mode: bool,
) -> Result<Value, JsonConversionError> {
let mut entries = if array_mode {
OutputMode::Array(Vec::with_capacity(row.columns().len()))
let iter = row
.columns()
.iter()
.zip(columns)
.enumerate()
.map(|(i, (column, typ))| {
let name = column.name();
let pg_value = row.as_text(i).map_err(JsonConversionError::AsTextError)?;
let json_value = if raw_output {
match pg_value {
Some(v) => Value::String(v.to_string()),
None => Value::Null,
}
} else {
pg_text_to_json(pg_value, typ)?
};
Ok((name.to_string(), json_value))
});
if array_mode {
// drop keys and aggregate into array
let arr = iter
.map(|r| r.map(|(_key, val)| val))
.collect::<Result<Vec<Value>, JsonConversionError>>()?;
Ok(Value::Array(arr))
} else {
OutputMode::Object(Map::with_capacity(row.columns().len()))
};
for (i, column) in row.columns().iter().enumerate() {
let pg_value = row.as_text(i).map_err(JsonConversionError::AsTextError)?;
let value = entries.key(column.name());
match pg_value {
Some(v) if raw_output => *value = Value::String(v.to_string()),
Some(v) => pg_text_to_json(value, v, column.type_())?,
None => *value = Value::Null,
}
let obj = iter.collect::<Result<Map<String, Value>, JsonConversionError>>()?;
Ok(Value::Object(obj))
}
Ok(entries.finish())
}
//
// Convert postgres text-encoded value to JSON value
//
fn pg_text_to_json(
output: &mut Value,
val: &str,
pg_type: &Type,
) -> Result<(), JsonConversionError> {
if let Kind::Array(elem_type) = pg_type.kind() {
// todo: we should fetch this from postgres.
let delimiter = ',';
let mut array = vec![];
pg_array_parse(&mut array, val, elem_type, delimiter)?;
*output = Value::Array(array);
return Ok(());
}
match *pg_type {
Type::BOOL => *output = Value::Bool(val == "t"),
Type::INT2 | Type::INT4 => {
let val = val.parse::<i32>()?;
*output = Value::Number(serde_json::Number::from(val));
fn pg_text_to_json(pg_value: Option<&str>, pg_type: &Type) -> Result<Value, JsonConversionError> {
if let Some(val) = pg_value {
if let Kind::Array(elem_type) = pg_type.kind() {
return pg_array_parse(val, elem_type);
}
Type::FLOAT4 | Type::FLOAT8 => {
let fval = val.parse::<f64>()?;
let num = serde_json::Number::from_f64(fval);
if let Some(num) = num {
*output = Value::Number(num);
} else {
// Pass Nan, Inf, -Inf as strings
// JS JSON.stringify() does converts them to null, but we
// want to preserve them, so we pass them as strings
*output = Value::String(val.to_string());
match *pg_type {
Type::BOOL => Ok(Value::Bool(val == "t")),
Type::INT2 | Type::INT4 => {
let val = val.parse::<i32>()?;
Ok(Value::Number(serde_json::Number::from(val)))
}
Type::FLOAT4 | Type::FLOAT8 => {
let fval = val.parse::<f64>()?;
let num = serde_json::Number::from_f64(fval);
if let Some(num) = num {
Ok(Value::Number(num))
} else {
// Pass Nan, Inf, -Inf as strings
// JS JSON.stringify() does converts them to null, but we
// want to preserve them, so we pass them as strings
Ok(Value::String(val.to_string()))
}
}
Type::JSON | Type::JSONB => Ok(serde_json::from_str(val)?),
_ => Ok(Value::String(val.to_string())),
}
} else {
Ok(Value::Null)
}
}
//
// Parse postgres array into JSON array.
//
// This is a bit involved because we need to handle nested arrays and quoted
// values. Unlike postgres we don't check that all nested arrays have the same
// dimensions, we just return them as is.
//
fn pg_array_parse(pg_array: &str, elem_type: &Type) -> Result<Value, JsonConversionError> {
pg_array_parse_inner(pg_array, elem_type, false).map(|(v, _)| v)
}
fn pg_array_parse_inner(
pg_array: &str,
elem_type: &Type,
nested: bool,
) -> Result<(Value, usize), JsonConversionError> {
let mut pg_array_chr = pg_array.char_indices();
let mut level = 0;
let mut quote = false;
let mut entries: Vec<Value> = Vec::new();
let mut entry = String::new();
// skip bounds decoration
if let Some('[') = pg_array.chars().next() {
for (_, c) in pg_array_chr.by_ref() {
if c == '=' {
break;
}
}
Type::JSON | Type::JSONB => *output = serde_json::from_str(val)?,
_ => *output = Value::String(val.to_string()),
}
Ok(())
}
fn push_checked(
entry: &mut String,
entries: &mut Vec<Value>,
elem_type: &Type,
) -> Result<(), JsonConversionError> {
if !entry.is_empty() {
// While in usual postgres response we get nulls as None and everything else
// as Some(&str), in arrays we get NULL as unquoted 'NULL' string (while
// string with value 'NULL' will be represented by '"NULL"'). So catch NULLs
// here while we have quotation info and convert them to None.
if entry == "NULL" {
entries.push(pg_text_to_json(None, elem_type)?);
} else {
entries.push(pg_text_to_json(Some(entry), elem_type)?);
}
entry.clear();
}
/// Parse postgres array into JSON array.
///
/// This is a bit involved because we need to handle nested arrays and quoted
/// values. Unlike postgres we don't check that all nested arrays have the same
/// dimensions, we just return them as is.
///
/// <https://www.postgresql.org/docs/current/arrays.html#ARRAYS-IO>
///
/// The external text representation of an array value consists of items that are interpreted
/// according to the I/O conversion rules for the array's element type, plus decoration that
/// indicates the array structure. The decoration consists of curly braces (`{` and `}`) around
/// the array value plus delimiter characters between adjacent items. The delimiter character
/// is usually a comma (,) but can be something else: it is determined by the typdelim setting
/// for the array's element type. Among the standard data types provided in the PostgreSQL
/// distribution, all use a comma, except for type box, which uses a semicolon (;).
///
/// In a multidimensional array, each dimension (row, plane, cube, etc.)
/// gets its own level of curly braces, and delimiters must be written between adjacent
/// curly-braced entities of the same level.
fn pg_array_parse(
elements: &mut Vec<Value>,
mut pg_array: &str,
elem: &Type,
delim: char,
) -> Result<(), JsonConversionError> {
// skip bounds decoration, eg:
// `[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}`
// technically these are significant, but we have no way to represent them in json.
if let Some('[') = pg_array.chars().next() {
let Some((_bounds, array)) = pg_array.split_once('=') else {
return Err(JsonConversionError::UnbalancedArray);
};
pg_array = array;
Ok(())
}
// whitespace might preceed a `{`.
let pg_array = pg_array.trim_start();
while let Some((mut i, mut c)) = pg_array_chr.next() {
let mut escaped = false;
let rest = pg_array_parse_inner(elements, pg_array, elem, delim)?;
if !rest.is_empty() {
return Err(JsonConversionError::UnbalancedArray);
}
if c == '\\' {
escaped = true;
let Some(x) = pg_array_chr.next() else {
return Err(JsonConversionError::UnbalancedArray);
};
(i, c) = x;
}
Ok(())
}
/// reads a single array from the `pg_array` string and pushes each values to `elements`.
/// returns the rest of the `pg_array` string that was not read.
fn pg_array_parse_inner<'a>(
elements: &mut Vec<Value>,
mut pg_array: &'a str,
elem: &Type,
delim: char,
) -> Result<&'a str, JsonConversionError> {
// array should have a `{` prefix.
pg_array = pg_array
.strip_prefix('{')
.ok_or(JsonConversionError::UnbalancedArray)?;
let mut q = String::new();
loop {
let value = push_entry(elements, Value::Null);
pg_array = pg_array_parse_item(value, &mut q, pg_array, elem, delim)?;
// check for separator.
if let Some(next) = pg_array.strip_prefix(delim) {
// next item.
pg_array = next;
} else {
break;
match c {
'{' if !quote => {
level += 1;
if level > 1 {
let (res, off) = pg_array_parse_inner(&pg_array[i..], elem_type, true)?;
entries.push(res);
for _ in 0..off - 1 {
pg_array_chr.next();
}
}
}
'}' if !quote => {
level -= 1;
if level == 0 {
push_checked(&mut entry, &mut entries, elem_type)?;
if nested {
return Ok((Value::Array(entries), i));
}
}
}
'"' if !escaped => {
if quote {
// end of quoted string, so push it manually without any checks
// for emptiness or nulls
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
entry.clear();
}
quote = !quote;
}
',' if !quote => {
push_checked(&mut entry, &mut entries, elem_type)?;
}
_ => {
entry.push(c);
}
}
}
let Some(next) = pg_array.strip_prefix('}') else {
// missing `}` terminator.
if level != 0 {
return Err(JsonConversionError::UnbalancedArray);
};
// whitespace might follow a `}`.
Ok(next.trim_start())
}
/// reads a single item from the `pg_array` string.
/// returns the rest of the `pg_array` string that was not read.
///
/// `quoted` is a scratch allocation that has no defined output.
fn pg_array_parse_item<'a>(
output: &mut Value,
quoted: &mut String,
mut pg_array: &'a str,
elem: &Type,
delim: char,
) -> Result<&'a str, JsonConversionError> {
// We are trying to parse an array item.
// This could be a new array, if this is a multi-dimentional array.
// This could be a quoted string representing `elem`.
// This could be an unquoted string representing `elem`.
// whitespace might preceed an item.
pg_array = pg_array.trim_start();
if pg_array.strip_prefix('{').is_some() {
// nested array.
let mut nested = vec![];
pg_array = pg_array_parse_inner(&mut nested, pg_array, elem, delim)?;
*output = Value::Array(nested);
return Ok(pg_array);
}
if let Some(mut pg_array) = pg_array.strip_prefix('"') {
pg_array = pg_array_parse_quoted(quoted, pg_array)?;
// we have unquoted an item string:
pg_text_to_json(output, quoted, elem)?;
quoted.clear();
return Ok(pg_array);
}
// we need to parse an item. read until we find a delimiter or `}`.
let index = pg_array
.find([delim, '}'])
.ok_or(JsonConversionError::UnbalancedArray)?;
let item;
(item, pg_array) = pg_array.split_at(index);
// item might have trailing whitespace that we need to ignore.
let item = item.trim_end();
// we might have an item string:
// check for null
if item == "NULL" {
*output = Value::Null;
} else {
pg_text_to_json(output, item, elem)?;
}
Ok(pg_array)
}
/// reads a single quoted item from the `pg_array` string.
///
/// Returns the rest of the `pg_array` string that was not read.
/// The output is written into `quoted`.
///
/// The pg_array string must have a `"` terminator, but the `"` initial value
/// must have already been removed from the input. The terminator is removed.
fn pg_array_parse_quoted<'a>(
quoted: &mut String,
mut pg_array: &'a str,
) -> Result<&'a str, JsonConversionError> {
// The array output routine will put double quotes around element values if they are empty strings,
// contain curly braces, delimiter characters, double quotes, backslashes, or white space,
// or match the word `NULL`. Double quotes and backslashes embedded in element values will be backslash-escaped.
// For numeric data types it is safe to assume that double quotes will never appear,
// but for textual data types one should be prepared to cope with either the presence or absence of quotes.
// We write to quoted in chunks terminated by an escape character.
// Eg if we have the input `foo\"bar"`, then we write `foo`, then `"`, then finally `bar`.
loop {
// we need to parse an chunk. read until we find a '\\' or `"`.
let i = pg_array
.find(['\\', '"'])
.ok_or(JsonConversionError::UnbalancedString)?;
let chunk: &str;
(chunk, pg_array) = pg_array
.split_at_checked(i)
.expect("i is guaranteed to be in-bounds of pg_array");
// push the chunk.
quoted.push_str(chunk);
// consume the chunk_end character.
let chunk_end: char;
(chunk_end, pg_array) =
split_first_char(pg_array).expect("pg_array should start with either '\\\\' or '\"'");
// finished.
if chunk_end == '"' {
// whitespace might follow the '"'.
pg_array = pg_array.trim_start();
break Ok(pg_array);
}
// consume the escaped character.
let escaped: char;
(escaped, pg_array) =
split_first_char(pg_array).ok_or(JsonConversionError::UnbalancedString)?;
quoted.push(escaped);
}
}
fn split_first_char(s: &str) -> Option<(char, &str)> {
let mut chars = s.chars();
let c = chars.next()?;
Some((c, chars.as_str()))
Ok((Value::Array(entries), 0))
}
#[cfg(test)]
@@ -438,33 +316,37 @@ mod tests {
);
}
fn pg_text_to_json(val: &str, pg_type: &Type) -> Value {
let mut v = Value::Null;
super::pg_text_to_json(&mut v, val, pg_type).unwrap();
v
}
fn pg_array_parse(pg_array: &str, pg_type: &Type) -> Value {
let mut array = vec![];
super::pg_array_parse(&mut array, pg_array, pg_type, ',').unwrap();
Value::Array(array)
}
#[test]
fn test_atomic_types_parse() {
assert_eq!(pg_text_to_json("foo", &Type::TEXT), json!("foo"));
assert_eq!(pg_text_to_json("42", &Type::INT4), json!(42));
assert_eq!(pg_text_to_json("42", &Type::INT2), json!(42));
assert_eq!(pg_text_to_json("42", &Type::INT8), json!("42"));
assert_eq!(pg_text_to_json("42.42", &Type::FLOAT8), json!(42.42));
assert_eq!(pg_text_to_json("42.42", &Type::FLOAT4), json!(42.42));
assert_eq!(pg_text_to_json("NaN", &Type::FLOAT4), json!("NaN"));
assert_eq!(
pg_text_to_json("Infinity", &Type::FLOAT4),
pg_text_to_json(Some("foo"), &Type::TEXT).unwrap(),
json!("foo")
);
assert_eq!(pg_text_to_json(None, &Type::TEXT).unwrap(), json!(null));
assert_eq!(pg_text_to_json(Some("42"), &Type::INT4).unwrap(), json!(42));
assert_eq!(pg_text_to_json(Some("42"), &Type::INT2).unwrap(), json!(42));
assert_eq!(
pg_text_to_json(Some("42"), &Type::INT8).unwrap(),
json!("42")
);
assert_eq!(
pg_text_to_json(Some("42.42"), &Type::FLOAT8).unwrap(),
json!(42.42)
);
assert_eq!(
pg_text_to_json(Some("42.42"), &Type::FLOAT4).unwrap(),
json!(42.42)
);
assert_eq!(
pg_text_to_json(Some("NaN"), &Type::FLOAT4).unwrap(),
json!("NaN")
);
assert_eq!(
pg_text_to_json(Some("Infinity"), &Type::FLOAT4).unwrap(),
json!("Infinity")
);
assert_eq!(
pg_text_to_json("-Infinity", &Type::FLOAT4),
pg_text_to_json(Some("-Infinity"), &Type::FLOAT4).unwrap(),
json!("-Infinity")
);
@@ -473,9 +355,10 @@ mod tests {
.unwrap();
assert_eq!(
pg_text_to_json(
r#"{"s":"str","n":42,"f":4.2,"a":[null,3,"a"]}"#,
Some(r#"{"s":"str","n":42,"f":4.2,"a":[null,3,"a"]}"#),
&Type::JSONB
),
)
.unwrap(),
json
);
}
@@ -483,7 +366,7 @@ mod tests {
#[test]
fn test_pg_array_parse_text() {
fn pt(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::TEXT)
pg_array_parse(pg_arr, &Type::TEXT).unwrap()
}
assert_eq!(
pt(r#"{"aa\"\\\,a",cha,"bbbb"}"#),
@@ -506,7 +389,7 @@ mod tests {
#[test]
fn test_pg_array_parse_bool() {
fn pb(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::BOOL)
pg_array_parse(pg_arr, &Type::BOOL).unwrap()
}
assert_eq!(pb(r#"{t,f,t}"#), json!([true, false, true]));
assert_eq!(pb(r#"{{t,f,t}}"#), json!([[true, false, true]]));
@@ -523,7 +406,7 @@ mod tests {
#[test]
fn test_pg_array_parse_numbers() {
fn pn(pg_arr: &str, ty: &Type) -> Value {
pg_array_parse(pg_arr, ty)
pg_array_parse(pg_arr, ty).unwrap()
}
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT4), json!([1, 2, 3]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT2), json!([1, 2, 3]));
@@ -551,7 +434,7 @@ mod tests {
#[test]
fn test_pg_array_with_decoration() {
fn p(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::INT2)
pg_array_parse(pg_arr, &Type::INT2).unwrap()
}
assert_eq!(
p(r#"[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}"#),
@@ -562,7 +445,7 @@ mod tests {
#[test]
fn test_pg_array_parse_json() {
fn pt(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::JSONB)
pg_array_parse(pg_arr, &Type::JSONB).unwrap()
}
assert_eq!(pt(r#"{"{}"}"#), json!([{}]));
assert_eq!(

View File

@@ -1102,6 +1102,7 @@ async fn query_to_json<T: GenericClient>(
let columns_len = row_stream.statement.columns().len();
let mut fields = Vec::with_capacity(columns_len);
let mut types = Vec::with_capacity(columns_len);
for c in row_stream.statement.columns() {
fields.push(json!({
@@ -1113,6 +1114,8 @@ async fn query_to_json<T: GenericClient>(
"dataTypeModifier": c.type_modifier(),
"format": "text",
}));
types.push(c.type_().clone());
}
let raw_output = parsed_headers.raw_output;
@@ -1134,7 +1137,7 @@ async fn query_to_json<T: GenericClient>(
));
}
let row = pg_text_row_to_json(&row, raw_output, array_mode)?;
let row = pg_text_row_to_json(&row, &types, raw_output, array_mode)?;
rows.push(row);
// assumption: parsing pg text and converting to json takes CPU time.

View File

@@ -87,6 +87,7 @@ impl WalProposer {
let config = Config {
ttid,
safekeepers_list: addrs,
safekeeper_conninfo_options: String::new(),
safekeeper_reconnect_timeout: 1000,
safekeeper_connection_timeout: 5000,
sync_safekeepers,

View File

@@ -3823,6 +3823,13 @@ impl Service {
.await;
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
let is_import = create_req.is_import();
let read_only = matches!(
create_req.mode,
models::TimelineCreateRequestMode::Branch {
read_only: true,
..
}
);
if is_import {
// Ensure that there is no split on-going.
@@ -3895,13 +3902,13 @@ impl Service {
}
None
} else if safekeepers {
} else if safekeepers || read_only {
// Note that for imported timelines, we do not create the timeline on the safekeepers
// straight away. Instead, we do it once the import finalized such that we know what
// start LSN to provide for the safekeepers. This is done in
// [`Self::finalize_timeline_import`].
let res = self
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info, read_only)
.instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
.await?;
Some(res)
@@ -3915,6 +3922,11 @@ impl Service {
})
}
#[instrument(skip_all, fields(
tenant_id=%req.tenant_shard_id.tenant_id,
shard_id=%req.tenant_shard_id.shard_slug(),
timeline_id=%req.timeline_id,
))]
pub(crate) async fn handle_timeline_shard_import_progress(
self: &Arc<Self>,
req: TimelineImportStatusRequest,
@@ -3964,6 +3976,11 @@ impl Service {
})
}
#[instrument(skip_all, fields(
tenant_id=%req.tenant_shard_id.tenant_id,
shard_id=%req.tenant_shard_id.shard_slug(),
timeline_id=%req.timeline_id,
))]
pub(crate) async fn handle_timeline_shard_import_progress_upcall(
self: &Arc<Self>,
req: PutTimelineImportStatusRequest,

View File

@@ -208,6 +208,7 @@ impl Service {
self: &Arc<Self>,
tenant_id: TenantId,
timeline_info: &TimelineInfo,
read_only: bool,
) -> Result<SafekeepersInfo, ApiError> {
let timeline_id = timeline_info.timeline_id;
let pg_version = timeline_info.pg_version * 10000;
@@ -220,7 +221,11 @@ impl Service {
let start_lsn = timeline_info.last_record_lsn;
// Choose initial set of safekeepers respecting affinity
let sks = self.safekeepers_for_new_timeline().await?;
let sks = if !read_only {
self.safekeepers_for_new_timeline().await?
} else {
Vec::new()
};
let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
// Add timeline to db
let mut timeline_persist = TimelinePersistence {
@@ -253,6 +258,16 @@ impl Service {
)));
}
}
let ret = SafekeepersInfo {
generation: timeline_persist.generation as u32,
safekeepers: sks.clone(),
tenant_id,
timeline_id,
};
if read_only {
return Ok(ret);
}
// Create the timeline on a quorum of safekeepers
let remaining = self
.tenant_timeline_create_safekeepers_quorum(
@@ -316,12 +331,7 @@ impl Service {
}
}
Ok(SafekeepersInfo {
generation: timeline_persist.generation as u32,
safekeepers: sks,
tenant_id,
timeline_id,
})
Ok(ret)
}
pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
@@ -336,8 +346,10 @@ impl Service {
return Err(TimelineImportFinalizeError::ShuttingDown);
}
// This function is only used in non-read-only scenarios
let read_only = false;
let res = self
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info, read_only)
.await;
match res {
@@ -410,6 +422,18 @@ impl Service {
.chain(tl.sk_set.iter())
.collect::<HashSet<_>>();
// The timeline has no safekeepers: we need to delete it from the db manually,
// as no safekeeper reconciler will get to it
if all_sks.is_empty() {
if let Err(err) = self
.persistence
.delete_timeline(tenant_id, timeline_id)
.await
{
tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
}
}
// Schedule reconciliations
for &sk_id in all_sks.iter() {
let pending_op = TimelinePendingOpPersistence {

View File

@@ -404,6 +404,29 @@ class PageserverTracingConfig:
return ("tracing", value)
@dataclass
class PageserverImportConfig:
import_job_concurrency: int
import_job_soft_size_limit: int
import_job_checkpoint_threshold: int
@staticmethod
def default() -> PageserverImportConfig:
return PageserverImportConfig(
import_job_concurrency=4,
import_job_soft_size_limit=512 * 1024,
import_job_checkpoint_threshold=4,
)
def to_config_key_value(self) -> tuple[str, dict[str, Any]]:
value = {
"import_job_concurrency": self.import_job_concurrency,
"import_job_soft_size_limit": self.import_job_soft_size_limit,
"import_job_checkpoint_threshold": self.import_job_checkpoint_threshold,
}
return ("timeline_import_config", value)
class NeonEnvBuilder:
"""
Builder object to create a Neon runtime environment
@@ -454,6 +477,7 @@ class NeonEnvBuilder:
pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
pageserver_get_vectored_concurrent_io: str | None = None,
pageserver_tracing_config: PageserverTracingConfig | None = None,
pageserver_import_config: PageserverImportConfig | None = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -511,6 +535,7 @@ class NeonEnvBuilder:
)
self.pageserver_tracing_config = pageserver_tracing_config
self.pageserver_import_config = pageserver_import_config
self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = (
pageserver_default_tenant_config_compaction_algorithm
@@ -1179,6 +1204,10 @@ class NeonEnv:
self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
self.pageserver_get_vectored_concurrent_io = config.pageserver_get_vectored_concurrent_io
self.pageserver_tracing_config = config.pageserver_tracing_config
if config.pageserver_import_config is None:
self.pageserver_import_config = PageserverImportConfig.default()
else:
self.pageserver_import_config = config.pageserver_import_config
# Create the neon_local's `NeonLocalInitConf`
cfg: dict[str, Any] = {
@@ -1258,12 +1287,6 @@ class NeonEnv:
"no_sync": True,
# Look for gaps in WAL received from safekeepeers
"validate_wal_contiguity": True,
# TODO(vlad): make these configurable through the builder
"timeline_import_config": {
"import_job_concurrency": 4,
"import_job_soft_size_limit": 512 * 1024,
"import_job_checkpoint_threshold": 4,
},
}
# Batching (https://github.com/neondatabase/neon/issues/9377):
@@ -1325,6 +1348,12 @@ class NeonEnv:
ps_cfg[key] = value
if self.pageserver_import_config is not None:
key, value = self.pageserver_import_config.to_config_key_value()
if key not in ps_cfg:
ps_cfg[key] = value
# Create a corresponding NeonPageserver object
ps = NeonPageserver(
self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"]

View File

@@ -1,7 +1,10 @@
import base64
import concurrent.futures
import json
import random
import threading
import time
from enum import Enum
from enum import Enum, StrEnum
from pathlib import Path
from threading import Event
@@ -11,7 +14,14 @@ import pytest
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.fast_import import FastImport
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PgProtocol, VanillaPostgres
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PageserverImportConfig,
PgBin,
PgProtocol,
StorageControllerMigrationConfig,
VanillaPostgres,
)
from fixtures.pageserver.http import (
ImportPgdataIdemptencyKey,
)
@@ -494,6 +504,259 @@ def test_import_respects_tenant_shutdown(
wait_until(cplane_notified)
@skip_in_debug_build("Validation query takes too long in debug builds")
def test_import_chaos(
neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer
):
"""
Perform a timeline import while injecting chaos in the environment.
We expect that the import completes eventually, that it passes validation and
the resulting timeline can be written to.
"""
TARGET_RELBOCK_SIZE = 512 * 1024 * 1024 # 512 MiB
ALLOWED_IMPORT_RUNTIME = 90 # seconds
SHARD_COUNT = 4
neon_env_builder.num_pageservers = SHARD_COUNT
neon_env_builder.pageserver_import_config = PageserverImportConfig(
import_job_concurrency=1,
import_job_soft_size_limit=64 * 1024,
import_job_checkpoint_threshold=4,
)
# Set up mock control plane HTTP server to listen for import completions
import_completion_signaled = Event()
# There's some Python magic at play here. A list can be updated from the
# handler thread, but an optional cannot. Hence, use a list with one element.
import_error = []
def handler(request: Request) -> Response:
assert request.json is not None
body = request.json
if "error" in body:
if body["error"]:
import_error.append(body["error"])
log.info(f"control plane /import_complete request: {request.json}")
import_completion_signaled.set()
return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(
"/storage/api/v1/import_complete", method="PUT"
).respond_with_handler(handler)
# Plug the cplane mock in
neon_env_builder.control_plane_hooks_api = (
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
# The import will specifiy a local filesystem path mocking remote storage
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
vanilla_pg.start()
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")
nrows = 0
while True:
relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
log.info(
f"relblock size: {relblock_size / 8192} pages (target: {TARGET_RELBOCK_SIZE // 8192}) pages"
)
if relblock_size >= TARGET_RELBOCK_SIZE:
break
addrows = int((TARGET_RELBOCK_SIZE - relblock_size) // 8192)
assert addrows >= 1, "forward progress"
vanilla_pg.safe_psql(
f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
)
nrows += addrows
vanilla_pg.stop()
env = neon_env_builder.init_configs()
env.start()
# Pause after every import task to extend the test runtime and allow
# for more chaos injection.
for ps in env.pageservers:
ps.add_persistent_failpoint("import-task-complete-pausable", "sleep(5)")
env.storage_controller.allowed_errors.extend(
[
# The shard might have moved or the pageserver hosting the shard restarted
".*Call to node.*management API.*failed.*",
# Migrations have their time outs set to 0
".*Timed out after.*downloading layers.*",
".*Failed to prepare by downloading layers.*",
# The test may kill the storage controller or pageservers
".*request was dropped before completing.*",
]
)
for ps in env.pageservers:
ps.allowed_errors.extend(
[
# We might re-write a layer in a different generation if the import
# needs to redo some of the progress since not each job is checkpointed.
".*was unlinked but was not dangling.*",
# The test may kill the storage controller or pageservers
".*request was dropped before completing.*",
# Test can SIGTERM pageserver while it is downloading
".*removing local file.*temp_download.*",
".*Failed to flush heatmap.*",
# Test can SIGTERM the storage controller while pageserver
# is attempting to upcall.
".*storage controller upcall failed.*timeline_import_status.*",
# TODO(vlad): TenantManager::reset_tenant returns a blanked anyhow error.
# It should return ResourceUnavailable or something that doesn't error log.
".*activate_post_import.*InternalServerError.*tenant map is shutting down.*",
# TODO(vlad): How can this happen?
".*Failed to download a remote file: deserialize index part file.*",
".*Cancelled request finished with an error.*",
]
)
importbucket_path = neon_env_builder.repo_dir / "test_import_chaos_bucket"
mock_import_bucket(vanilla_pg, importbucket_path)
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
idempotency = ImportPgdataIdemptencyKey.random()
env.storage_controller.tenant_create(
tenant_id, shard_count=SHARD_COUNT, placement_policy={"Attached": 1}
)
env.storage_controller.reconcile_until_idle()
env.storage_controller.timeline_create(
tenant_id,
{
"new_timeline_id": str(timeline_id),
"import_pgdata": {
"idempotency_key": str(idempotency),
"location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
},
},
)
def chaos(stop_chaos: threading.Event):
class ChaosType(StrEnum):
MIGRATE_SHARD = "migrate_shard"
RESTART_IMMEDIATE = "restart_immediate"
RESTART = "restart"
STORCON_RESTART_IMMEDIATE = "storcon_restart_immediate"
while not stop_chaos.is_set():
chaos_type = random.choices(
population=[
ChaosType.MIGRATE_SHARD,
ChaosType.RESTART,
ChaosType.RESTART_IMMEDIATE,
ChaosType.STORCON_RESTART_IMMEDIATE,
],
weights=[0.25, 0.25, 0.25, 0.25],
k=1,
)[0]
try:
if chaos_type == ChaosType.MIGRATE_SHARD:
target_shard_number = random.randint(0, SHARD_COUNT - 1)
target_shard = TenantShardId(tenant_id, target_shard_number, SHARD_COUNT)
placements = env.storage_controller.get_tenants_placement()
log.info(f"{placements=}")
target_ps = placements[str(target_shard)]["intent"]["attached"]
if len(placements[str(target_shard)]["intent"]["secondary"]) == 0:
dest_ps = None
else:
dest_ps = placements[str(target_shard)]["intent"]["secondary"][0]
if target_ps is None or dest_ps is None:
continue
config = StorageControllerMigrationConfig(
secondary_warmup_timeout="0s",
secondary_download_request_timeout="0s",
prewarm=False,
)
env.storage_controller.tenant_shard_migrate(target_shard, dest_ps, config)
log.info(
f"CHAOS: Migrating shard {target_shard} from pageserver {target_ps} to {dest_ps}"
)
elif chaos_type == ChaosType.RESTART_IMMEDIATE:
target_ps = random.choice(env.pageservers)
log.info(f"CHAOS: Immediate restart of pageserver {target_ps.id}")
target_ps.stop(immediate=True)
target_ps.start()
elif chaos_type == ChaosType.RESTART:
target_ps = random.choice(env.pageservers)
log.info(f"CHAOS: Normal restart of pageserver {target_ps.id}")
target_ps.stop(immediate=False)
target_ps.start()
elif chaos_type == ChaosType.STORCON_RESTART_IMMEDIATE:
log.info("CHAOS: Immediate restart of storage controller")
env.storage_controller.stop(immediate=True)
env.storage_controller.start()
except Exception as e:
log.warning(f"CHAOS: Error during chaos operation {chaos_type}: {e}")
# Sleep before next chaos event
time.sleep(1)
log.info("Chaos injector stopped")
def wait_for_import_completion():
start = time.time()
done = import_completion_signaled.wait(ALLOWED_IMPORT_RUNTIME)
if not done:
raise TimeoutError(f"Import did not signal completion within {ALLOWED_IMPORT_RUNTIME}")
end = time.time()
log.info(f"Import completion signalled after {end - start}s {import_error=}")
if import_error:
raise RuntimeError(f"Import error: {import_error}")
with concurrent.futures.ThreadPoolExecutor() as executor:
stop_chaos = threading.Event()
wait_for_import_completion_fut = executor.submit(wait_for_import_completion)
chaos_fut = executor.submit(chaos, stop_chaos)
try:
wait_for_import_completion_fut.result()
except Exception as e:
raise e
finally:
stop_chaos.set()
chaos_fut.result()
import_branch_name = "imported"
env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
endpoint = env.endpoints.create_start(branch_name=import_branch_name, tenant_id=tenant_id)
# Validate the imported data is legit
assert endpoint.safe_psql_many(
[
"set effective_io_concurrency=32;",
"SET statement_timeout='300s';",
"select count(*), sum(data::bigint)::bigint from t",
]
) == [[], [], [(nrows, nrows * (nrows + 1) // 2)]]
endpoint.stop()
# Validate writes
workload = Workload(env, tenant_id, timeline_id, branch_name=import_branch_name)
workload.init()
workload.write_rows(64)
workload.validate()
def test_fast_import_with_pageserver_ingest(
test_output_dir,
vanilla_pg: VanillaPostgres,

View File

@@ -20,6 +20,9 @@ from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.utils import query_scalar, wait_until
@pytest.mark.skip(
reason="We won't create future layers any more after https://github.com/neondatabase/neon/pull/10548"
)
@pytest.mark.parametrize(
"attach_mode",
["default_generation", "same_generation"],

View File

@@ -4158,17 +4158,12 @@ def test_storcon_create_delete_sk_down(
env.storage_controller.stop()
env.storage_controller.start()
config_lines = [
"neon.safekeeper_proto_version = 3",
]
with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
with env.endpoints.create("main", tenant_id=tenant_id) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
with env.endpoints.create(
"child_of_main", tenant_id=tenant_id, config_lines=config_lines
) as ep:
with env.endpoints.create("child_of_main", tenant_id=tenant_id) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
@@ -4249,17 +4244,12 @@ def test_storcon_few_sk(
env.safekeepers[0].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
config_lines = [
"neon.safekeeper_proto_version = 3",
]
with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
with env.endpoints.create("main", tenant_id=tenant_id) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=safekeeper_list)
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
with env.endpoints.create(
"child_of_main", tenant_id=tenant_id, config_lines=config_lines
) as ep:
with env.endpoints.create("child_of_main", tenant_id=tenant_id) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=safekeeper_list)
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")

View File

@@ -10,6 +10,7 @@ from queue import Empty, Queue
from threading import Barrier
import pytest
import requests
from fixtures.common_types import Lsn, TimelineArchivalState, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
@@ -401,8 +402,25 @@ def test_ancestor_detach_behavior_v2(neon_env_builder: NeonEnvBuilder, snapshots
"earlier", ancestor_branch_name="main", ancestor_start_lsn=branchpoint_pipe
)
snapshot_branchpoint_old = env.create_branch(
"snapshot_branchpoint_old", ancestor_branch_name="main", ancestor_start_lsn=branchpoint_y
snapshot_branchpoint_old = TimelineId.generate()
env.storage_controller.timeline_create(
env.initial_tenant,
{
"new_timeline_id": str(snapshot_branchpoint_old),
"ancestor_start_lsn": str(branchpoint_y),
"ancestor_timeline_id": str(env.initial_timeline),
"read_only": True,
},
)
sk = env.safekeepers[0]
assert sk
with pytest.raises(requests.exceptions.HTTPError, match="Not Found"):
sk.http_client().timeline_status(
tenant_id=env.initial_tenant, timeline_id=snapshot_branchpoint_old
)
env.neon_cli.mappings_map_branch(
"snapshot_branchpoint_old", env.initial_tenant, snapshot_branchpoint_old
)
snapshot_branchpoint = env.create_branch(

View File

@@ -2012,10 +2012,7 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
config_lines = [
"neon.safekeeper_proto_version = 3",
]
ep = env.endpoints.create("main", config_lines=config_lines)
ep = env.endpoints.create("main")
# expected to fail because timeline is not created on safekeepers
with pytest.raises(Exception, match=r".*timed out.*"):
@@ -2043,10 +2040,7 @@ def test_explicit_timeline_creation_storcon(neon_env_builder: NeonEnvBuilder):
}
env = neon_env_builder.init_start()
config_lines = [
"neon.safekeeper_proto_version = 3",
]
ep = env.endpoints.create("main", config_lines=config_lines)
ep = env.endpoints.create("main")
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])

View File

@@ -637,10 +637,7 @@ async def quorum_sanity_single(
# create timeline on `members_sks`
Safekeeper.create_timeline(tenant_id, timeline_id, env.pageservers[0], mconf, members_sks)
config_lines = [
"neon.safekeeper_proto_version = 3",
]
ep = env.endpoints.create(branch_name, config_lines=config_lines)
ep = env.endpoints.create(branch_name)
ep.start(safekeeper_generation=1, safekeepers=compute_sks_ids)
ep.safe_psql("create table t(key int, value text)")

View File

@@ -41,10 +41,8 @@ env_logger = { version = "0.11" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
form_urlencoded = { version = "1" }
futures-channel = { version = "0.3", features = ["sink"] }
futures-core = { version = "0.3" }
futures-executor = { version = "0.3" }
futures-io = { version = "0.3" }
futures-task = { version = "0.3", default-features = false, features = ["std"] }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
@@ -74,7 +72,6 @@ num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
p256 = { version = "0.13", features = ["jwk"] }
parquet = { version = "53", default-features = false, features = ["zstd"] }
percent-encoding = { version = "2" }
prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }