Compare commits

...

17 Commits

Author SHA1 Message Date
Dmitrii Kovalkov
c53b4545c8 rename cancel -> cancel_new_retries 2025-07-29 12:11:47 +04:00
Dmitrii Kovalkov
e48ac9ed76 Merge branch 'main' into diko/safekeeper_migrate_from_down_sk 2025-07-23 15:15:17 +04:00
Dmitrii Kovalkov
94cb9a79d9 safekeeper: generation aware timeline tombstones (#12482)
## Problem
With safekeeper migration in mind, we can now pull/exclude the timeline
multiple times within the same safekeeper. To avoid races between out of
order requests, we need to ignore the pull/exclude requests if we have
already seen a higher generation.

- Closes: https://github.com/neondatabase/neon/issues/12186
- Closes: [LKB-949](https://databricks.atlassian.net/browse/LKB-949)

## Summary of changes
- Annotate timeline tombstones in safekeeper with request generation.
- Replace `ignore_tombstone` option with `mconf` in
`PullTimelineRequest`
- Switch membership in `pull_timeline` if the existing/pulled timeline
has an older generation.
- Refuse to switch membership if the timeline is being deleted
(`is_canceled`).
- Refuse to switch membership in compute greeting request if the
safekeeper is not a member of `mconf`.
- Pass `mconf` in `PullTimelineRequest` in safekeeper_service

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2025-07-23 11:01:04 +00:00
Dmitrii Kovalkov
961835add6 storcon: do not retry sk migration ops if the quorum is reached 2025-07-23 13:38:32 +04:00
Tristan Partin
fc242afcc2 PG ignore PageserverFeedback from unknown shards (#12671)
## Problem
When testing tenant splits, I found that PG can get backpressure
throttled indefinitely if the split is aborted afterwards. It turns out
that each PageServer activates new shard separately even before the
split is committed and they may start sending PageserverFeedback to PG
directly. As a result, if the split is aborted, no one resets the
pageserver feedback in PG, and thus PG will be backpressure throttled
forever unless it's restarted manually.

## Summary of changes
This PR fixes this problem by having
`walprop_pg_process_safekeeper_feedback` simply ignore all pageserver
feedback from unknown shards. The source of truth here is defined by the
shard map, which is guaranteed to be reloaded only after the split is
committed.

Co-authored-by: Chen Luo <chen.luo@databricks.com>
2025-07-22 21:41:56 +00:00
Suhas Thalanki
e275221aef add hadron-specific metrics (#12686) 2025-07-22 21:17:45 +00:00
Alex Chi Z.
f859354466 feat(pageserver): add db rel count as feature flag property (#12632)
## Problem

As part of the reldirv2 rollout: LKB-197.


We will use number of db/rels as a criteria whether to rollout reldirv2
directly on the write path (simplest and easiest way of rollout). If the
number of rel/db is small then it shouldn't take too long time on the
write path.

## Summary of changes

* Compute db/rel count during basebackup.
* Also compute it during logical size computation.
* Collect maximum number of db/rel across all timelines in the feature
flag propeties.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-22 17:55:07 +00:00
Konstantin Knizhnik
b00a0096bf Reintialize page in allocNewBuffer only when buffer is returned (#12399)
## Problem

See https://github.com/neondatabase/neon/issues/12387

`allocNewBuffer` initialise page with zeros 
but not always return it because of parity checks.
In case of wrong parity the page is rejected and as a result we have
dirty page with zero LSN, which cause assertion failure on neon_write
when page is evicted from shared buffers.

## Summary of changes

Perform, page initialisation in `allocNewBuffer` only when buffer is
returned (parity check is passed).

Postgres PRs:
https://github.com/neondatabase/postgres/pull/661
https://github.com/neondatabase/postgres/pull/662
https://github.com/neondatabase/postgres/pull/663
https://github.com/neondatabase/postgres/pull/664

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Kosntantin Knizhnik <konstantin.knizhnik@databricks.com>
2025-07-22 17:50:26 +00:00
a-masterov
b3844903e5 Add new operations to Random operations test (#12213)
## Problem
We did not test some Public API calls, such as using a timestamp to
create a branch, reset_to_parent.
## Summary of changes
Tests now include some other operations: reset_to_parent, a branch
creation from any time in the past, etc.
Currently, the API calls are only exposed; the semantics are not
verified.

---------

Co-authored-by: Alexey Masterov <alexey.masterov@databricks.com>
2025-07-22 17:43:01 +00:00
Vlad Lazar
5b0972151c pageserver: silence shard resolution warning (#12685)
## Problem

We drive the get page requests that have started processing to
completion. So in the case when the compute received a reconfiguration
request and the old connection has a request procesing on the
pageserver, we are going to issue the warning.

I spot checked a few instances of the warning and in all cases the
compute was already connected to the correct pageserver.

## Summary of Changes

Downgrade to INFO. It would be nice to somehow figure out if the
connection has been terminated in the meantime, but the terminate libpq
message is still in the pipe while we're doing the shard resolution.

Closes LKB-2381
2025-07-22 17:34:23 +00:00
Heikki Linnakangas
51ffeef93f Fix postgres version compatibility macros (#12658)
The argument to BufTagInit was called 'spcOid', and it was also setting
a field called 'spcOid'. The field name would erroneously also be
expanded with the macro arg. It happened to work so far, because all the
users of the macro pass a variable called 'spcOid' for the 'spcOid'
argument, but as soon as you try to pass anything else, it fails. And
same story for 'dbOid' and 'relNumber'. Rename the arguments to avoid
the name collision.

Also while we're at it, add parens around the arguments in a few macros,
to make them safer if you pass something non-trivial as the argument.
2025-07-22 16:52:57 +00:00
Erik Grinaker
0fe07dec32 test_runner: allow stuck reconciliation errors (#12682)
This log message was added in #12589.

During chaos tests, reconciles may not succeed for some time, triggering
the log message.

Resolves [LKB-2467](https://databricks.atlassian.net/browse/LKB-2467).
2025-07-22 16:43:35 +00:00
HaoyuHuang
8de320ab9b Add a few compute_tool changes (#12677)
## Summary of changes
All changes are no-op.
2025-07-22 16:22:18 +00:00
Folke Behrens
108f7ec544 Bump opentelemetry crates to 0.30 (#12680)
This rebuilds #11552 on top the current Cargo.lock.

---------

Co-authored-by: Conrad Ludgate <conradludgate@gmail.com>
2025-07-22 16:05:35 +00:00
Tristan Partin
63d2b1844d Fix final pyright issues with neon_api.py (#8476)
Fix final pyright issues with neon_api.py

Signed-off-by: Tristan Partin <tristan.partin@databricks.com>
2025-07-22 16:04:52 +00:00
Dmitrii Kovalkov
133f16e9b5 storcon: finish safekeeper migration gracefully (#12528)
## Problem
We don't detect if safekeeper migration fails after the the commiting
the membership configuration to the database. As a result, we might
leave stale timelines on excluded safekeepers and do not notify
cplane/safekepeers about new configuration.

- Implements solution proposed in
https://github.com/neondatabase/neon/pull/12432
- Closes: https://github.com/neondatabase/neon/issues/12192
- Closes: [LKB-944](https://databricks.atlassian.net/browse/LKB-944)

## Summary of changes
- Add `sk_set_notified_generation` column to `timelines` database
- Update `*_notified_generation` in database during the finish state.
- Commit reconciliation requests to database atomically with membership
configuration.
- Reload pending ops and retry "finish" step if we detect
`*_notified_generation` mismatch.
- Add failpoints and test that we handle failures well
2025-07-22 14:58:20 +00:00
Alex Chi Z.
88391ce069 feat(pageserver): create image layers at L0-L1 boundary by default (#12669)
## Problem

Post LKB-198 rollout. We added a new strategy to generate image layers
at the L0-L1 boundary instead of the latest LSN to ensure too many L0
layers do not trigger image layer creation.

## Summary of changes

We already rolled it out to all users so we can remove the feature flag
now.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-22 14:29:26 +00:00
58 changed files with 1593 additions and 479 deletions

171
Cargo.lock generated
View File

@@ -1097,7 +1097,7 @@ checksum = "975982cdb7ad6a142be15bdf84aea7ec6a9e5d4d797c004d43185b24cfe4e684"
dependencies = [
"clap",
"heck 0.5.0",
"indexmap 2.9.0",
"indexmap 2.10.0",
"log",
"proc-macro2",
"quote",
@@ -1313,7 +1313,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"indexmap 2.9.0",
"indexmap 2.10.0",
"jsonwebtoken",
"regex",
"remote_storage",
@@ -1350,7 +1350,7 @@ dependencies = [
"http-body-util",
"hyper 1.4.1",
"hyper-util",
"indexmap 2.9.0",
"indexmap 2.10.0",
"itertools 0.10.5",
"jsonwebtoken",
"metrics",
@@ -1383,7 +1383,7 @@ dependencies = [
"tokio-postgres",
"tokio-stream",
"tokio-util",
"tonic 0.13.1",
"tonic",
"tower 0.5.2",
"tower-http",
"tower-otel",
@@ -2659,7 +2659,7 @@ dependencies = [
"futures-sink",
"futures-util",
"http 0.2.9",
"indexmap 2.9.0",
"indexmap 2.10.0",
"slab",
"tokio",
"tokio-util",
@@ -2678,7 +2678,7 @@ dependencies = [
"futures-sink",
"futures-util",
"http 1.3.1",
"indexmap 2.9.0",
"indexmap 2.10.0",
"slab",
"tokio",
"tokio-util",
@@ -2937,7 +2937,7 @@ dependencies = [
"pprof",
"regex",
"routerify",
"rustls 0.23.27",
"rustls 0.23.29",
"rustls-pemfile 2.1.1",
"serde",
"serde_json",
@@ -3274,9 +3274,9 @@ dependencies = [
[[package]]
name = "indexmap"
version = "2.9.0"
version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e"
checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661"
dependencies = [
"equivalent",
"hashbrown 0.15.2",
@@ -3302,7 +3302,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88"
dependencies = [
"ahash",
"indexmap 2.9.0",
"indexmap 2.10.0",
"is-terminal",
"itoa",
"log",
@@ -3325,7 +3325,7 @@ dependencies = [
"crossbeam-utils",
"dashmap 6.1.0",
"env_logger",
"indexmap 2.9.0",
"indexmap 2.10.0",
"itoa",
"log",
"num-format",
@@ -4162,23 +4162,23 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "opentelemetry"
version = "0.27.1"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab70038c28ed37b97d8ed414b6429d343a8bbf44c9f79ec854f3a643029ba6d7"
checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6"
dependencies = [
"futures-core",
"futures-sink",
"js-sys",
"pin-project-lite",
"thiserror 1.0.69",
"thiserror 2.0.11",
"tracing",
]
[[package]]
name = "opentelemetry-http"
version = "0.27.0"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10a8a7f5f6ba7c1b286c2fbca0454eaba116f63bbe69ed250b642d36fbb04d80"
checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d"
dependencies = [
"async-trait",
"bytes",
@@ -4189,12 +4189,10 @@ dependencies = [
[[package]]
name = "opentelemetry-otlp"
version = "0.27.0"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76"
checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b"
dependencies = [
"async-trait",
"futures-core",
"http 1.3.1",
"opentelemetry",
"opentelemetry-http",
@@ -4202,46 +4200,43 @@ dependencies = [
"opentelemetry_sdk",
"prost 0.13.5",
"reqwest",
"thiserror 1.0.69",
"thiserror 2.0.11",
]
[[package]]
name = "opentelemetry-proto"
version = "0.27.0"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6e05acbfada5ec79023c85368af14abd0b307c015e9064d249b2a950ef459a6"
checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc"
dependencies = [
"opentelemetry",
"opentelemetry_sdk",
"prost 0.13.5",
"tonic 0.12.3",
"tonic",
]
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.27.0"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc1b6902ff63b32ef6c489e8048c5e253e2e4a803ea3ea7e783914536eb15c52"
checksum = "83d059a296a47436748557a353c5e6c5705b9470ef6c95cfc52c21a8814ddac2"
[[package]]
name = "opentelemetry_sdk"
version = "0.27.1"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "231e9d6ceef9b0b2546ddf52335785ce41252bc7474ee8ba05bfad277be13ab8"
checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b"
dependencies = [
"async-trait",
"futures-channel",
"futures-executor",
"futures-util",
"glob",
"opentelemetry",
"percent-encoding",
"rand 0.8.5",
"rand 0.9.1",
"serde_json",
"thiserror 1.0.69",
"thiserror 2.0.11",
"tokio",
"tokio-stream",
"tracing",
]
[[package]]
@@ -4368,7 +4363,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.13.1",
"tonic",
"tracing",
"url",
"utils",
@@ -4465,7 +4460,7 @@ dependencies = [
"reqwest",
"rpds",
"rstest",
"rustls 0.23.27",
"rustls 0.23.29",
"scopeguard",
"send-future",
"serde",
@@ -4489,7 +4484,7 @@ dependencies = [
"tokio-tar",
"tokio-util",
"toml_edit",
"tonic 0.13.1",
"tonic",
"tonic-reflection",
"tower 0.5.2",
"tracing",
@@ -4575,7 +4570,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.13.1",
"tonic",
"tracing",
"utils",
"workspace_hack",
@@ -4620,7 +4615,7 @@ dependencies = [
"thiserror 1.0.69",
"tokio",
"tokio-util",
"tonic 0.13.1",
"tonic",
"tonic-build",
"utils",
"workspace_hack",
@@ -5002,7 +4997,7 @@ dependencies = [
"bytes",
"once_cell",
"pq_proto",
"rustls 0.23.27",
"rustls 0.23.29",
"rustls-pemfile 2.1.1",
"serde",
"thiserror 1.0.69",
@@ -5401,7 +5396,7 @@ dependencies = [
"hyper 0.14.30",
"hyper 1.4.1",
"hyper-util",
"indexmap 2.9.0",
"indexmap 2.10.0",
"ipnet",
"itertools 0.10.5",
"itoa",
@@ -5438,7 +5433,7 @@ dependencies = [
"rsa",
"rstest",
"rustc-hash 2.1.1",
"rustls 0.23.27",
"rustls 0.23.29",
"rustls-native-certs 0.8.0",
"rustls-pemfile 2.1.1",
"scopeguard",
@@ -5717,7 +5712,7 @@ dependencies = [
"num-bigint",
"percent-encoding",
"pin-project-lite",
"rustls 0.23.27",
"rustls 0.23.29",
"rustls-native-certs 0.8.0",
"ryu",
"sha1_smol",
@@ -5946,9 +5941,9 @@ dependencies = [
[[package]]
name = "reqwest-tracing"
version = "0.5.5"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73e6153390585f6961341b50e5a1931d6be6dee4292283635903c26ef9d980d2"
checksum = "d70ea85f131b2ee9874f0b160ac5976f8af75f3c9badfe0d955880257d10bd83"
dependencies = [
"anyhow",
"async-trait",
@@ -6173,15 +6168,15 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.23.27"
version = "0.23.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321"
checksum = "2491382039b29b9b11ff08b76ff6c97cf287671dbb74f0be44bda389fffe9bd1"
dependencies = [
"log",
"once_cell",
"ring",
"rustls-pki-types",
"rustls-webpki 0.103.3",
"rustls-webpki 0.103.4",
"subtle",
"zeroize",
]
@@ -6245,9 +6240,12 @@ dependencies = [
[[package]]
name = "rustls-pki-types"
version = "1.11.0"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c"
checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79"
dependencies = [
"zeroize",
]
[[package]]
name = "rustls-webpki"
@@ -6272,9 +6270,9 @@ dependencies = [
[[package]]
name = "rustls-webpki"
version = "0.103.3"
version = "0.103.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435"
checksum = "0a17884ae0c1b773f1ccd2bd4a8c72f16da897310a98b0e84bf349ad5ead92fc"
dependencies = [
"ring",
"rustls-pki-types",
@@ -6335,7 +6333,7 @@ dependencies = [
"regex",
"remote_storage",
"reqwest",
"rustls 0.23.27",
"rustls 0.23.29",
"safekeeper_api",
"safekeeper_client",
"scopeguard",
@@ -6525,7 +6523,7 @@ checksum = "255914a8e53822abd946e2ce8baa41d4cded6b8e938913b7f7b9da5b7ab44335"
dependencies = [
"httpdate",
"reqwest",
"rustls 0.23.27",
"rustls 0.23.29",
"sentry-backtrace",
"sentry-contexts",
"sentry-core",
@@ -6657,7 +6655,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d2de91cf02bbc07cde38891769ccd5d4f073d22a40683aa4bc7a95781aaa2c4"
dependencies = [
"form_urlencoded",
"indexmap 2.9.0",
"indexmap 2.10.0",
"itoa",
"ryu",
"serde",
@@ -6738,7 +6736,7 @@ dependencies = [
"chrono",
"hex",
"indexmap 1.9.3",
"indexmap 2.9.0",
"indexmap 2.10.0",
"serde",
"serde_derive",
"serde_json",
@@ -6981,10 +6979,10 @@ dependencies = [
"once_cell",
"parking_lot 0.12.1",
"prost 0.13.5",
"rustls 0.23.27",
"rustls 0.23.29",
"tokio",
"tokio-rustls 0.26.2",
"tonic 0.13.1",
"tonic",
"tonic-build",
"tracing",
"utils",
@@ -7029,7 +7027,7 @@ dependencies = [
"regex",
"reqwest",
"routerify",
"rustls 0.23.27",
"rustls 0.23.29",
"rustls-native-certs 0.8.0",
"safekeeper_api",
"safekeeper_client",
@@ -7083,7 +7081,7 @@ dependencies = [
"postgres_ffi",
"remote_storage",
"reqwest",
"rustls 0.23.27",
"rustls 0.23.29",
"rustls-native-certs 0.8.0",
"serde",
"serde_json",
@@ -7622,7 +7620,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04fb792ccd6bbcd4bba408eb8a292f70fc4a3589e5d793626f45190e6454b6ab"
dependencies = [
"ring",
"rustls 0.23.27",
"rustls 0.23.29",
"tokio",
"tokio-postgres",
"tokio-rustls 0.26.2",
@@ -7673,7 +7671,7 @@ version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
dependencies = [
"rustls 0.23.27",
"rustls 0.23.29",
"tokio",
]
@@ -7772,34 +7770,13 @@ version = "0.22.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f21c7aaf97f1bd9ca9d4f9e73b0a6c74bd5afef56f2bc931943a6e1c37e04e38"
dependencies = [
"indexmap 2.9.0",
"indexmap 2.10.0",
"serde",
"serde_spanned",
"toml_datetime",
"winnow",
]
[[package]]
name = "tonic"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
dependencies = [
"async-trait",
"base64 0.22.1",
"bytes",
"http 1.3.1",
"http-body 1.0.0",
"http-body-util",
"percent-encoding",
"pin-project",
"prost 0.13.5",
"tokio-stream",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tonic"
version = "0.13.1"
@@ -7857,7 +7834,7 @@ dependencies = [
"prost-types 0.13.5",
"tokio",
"tokio-stream",
"tonic 0.13.1",
"tonic",
]
[[package]]
@@ -7883,7 +7860,7 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
dependencies = [
"futures-core",
"futures-util",
"indexmap 2.9.0",
"indexmap 2.10.0",
"pin-project-lite",
"slab",
"sync_wrapper 1.0.1",
@@ -7921,10 +7898,14 @@ checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
[[package]]
name = "tower-otel"
version = "0.2.0"
source = "git+https://github.com/mattiapenati/tower-otel?rev=56a7321053bcb72443888257b622ba0d43a11fcd#56a7321053bcb72443888257b622ba0d43a11fcd"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "345000ea5ae33222624a8ccfdd88892c30db4d413a39c2d4bd714b77e0a4b23c"
dependencies = [
"axum",
"cfg-if",
"http 1.3.1",
"http-body 1.0.0",
"opentelemetry",
"pin-project",
"tower-layer",
@@ -8006,9 +7987,9 @@ dependencies = [
[[package]]
name = "tracing-opentelemetry"
version = "0.28.0"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97a971f6058498b5c0f1affa23e7ea202057a7301dbff68e968b2d578bcbd053"
checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c"
dependencies = [
"js-sys",
"once_cell",
@@ -8216,7 +8197,7 @@ dependencies = [
"base64 0.22.1",
"log",
"once_cell",
"rustls 0.23.27",
"rustls 0.23.29",
"rustls-pki-types",
"url",
"webpki-roots",
@@ -8888,7 +8869,7 @@ dependencies = [
"hyper 0.14.30",
"hyper 1.4.1",
"hyper-util",
"indexmap 2.9.0",
"indexmap 2.10.0",
"itertools 0.12.1",
"lazy_static",
"libc",
@@ -8911,14 +8892,14 @@ dependencies = [
"proc-macro2",
"prost 0.13.5",
"quote",
"rand 0.8.5",
"rand 0.9.1",
"regex",
"regex-automata 0.4.9",
"regex-syntax 0.8.5",
"reqwest",
"rustls 0.23.27",
"rustls 0.23.29",
"rustls-pki-types",
"rustls-webpki 0.103.3",
"rustls-webpki 0.103.4",
"scopeguard",
"sec1 0.7.3",
"serde",
@@ -8931,6 +8912,7 @@ dependencies = [
"subtle",
"syn 2.0.100",
"sync_wrapper 0.1.2",
"thiserror 2.0.11",
"tikv-jemalloc-ctl",
"tikv-jemalloc-sys",
"time",
@@ -8940,6 +8922,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"toml_edit",
"tonic",
"tower 0.5.2",
"tracing",
"tracing-core",

View File

@@ -143,10 +143,10 @@ notify = "6.0.0"
num_cpus = "1.15"
num-traits = "0.2.19"
once_cell = "1.13"
opentelemetry = "0.27"
opentelemetry_sdk = "0.27"
opentelemetry-otlp = { version = "0.27", default-features = false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.27"
opentelemetry = "0.30"
opentelemetry_sdk = "0.30"
opentelemetry-otlp = { version = "0.30", default-features = false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.30"
parking_lot = "0.12"
parquet = { version = "53", default-features = false, features = ["zstd"] }
parquet_derive = "53"
@@ -164,7 +164,7 @@ rand_core = "=0.6"
redis = { version = "0.29.2", features = ["tokio-rustls-comp", "keep-alive"] }
regex = "1.10.2"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_27"] }
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_30"] }
reqwest-middleware = "0.4"
reqwest-retry = "0.7"
routerify = "3"
@@ -214,15 +214,12 @@ tonic = { version = "0.13.1", default-features = false, features = ["channel", "
tonic-reflection = { version = "0.13.1", features = ["server"] }
tower = { version = "0.5.2", default-features = false }
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
# This revision uses opentelemetry 0.27. There's no tag for it.
tower-otel = { git = "https://github.com/mattiapenati/tower-otel", rev = "56a7321053bcb72443888257b622ba0d43a11fcd" }
tower-otel = { version = "0.6", features = ["axum"] }
tower-service = "0.3.3"
tracing = "0.1"
tracing-error = "0.2"
tracing-log = "0.2"
tracing-opentelemetry = "0.28"
tracing-opentelemetry = "0.31"
tracing-serde = "0.2.0"
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
try-lock = "0.2.5"

View File

@@ -138,6 +138,12 @@ struct Cli {
/// Run in development mode, skipping VM-specific operations like process termination
#[arg(long, action = clap::ArgAction::SetTrue)]
pub dev: bool,
#[arg(long)]
pub pg_init_timeout: Option<u64>,
#[arg(long, default_value_t = false, action = clap::ArgAction::Set)]
pub lakebase_mode: bool,
}
impl Cli {
@@ -188,7 +194,7 @@ fn main() -> Result<()> {
.build()?;
let _rt_guard = runtime.enter();
runtime.block_on(init(cli.dev))?;
let tracing_provider = init(cli.dev)?;
// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
@@ -219,6 +225,8 @@ fn main() -> Result<()> {
installed_extensions_collection_interval: Arc::new(AtomicU64::new(
cli.installed_extensions_collection_interval,
)),
pg_init_timeout: cli.pg_init_timeout.map(Duration::from_secs),
lakebase_mode: cli.lakebase_mode,
},
config,
)?;
@@ -227,11 +235,11 @@ fn main() -> Result<()> {
scenario.teardown();
deinit_and_exit(exit_code);
deinit_and_exit(tracing_provider, exit_code);
}
async fn init(dev_mode: bool) -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;
fn init(dev_mode: bool) -> Result<Option<tracing_utils::Provider>> {
let provider = init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
thread::spawn(move || {
@@ -242,7 +250,7 @@ async fn init(dev_mode: bool) -> Result<()> {
info!("compute build_tag: {}", &BUILD_TAG.to_string());
Ok(())
Ok(provider)
}
fn get_config(cli: &Cli) -> Result<ComputeConfig> {
@@ -267,25 +275,27 @@ fn get_config(cli: &Cli) -> Result<ComputeConfig> {
}
}
fn deinit_and_exit(exit_code: Option<i32>) -> ! {
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit. Shutting down OTEL tracing provider may
// hang for quite some time, see, for example:
// - https://github.com/open-telemetry/opentelemetry-rust/issues/868
// - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
//
// Yet, we want computes to shut down fast enough, as we may need a new one
// for the same timeline ASAP. So wait no longer than 2s for the shutdown to
// complete, then just error out and exit the main thread.
info!("shutting down tracing");
let (sender, receiver) = mpsc::channel();
let _ = thread::spawn(move || {
tracing_utils::shutdown_tracing();
sender.send(()).ok()
});
let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
if shutdown_res.is_err() {
error!("timed out while shutting down tracing, exiting anyway");
fn deinit_and_exit(tracing_provider: Option<tracing_utils::Provider>, exit_code: Option<i32>) -> ! {
if let Some(p) = tracing_provider {
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit. Shutting down OTEL tracing provider may
// hang for quite some time, see, for example:
// - https://github.com/open-telemetry/opentelemetry-rust/issues/868
// - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
//
// Yet, we want computes to shut down fast enough, as we may need a new one
// for the same timeline ASAP. So wait no longer than 2s for the shutdown to
// complete, then just error out and exit the main thread.
info!("shutting down tracing");
let (sender, receiver) = mpsc::channel();
let _ = thread::spawn(move || {
_ = p.shutdown();
sender.send(()).ok()
});
let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
if shutdown_res.is_err() {
error!("timed out while shutting down tracing, exiting anyway");
}
}
info!("shutting down");

View File

@@ -113,6 +113,11 @@ pub struct ComputeNodeParams {
/// Interval for installed extensions collection
pub installed_extensions_collection_interval: Arc<AtomicU64>,
/// Timeout of PG compute startup in the Init state.
pub pg_init_timeout: Option<Duration>,
pub lakebase_mode: bool,
}
type TaskHandle = Mutex<Option<JoinHandle<()>>>;
@@ -154,6 +159,7 @@ pub struct RemoteExtensionMetrics {
#[derive(Clone, Debug)]
pub struct ComputeState {
pub start_time: DateTime<Utc>,
pub pg_start_time: Option<DateTime<Utc>>,
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity. It could be `None` if
/// compute wasn't used since start.
@@ -191,6 +197,7 @@ impl ComputeState {
pub fn new() -> Self {
Self {
start_time: Utc::now(),
pg_start_time: None,
status: ComputeStatus::Empty,
last_active: None,
error: None,
@@ -648,6 +655,9 @@ impl ComputeNode {
};
_this_entered = start_compute_span.enter();
// Hadron: Record postgres start time (used to enforce pg_init_timeout).
state_guard.pg_start_time.replace(Utc::now());
state_guard.set_status(ComputeStatus::Init, &self.state_changed);
compute_state = state_guard.clone()
}
@@ -1441,7 +1451,7 @@ impl ComputeNode {
})?;
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path)?;
update_pg_hba(pgdata_path, None)?;
// Place pg_dynshmem under /dev/shm. This allows us to use
// 'dynamic_shared_memory_type = mmap' so that the files are placed in
@@ -1746,6 +1756,7 @@ impl ComputeNode {
}
// Run migrations separately to not hold up cold starts
let lakebase_mode = self.params.lakebase_mode;
let params = self.params.clone();
tokio::spawn(async move {
let mut conf = conf.as_ref().clone();
@@ -1758,7 +1769,7 @@ impl ComputeNode {
eprintln!("connection error: {e}");
}
});
if let Err(e) = handle_migrations(params, &mut client).await {
if let Err(e) = handle_migrations(params, &mut client, lakebase_mode).await {
error!("Failed to run migrations: {}", e);
}
}

View File

@@ -0,0 +1,60 @@
use metrics::{
IntCounter, IntGaugeVec, core::Collector, proto::MetricFamily, register_int_counter,
register_int_gauge_vec,
};
use once_cell::sync::Lazy;
// Counter keeping track of the number of PageStream request errors reported by Postgres.
// An error is registered every time Postgres calls compute_ctl's /refresh_configuration API.
// Postgres will invoke this API if it detected trouble with PageStream requests (get_page@lsn,
// get_base_backup, etc.) it sends to any pageserver. An increase in this counter value typically
// indicates Postgres downtime, as PageStream requests are critical for Postgres to function.
pub static POSTGRES_PAGESTREAM_REQUEST_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pg_cctl_pagestream_request_errors_total",
"Number of PageStream request errors reported by the postgres process"
)
.expect("failed to define a metric")
});
// Counter keeping track of the number of compute configuration errors due to Postgres statement
// timeouts. An error is registered every time `ComputeNode::reconfigure()` fails due to Postgres
// error code 57014 (query cancelled). This statement timeout typically occurs when postgres is
// stuck in a problematic retry loop when the PS is reject its connection requests (usually due
// to PG pointing at the wrong PS). We should investigate the root cause when this counter value
// increases by checking PG and PS logs.
pub static COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pg_cctl_configure_statement_timeout_errors_total",
"Number of compute configuration errors due to Postgres statement timeouts."
)
.expect("failed to define a metric")
});
pub static COMPUTE_ATTACHED: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pg_cctl_attached",
"Compute node attached status (1 if attached)",
&[
"pg_compute_id",
"pg_instance_id",
"tenant_id",
"timeline_id"
]
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = Vec::new();
metrics.extend(POSTGRES_PAGESTREAM_REQUEST_ERRORS.collect());
metrics.extend(COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS.collect());
metrics.extend(COMPUTE_ATTACHED.collect());
metrics
}
pub fn initialize_metrics() {
Lazy::force(&POSTGRES_PAGESTREAM_REQUEST_ERRORS);
Lazy::force(&COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS);
Lazy::force(&COMPUTE_ATTACHED);
}

View File

@@ -16,6 +16,7 @@ pub mod compute_prewarm;
pub mod compute_promote;
pub mod disk_quota;
pub mod extension_server;
pub mod hadron_metrics;
pub mod installed_extensions;
pub mod local_proxy;
pub mod lsn_lease;

View File

@@ -13,7 +13,9 @@ use tracing_subscriber::prelude::*;
/// set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`. See
/// `tracing-utils` package description.
///
pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> {
pub fn init_tracing_and_logging(
default_log_level: &str,
) -> anyhow::Result<Option<tracing_utils::Provider>> {
// Initialize Logging
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_log_level));
@@ -24,8 +26,9 @@ pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result
.with_writer(std::io::stderr);
// Initialize OpenTelemetry
let otlp_layer =
tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default()).await;
let provider =
tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default());
let otlp_layer = provider.as_ref().map(tracing_utils::layer);
// Put it all together
tracing_subscriber::registry()
@@ -37,7 +40,7 @@ pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
Ok(())
Ok(provider)
}
/// Replace all newline characters with a special character to make it

View File

@@ -9,15 +9,20 @@ use crate::metrics::DB_MIGRATION_FAILED;
pub(crate) struct MigrationRunner<'m> {
client: &'m mut Client,
migrations: &'m [&'m str],
lakebase_mode: bool,
}
impl<'m> MigrationRunner<'m> {
/// Create a new migration runner
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
pub fn new(client: &'m mut Client, migrations: &'m [&'m str], lakebase_mode: bool) -> Self {
// The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
assert!(migrations.len() + 1 < i64::MAX as usize);
Self { client, migrations }
Self {
client,
migrations,
lakebase_mode,
}
}
/// Get the current value neon_migration.migration_id
@@ -130,8 +135,13 @@ impl<'m> MigrationRunner<'m> {
// ID is also the next index
let migration_id = (current_migration + 1) as i64;
let migration = self.migrations[current_migration];
let migration = if self.lakebase_mode {
migration.replace("neon_superuser", "databricks_superuser")
} else {
migration.to_string()
};
match Self::run_migration(self.client, migration_id, migration).await {
match Self::run_migration(self.client, migration_id, &migration).await {
Ok(_) => {
info!("Finished migration id={}", migration_id);
}

View File

@@ -11,6 +11,7 @@ use tracing::{Level, error, info, instrument, span};
use crate::compute::ComputeNode;
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
const PG_DEFAULT_INIT_TIMEOUIT: Duration = Duration::from_secs(60);
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
/// Struct to store runtime state of the compute monitor thread.
@@ -352,13 +353,47 @@ impl ComputeMonitor {
// Hang on condition variable waiting until the compute status is `Running`.
fn wait_for_postgres_start(compute: &ComputeNode) {
let mut state = compute.state.lock().unwrap();
let pg_init_timeout = compute
.params
.pg_init_timeout
.unwrap_or(PG_DEFAULT_INIT_TIMEOUIT);
while state.status != ComputeStatus::Running {
info!("compute is not running, waiting before monitoring activity");
state = compute.state_changed.wait(state).unwrap();
if !compute.params.lakebase_mode {
state = compute.state_changed.wait(state).unwrap();
if state.status == ComputeStatus::Running {
break;
if state.status == ComputeStatus::Running {
break;
}
continue;
}
if state.pg_start_time.is_some()
&& Utc::now()
.signed_duration_since(state.pg_start_time.unwrap())
.to_std()
.unwrap_or_default()
> pg_init_timeout
{
// If Postgres isn't up and running with working PS/SK connections within POSTGRES_STARTUP_TIMEOUT, it is
// possible that we started Postgres with a wrong spec (so it is talking to the wrong PS/SK nodes). To prevent
// deadends we simply exit (panic) the compute node so it can restart with the latest spec.
//
// NB: We skip this check if we have not attempted to start PG yet (indicated by state.pg_start_up == None).
// This is to make sure the more appropriate errors are surfaced if we encounter issues before we even attempt
// to start PG (e.g., if we can't pull the spec, can't sync safekeepers, or can't get the basebackup).
error!(
"compute did not enter Running state in {} seconds, exiting",
pg_init_timeout.as_secs()
);
std::process::exit(1);
}
state = compute
.state_changed
.wait_timeout(state, Duration::from_secs(5))
.unwrap()
.0;
}
}

View File

@@ -11,7 +11,9 @@ use std::time::{Duration, Instant};
use anyhow::{Result, bail};
use compute_api::responses::TlsConfig;
use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
use compute_api::spec::{
Database, DatabricksSettings, GenericOption, GenericOptions, PgIdent, Role,
};
use futures::StreamExt;
use indexmap::IndexMap;
use ini::Ini;
@@ -184,6 +186,42 @@ impl DatabaseExt for Database {
}
}
pub trait DatabricksSettingsExt {
fn as_pg_settings(&self) -> String;
}
impl DatabricksSettingsExt for DatabricksSettings {
fn as_pg_settings(&self) -> String {
// Postgres GUCs rendered from DatabricksSettings
vec![
// ssl_ca_file
Some(format!(
"ssl_ca_file = '{}'",
self.pg_compute_tls_settings.ca_file
)),
// [Optional] databricks.workspace_url
Some(format!(
"databricks.workspace_url = '{}'",
&self.databricks_workspace_host
)),
// todo(vikas.jain): these are not required anymore as they are moved to static
// conf but keeping these to avoid image mismatch between hcc and pg.
// Once hcc and pg are in sync, we can remove these.
//
// databricks.enable_databricks_identity_login
Some("databricks.enable_databricks_identity_login = true".to_string()),
// databricks.enable_sql_restrictions
Some("databricks.enable_sql_restrictions = true".to_string()),
]
.into_iter()
// Removes `None`s
.flatten()
.collect::<Vec<String>>()
.join("\n")
+ "\n"
}
}
/// Generic trait used to provide quoting / encoding for strings used in the
/// Postgres SQL queries and DATABASE_URL.
pub trait Escaping {

View File

@@ -1,4 +1,6 @@
use std::fs::File;
use std::fs::{self, Permissions};
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use anyhow::{Result, anyhow, bail};
@@ -133,10 +135,25 @@ pub fn get_config_from_control_plane(base_uri: &str, compute_id: &str) -> Result
}
/// Check `pg_hba.conf` and update if needed to allow external connections.
pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
pub fn update_pg_hba(pgdata_path: &Path, databricks_pg_hba: Option<&String>) -> Result<()> {
// XXX: consider making it a part of config.json
let pghba_path = pgdata_path.join("pg_hba.conf");
// Update pg_hba to contains databricks specfic settings before adding neon settings
// PG uses the first record that matches to perform authentication, so we need to have
// our rules before the default ones from neon.
// See https://www.postgresql.org/docs/16/auth-pg-hba-conf.html
if let Some(databricks_pg_hba) = databricks_pg_hba {
if config::line_in_file(
&pghba_path,
&format!("include_if_exists {}\n", *databricks_pg_hba),
)? {
info!("updated pg_hba.conf to include databricks_pg_hba.conf");
} else {
info!("pg_hba.conf already included databricks_pg_hba.conf");
}
}
if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
info!("updated pg_hba.conf to allow external connections");
} else {
@@ -146,6 +163,59 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
Ok(())
}
/// Check `pg_ident.conf` and update if needed to allow databricks config.
pub fn update_pg_ident(pgdata_path: &Path, databricks_pg_ident: Option<&String>) -> Result<()> {
info!("checking pg_ident.conf");
let pghba_path = pgdata_path.join("pg_ident.conf");
// Update pg_ident to contains databricks specfic settings
if let Some(databricks_pg_ident) = databricks_pg_ident {
if config::line_in_file(
&pghba_path,
&format!("include_if_exists {}\n", *databricks_pg_ident),
)? {
info!("updated pg_ident.conf to include databricks_pg_ident.conf");
} else {
info!("pg_ident.conf already included databricks_pg_ident.conf");
}
}
Ok(())
}
/// Copy tls key_file and cert_file from k8s secret mount directory
/// to pgdata and set private key file permissions as expected by Postgres.
/// See this doc for expected permission <https://www.postgresql.org/docs/current/ssl-tcp.html>
/// K8s secrets mount on dblet does not honor permission and ownership
/// specified in the Volume or VolumeMount. So we need to explicitly copy the file and set the permissions.
pub fn copy_tls_certificates(
key_file: &String,
cert_file: &String,
pgdata_path: &Path,
) -> Result<()> {
let files = [cert_file, key_file];
for file in files.iter() {
let source = Path::new(file);
let dest = pgdata_path.join(source.file_name().unwrap());
if !dest.exists() {
std::fs::copy(source, &dest)?;
info!(
"Copying tls file: {} to {}",
&source.display(),
&dest.display()
);
}
if *file == key_file {
// Postgres requires private key to be readable only by the owner by having
// chmod 600 permissions.
let permissions = Permissions::from_mode(0o600);
fs::set_permissions(&dest, permissions)?;
info!("Setting permission on {}.", &dest.display());
}
}
Ok(())
}
/// Create a standby.signal file
pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
// XXX: consider making it a part of config.json
@@ -170,7 +240,11 @@ pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
}
#[instrument(skip_all)]
pub async fn handle_migrations(params: ComputeNodeParams, client: &mut Client) -> Result<()> {
pub async fn handle_migrations(
params: ComputeNodeParams,
client: &mut Client,
lakebase_mode: bool,
) -> Result<()> {
info!("handle migrations");
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
@@ -234,7 +308,7 @@ pub async fn handle_migrations(params: ComputeNodeParams, client: &mut Client) -
),
];
MigrationRunner::new(client, &migrations)
MigrationRunner::new(client, &migrations, lakebase_mode)
.run_migrations()
.await?;

View File

@@ -416,6 +416,32 @@ pub struct GenericOption {
pub vartype: String,
}
/// Postgres compute TLS settings.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct PgComputeTlsSettings {
// Absolute path to the certificate file for server-side TLS.
pub cert_file: String,
// Absolute path to the private key file for server-side TLS.
pub key_file: String,
// Absolute path to the certificate authority file for verifying client certificates.
pub ca_file: String,
}
/// Databricks specific options for compute instance.
/// This is used to store any other settings that needs to be propagate to Compute
/// but should not be persisted to ComputeSpec in the database.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct DatabricksSettings {
pub pg_compute_tls_settings: PgComputeTlsSettings,
// Absolute file path to databricks_pg_hba.conf file.
pub databricks_pg_hba: String,
// Absolute file path to databricks_pg_ident.conf file.
pub databricks_pg_ident: String,
// Hostname portion of the Databricks workspace URL of the endpoint, or empty string if not known.
// A valid hostname is required for the compute instance to support PAT logins.
pub databricks_workspace_host: String,
}
/// Optional collection of `GenericOption`'s. Type alias allows us to
/// declare a `trait` on it.
pub type GenericOptions = Option<Vec<GenericOption>>;

View File

@@ -394,7 +394,7 @@ impl From<&OtelExporterConfig> for tracing_utils::ExportConfig {
tracing_utils::ExportConfig {
endpoint: Some(val.endpoint.clone()),
protocol: val.protocol.into(),
timeout: val.timeout,
timeout: Some(val.timeout),
}
}
}

View File

@@ -301,7 +301,12 @@ pub struct PullTimelineRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub http_hosts: Vec<String>,
pub ignore_tombstone: Option<bool>,
/// Membership configuration to switch to after pull.
/// It guarantees that if pull_timeline returns successfully, the timeline will
/// not be deleted by request with an older generation.
/// Storage controller always sets this field.
/// None is only allowed for manual pull_timeline requests.
pub mconf: Option<Configuration>,
}
#[derive(Debug, Serialize, Deserialize)]

View File

@@ -1,11 +1,5 @@
//! Helper functions to set up OpenTelemetry tracing.
//!
//! This comes in two variants, depending on whether you have a Tokio runtime available.
//! If you do, call `init_tracing()`. It sets up the trace processor and exporter to use
//! the current tokio runtime. If you don't have a runtime available, or you don't want
//! to share the runtime with the tracing tasks, call `init_tracing_without_runtime()`
//! instead. It sets up a dedicated single-threaded Tokio runtime for the tracing tasks.
//!
//! Example:
//!
//! ```rust,no_run
@@ -21,7 +15,8 @@
//! .with_writer(std::io::stderr);
//!
//! // Initialize OpenTelemetry. Exports tracing spans as OpenTelemetry traces
//! let otlp_layer = tracing_utils::init_tracing("my_application", tracing_utils::ExportConfig::default()).await;
//! let provider = tracing_utils::init_tracing("my_application", tracing_utils::ExportConfig::default());
//! let otlp_layer = provider.as_ref().map(tracing_utils::layer);
//!
//! // Put it all together
//! tracing_subscriber::registry()
@@ -36,16 +31,18 @@
pub mod http;
pub mod perf_span;
use opentelemetry::KeyValue;
use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::WithExportConfig;
pub use opentelemetry_otlp::{ExportConfig, Protocol};
use opentelemetry_sdk::trace::SdkTracerProvider;
use tracing::level_filters::LevelFilter;
use tracing::{Dispatch, Subscriber};
use tracing_subscriber::Layer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::registry::LookupSpan;
pub type Provider = SdkTracerProvider;
/// Set up OpenTelemetry exporter, using configuration from environment variables.
///
/// `service_name` is set as the OpenTelemetry 'service.name' resource (see
@@ -70,16 +67,7 @@ use tracing_subscriber::registry::LookupSpan;
/// If you need some other setting, please test if it works first. And perhaps
/// add a comment in the list above to save the effort of testing for the next
/// person.
///
/// This doesn't block, but is marked as 'async' to hint that this must be called in
/// asynchronous execution context.
pub async fn init_tracing<S>(
service_name: &str,
export_config: ExportConfig,
) -> Option<impl Layer<S>>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
pub fn init_tracing(service_name: &str, export_config: ExportConfig) -> Option<Provider> {
if std::env::var("OTEL_SDK_DISABLED") == Ok("true".to_string()) {
return None;
};
@@ -89,52 +77,14 @@ where
))
}
/// Like `init_tracing`, but creates a separate tokio Runtime for the tracing
/// tasks.
pub fn init_tracing_without_runtime<S>(
service_name: &str,
export_config: ExportConfig,
) -> Option<impl Layer<S>>
pub fn layer<S>(p: &Provider) -> impl Layer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
if std::env::var("OTEL_SDK_DISABLED") == Ok("true".to_string()) {
return None;
};
// The opentelemetry batch processor and the OTLP exporter needs a Tokio
// runtime. Create a dedicated runtime for them. One thread should be
// enough.
//
// (Alternatively, instead of batching, we could use the "simple
// processor", which doesn't need Tokio, and use "reqwest-blocking"
// feature for the OTLP exporter, which also doesn't need Tokio. However,
// batching is considered best practice, and also I have the feeling that
// the non-Tokio codepaths in the opentelemetry crate are less used and
// might be more buggy, so better to stay on the well-beaten path.)
//
// We leak the runtime so that it keeps running after we exit the
// function.
let runtime = Box::leak(Box::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("otlp runtime thread")
.worker_threads(1)
.build()
.unwrap(),
));
let _guard = runtime.enter();
Some(init_tracing_internal(
service_name.to_string(),
export_config,
))
tracing_opentelemetry::layer().with_tracer(p.tracer("global"))
}
fn init_tracing_internal<S>(service_name: String, export_config: ExportConfig) -> impl Layer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
fn init_tracing_internal(service_name: String, export_config: ExportConfig) -> Provider {
// Sets up exporter from the provided [`ExportConfig`] parameter.
// If the endpoint is not specified, it is loaded from the
// OTEL_EXPORTER_OTLP_ENDPOINT environment variable.
@@ -153,22 +103,14 @@ where
opentelemetry_sdk::propagation::TraceContextPropagator::new(),
);
let tracer = opentelemetry_sdk::trace::TracerProvider::builder()
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
.with_resource(opentelemetry_sdk::Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
service_name,
)]))
Provider::builder()
.with_batch_exporter(exporter)
.with_resource(
opentelemetry_sdk::Resource::builder()
.with_service_name(service_name)
.build(),
)
.build()
.tracer("global");
tracing_opentelemetry::layer().with_tracer(tracer)
}
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit.
pub fn shutdown_tracing() {
opentelemetry::global::shutdown_tracer_provider();
}
pub enum OtelEnablement {
@@ -176,17 +118,17 @@ pub enum OtelEnablement {
Enabled {
service_name: String,
export_config: ExportConfig,
runtime: &'static tokio::runtime::Runtime,
},
}
pub struct OtelGuard {
provider: Provider,
pub dispatch: Dispatch,
}
impl Drop for OtelGuard {
fn drop(&mut self) {
shutdown_tracing();
_ = self.provider.shutdown();
}
}
@@ -199,22 +141,19 @@ impl Drop for OtelGuard {
/// The lifetime of the guard should match taht of the application. On drop, it tears down the
/// OTEL infra.
pub fn init_performance_tracing(otel_enablement: OtelEnablement) -> Option<OtelGuard> {
let otel_subscriber = match otel_enablement {
match otel_enablement {
OtelEnablement::Disabled => None,
OtelEnablement::Enabled {
service_name,
export_config,
runtime,
} => {
let otel_layer = runtime
.block_on(init_tracing(&service_name, export_config))
.with_filter(LevelFilter::INFO);
let provider = init_tracing(&service_name, export_config)?;
let otel_layer = layer(&provider).with_filter(LevelFilter::INFO);
let otel_subscriber = tracing_subscriber::registry().with(otel_layer);
let otel_dispatch = Dispatch::new(otel_subscriber);
let dispatch = Dispatch::new(otel_subscriber);
Some(otel_dispatch)
Some(OtelGuard { dispatch, provider })
}
};
otel_subscriber.map(|dispatch| OtelGuard { dispatch })
}
}

View File

@@ -11,6 +11,7 @@
//! from data stored in object storage.
//!
use std::fmt::Write as FmtWrite;
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use anyhow::{Context, anyhow};
@@ -420,12 +421,16 @@ where
}
let mut min_restart_lsn: Lsn = Lsn::MAX;
let mut dbdir_cnt = 0;
let mut rel_cnt = 0;
// Create tablespace directories
for ((spcnode, dbnode), has_relmap_file) in
self.timeline.list_dbdirs(self.lsn, self.ctx).await?
{
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
dbdir_cnt += 1;
// If full backup is requested, include all relation files.
// Otherwise only include init forks of unlogged relations.
let rels = self
@@ -433,6 +438,7 @@ where
.list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx)
.await?;
for &rel in rels.iter() {
rel_cnt += 1;
// Send init fork as main fork to provide well formed empty
// contents of UNLOGGED relations. Postgres copies it in
// `reinit.c` during recovery.
@@ -455,6 +461,10 @@ where
}
}
self.timeline
.db_rel_count
.store(Some(Arc::new((dbdir_cnt, rel_cnt))));
let start_time = Instant::now();
let aux_files = self
.timeline

View File

@@ -126,7 +126,6 @@ fn main() -> anyhow::Result<()> {
Some(cfg) => tracing_utils::OtelEnablement::Enabled {
service_name: "pageserver".to_string(),
export_config: (&cfg.export_config).into(),
runtime: *COMPUTE_REQUEST_RUNTIME,
},
None => tracing_utils::OtelEnablement::Disabled,
};

View File

@@ -156,6 +156,8 @@ impl FeatureResolver {
let tenant_properties = PerTenantProperties {
remote_size_mb: Some(rand::rng().random_range(100.0..1000000.00)),
db_count_max: Some(rand::rng().random_range(1..1000)),
rel_count_max: Some(rand::rng().random_range(1..1000)),
}
.into_posthog_properties();
@@ -344,6 +346,8 @@ impl FeatureResolver {
struct PerTenantProperties {
pub remote_size_mb: Option<f64>,
pub db_count_max: Option<usize>,
pub rel_count_max: Option<usize>,
}
impl PerTenantProperties {
@@ -355,6 +359,18 @@ impl PerTenantProperties {
PostHogFlagFilterPropertyValue::Number(remote_size_mb),
);
}
if let Some(db_count) = self.db_count_max {
properties.insert(
"tenant_db_count_max".to_string(),
PostHogFlagFilterPropertyValue::Number(db_count as f64),
);
}
if let Some(rel_count) = self.rel_count_max {
properties.insert(
"tenant_rel_count_max".to_string(),
PostHogFlagFilterPropertyValue::Number(rel_count as f64),
);
}
properties
}
}
@@ -409,7 +425,11 @@ impl TenantFeatureResolver {
/// Refresh the cached properties and flags on the critical path.
pub fn refresh_properties_and_flags(&self, tenant_shard: &TenantShard) {
// Any of the remote size is none => this property is none.
let mut remote_size_mb = Some(0.0);
// Any of the db or rel count is available => this property is available.
let mut db_count_max = None;
let mut rel_count_max = None;
for timeline in tenant_shard.list_timelines() {
let size = timeline.metrics.resident_physical_size_get();
if size == 0 {
@@ -419,9 +439,25 @@ impl TenantFeatureResolver {
if let Some(ref mut remote_size_mb) = remote_size_mb {
*remote_size_mb += size as f64 / 1024.0 / 1024.0;
}
if let Some(data) = timeline.db_rel_count.load_full() {
let (db_count, rel_count) = *data.as_ref();
if db_count_max.is_none() {
db_count_max = Some(db_count);
}
if rel_count_max.is_none() {
rel_count_max = Some(rel_count);
}
db_count_max = db_count_max.map(|max| max.max(db_count));
rel_count_max = rel_count_max.map(|max| max.max(rel_count));
}
}
self.cached_tenant_properties.store(Arc::new(
PerTenantProperties { remote_size_mb }.into_posthog_properties(),
PerTenantProperties {
remote_size_mb,
db_count_max,
rel_count_max,
}
.into_posthog_properties(),
));
// BEGIN: Update the feature flag on the critical path.

View File

@@ -8,6 +8,7 @@
//!
use std::collections::{HashMap, HashSet, hash_map};
use std::ops::{ControlFlow, Range};
use std::sync::Arc;
use crate::walingest::{WalIngestError, WalIngestErrorKind};
use crate::{PERF_TRACE_TARGET, ensure_walingest};
@@ -1254,11 +1255,16 @@ impl Timeline {
let dbdir = DbDirectory::des(&buf)?;
let mut total_size: u64 = 0;
let mut dbdir_cnt = 0;
let mut rel_cnt = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
dbdir_cnt += 1;
for rel in self
.list_rels(*spcnode, *dbnode, Version::at(lsn), ctx)
.await?
{
rel_cnt += 1;
if self.cancel.is_cancelled() {
return Err(CalculateLogicalSizeError::Cancelled);
}
@@ -1269,6 +1275,10 @@ impl Timeline {
total_size += relsize as u64;
}
}
self.db_rel_count
.store(Some(Arc::new((dbdir_cnt, rel_cnt))));
Ok(total_size * BLCKSZ as u64)
}

View File

@@ -287,7 +287,7 @@ pub struct Timeline {
ancestor_lsn: Lsn,
// The LSN of gc-compaction that was last applied to this timeline.
gc_compaction_state: ArcSwap<Option<GcCompactionState>>,
gc_compaction_state: ArcSwapOption<GcCompactionState>,
pub(crate) metrics: Arc<TimelineMetrics>,
@@ -448,7 +448,11 @@ pub struct Timeline {
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
basebackup_cache: Arc<BasebackupCache>,
#[expect(dead_code)]
feature_resolver: Arc<TenantFeatureResolver>,
/// Basebackup will collect the count and store it here. Used for reldirv2 rollout.
pub(crate) db_rel_count: ArcSwapOption<(usize, usize)>,
}
pub(crate) enum PreviousHeatmap {
@@ -3236,7 +3240,7 @@ impl Timeline {
}),
disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
gc_compaction_state: ArcSwap::new(Arc::new(gc_compaction_state)),
gc_compaction_state: ArcSwapOption::from_pointee(gc_compaction_state),
last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
last_freeze_ts: RwLock::new(Instant::now()),
@@ -3341,6 +3345,8 @@ impl Timeline {
basebackup_cache: resources.basebackup_cache,
feature_resolver: resources.feature_resolver.clone(),
db_rel_count: ArcSwapOption::from_pointee(None),
};
result.repartition_threshold =
@@ -3412,7 +3418,7 @@ impl Timeline {
gc_compaction_state: GcCompactionState,
) -> anyhow::Result<()> {
self.gc_compaction_state
.store(Arc::new(Some(gc_compaction_state.clone())));
.store(Some(Arc::new(gc_compaction_state.clone())));
self.remote_client
.schedule_index_upload_for_gc_compaction_state_update(gc_compaction_state)
}
@@ -3428,7 +3434,10 @@ impl Timeline {
}
pub(crate) fn get_gc_compaction_state(&self) -> Option<GcCompactionState> {
self.gc_compaction_state.load_full().as_ref().clone()
self.gc_compaction_state
.load()
.as_ref()
.map(|x| x.as_ref().clone())
}
/// Creates and starts the wal receiver.

View File

@@ -1326,13 +1326,7 @@ impl Timeline {
.max()
};
let (partition_mode, partition_lsn) = if cfg!(test)
|| cfg!(feature = "testing")
|| self
.feature_resolver
.evaluate_boolean("image-compaction-boundary")
.is_ok()
{
let (partition_mode, partition_lsn) = {
let last_repartition_lsn = self.partitioning.read().1;
let lsn = match l0_l1_boundary_lsn {
Some(boundary) => gc_cutoff
@@ -1348,8 +1342,6 @@ impl Timeline {
} else {
("l0_l1_boundary", lsn)
}
} else {
("latest_record", self.get_last_record_lsn())
};
// 2. Repartition and create image layers if necessary

View File

@@ -362,7 +362,7 @@ impl<T: Types> Cache<T> {
tokio::time::sleep(RETRY_BACKOFF).await;
continue;
} else {
tracing::warn!(
tracing::info!(
"Failed to resolve tenant shard after {} attempts: {:?}",
GET_MAX_RETRIES,
e

View File

@@ -178,6 +178,8 @@ static PageServer page_servers[MAX_SHARDS];
static bool pageserver_flush(shardno_t shard_no);
static void pageserver_disconnect(shardno_t shard_no);
static void pageserver_disconnect_shard(shardno_t shard_no);
// HADRON
shardno_t get_num_shards(void);
static bool
PagestoreShmemIsValid(void)
@@ -286,6 +288,22 @@ AssignPageserverConnstring(const char *newval, void *extra)
}
}
/* BEGIN_HADRON */
/**
* Return the total number of shards seen in the shard map.
*/
shardno_t get_num_shards(void)
{
const ShardMap *shard_map;
Assert(pagestore_shared);
shard_map = &pagestore_shared->shard_map;
Assert(shard_map != NULL);
return shard_map->num_shards;
}
/* END_HADRON */
/*
* Get the current number of shards, and/or the connection string for a
* particular shard from the shard map in shared memory.

View File

@@ -72,22 +72,21 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
(tag).rnode = (rinfo); \
} while (false)
#define BufTagGetNRelFileInfo(tag) tag.rnode
#define BufTagGetNRelFileInfo(tag) (tag).rnode
#define BufTagGetRelNumber(tagp) ((tagp)->rnode.relNode)
#define BufTagInit(tag, relNumber, forknum, blkno, spcOid, dbOid) \
#define BufTagInit(tag, rel_number, fork_number, block_number, spc_oid, db_oid) \
do { \
RelFileNode rnode = { .spcNode = spcOid, .dbNode = dbOid, .relNode = relNumber}; \
(tag).forkNum = forknum; \
(tag).blockNum = blkno; \
(tag).rnode = rnode; \
RelFileNode rnode = { .spcNode = (spc_oid), .dbNode = (db_oid), .relNode = (rel_number)}; \
(tag).forkNum = (fork_number); \
(tag).blockNum = (block_number); \
(tag).rnode = rnode; \
} while (false)
#define InvalidRelFileNumber InvalidOid
#define SMgrRelGetRelInfo(reln) \
(reln->smgr_rnode.node)
#define SMgrRelGetRelInfo(reln) ((reln)->smgr_rnode.node)
#define DropRelationAllLocalBuffers DropRelFileNodeAllLocalBuffers
@@ -133,17 +132,16 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
.relNumber = (tag).relNumber, \
})
#define BufTagInit(tag, relNumber, forknum, blkno, spcOid, dbOid) \
#define BufTagInit(tag, rel_number, fork_number, block_number, spc_oid, db_oid) \
do { \
(tag).forkNum = forknum; \
(tag).blockNum = blkno; \
(tag).spcOid = spcOid; \
(tag).dbOid = dbOid; \
(tag).relNumber = relNumber; \
(tag).forkNum = (fork_number); \
(tag).blockNum = (block_number); \
(tag).spcOid = (spc_oid); \
(tag).dbOid = (db_oid); \
(tag).relNumber = (rel_number); \
} while (false)
#define SMgrRelGetRelInfo(reln) \
((reln)->smgr_rlocator)
#define SMgrRelGetRelInfo(reln) ((reln)->smgr_rlocator)
#define DropRelationAllLocalBuffers DropRelationAllLocalBuffers
#endif

View File

@@ -110,6 +110,9 @@ static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk);
static void CheckGracefulShutdown(WalProposer *wp);
// HADRON
shardno_t get_num_shards(void);
static void
init_walprop_config(bool syncSafekeepers)
{
@@ -646,18 +649,19 @@ walprop_pg_get_shmem_state(WalProposer *wp)
* Record new ps_feedback in the array with shards and update min_feedback.
*/
static PageserverFeedback
record_pageserver_feedback(PageserverFeedback *ps_feedback)
record_pageserver_feedback(PageserverFeedback *ps_feedback, shardno_t num_shards)
{
PageserverFeedback min_feedback;
Assert(ps_feedback->present);
Assert(ps_feedback->shard_number < MAX_SHARDS);
Assert(ps_feedback->shard_number < num_shards);
SpinLockAcquire(&walprop_shared->mutex);
/* Update the number of shards */
if (ps_feedback->shard_number + 1 > walprop_shared->num_shards)
walprop_shared->num_shards = ps_feedback->shard_number + 1;
// Hadron: Update the num_shards from the source-of-truth (shard map) lazily when we receive
// a new pageserver feedback.
walprop_shared->num_shards = Max(walprop_shared->num_shards, num_shards);
/* Update the feedback */
memcpy(&walprop_shared->shard_ps_feedback[ps_feedback->shard_number], ps_feedback, sizeof(PageserverFeedback));
@@ -2023,19 +2027,43 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
if (wp->config->syncSafekeepers)
return;
/* handle fresh ps_feedback */
if (sk->appendResponse.ps_feedback.present)
{
PageserverFeedback min_feedback = record_pageserver_feedback(&sk->appendResponse.ps_feedback);
shardno_t num_shards = get_num_shards();
/* Only one main shard sends non-zero currentClusterSize */
if (sk->appendResponse.ps_feedback.currentClusterSize > 0)
SetNeonCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize);
if (min_feedback.disk_consistent_lsn != standby_apply_lsn)
// During shard split, we receive ps_feedback from child shards before
// the split commits and our shard map GUC has been updated. We must
// filter out such feedback here because record_pageserver_feedback()
// doesn't do it.
//
// NB: what we would actually want to happen is that we only receive
// ps_feedback from the parent shards when the split is committed, then
// apply the split to our set of tracked feedback and from here on only
// receive ps_feedback from child shards. This filter condition doesn't
// do that: if we split from N parent to 2N child shards, the first N
// child shards' feedback messages will pass this condition, even before
// the split is committed. That's a bit sloppy, but OK for now.
if (sk->appendResponse.ps_feedback.shard_number < num_shards)
{
standby_apply_lsn = min_feedback.disk_consistent_lsn;
needToAdvanceSlot = true;
PageserverFeedback min_feedback = record_pageserver_feedback(&sk->appendResponse.ps_feedback, num_shards);
/* Only one main shard sends non-zero currentClusterSize */
if (sk->appendResponse.ps_feedback.currentClusterSize > 0)
SetNeonCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize);
if (min_feedback.disk_consistent_lsn != standby_apply_lsn)
{
standby_apply_lsn = min_feedback.disk_consistent_lsn;
needToAdvanceSlot = true;
}
}
else
{
// HADRON
elog(DEBUG2, "Ignoring pageserver feedback for unknown shard %d (current shard number %d)",
sk->appendResponse.ps_feedback.shard_number, num_shards);
}
}

View File

@@ -76,7 +76,7 @@ fn cli() -> clap::Command {
}
pub async fn run() -> anyhow::Result<()> {
let _logging_guard = crate::logging::init().await?;
let _logging_guard = crate::logging::init()?;
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);

View File

@@ -334,7 +334,7 @@ struct PgSniRouterArgs {
}
pub async fn run() -> anyhow::Result<()> {
let _logging_guard = crate::logging::init().await?;
let _logging_guard = crate::logging::init()?;
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);

View File

@@ -26,7 +26,7 @@ use crate::metrics::Metrics;
/// configuration from environment variables. For example, to change the
/// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
/// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
pub async fn init() -> anyhow::Result<LoggingGuard> {
pub fn init() -> anyhow::Result<LoggingGuard> {
let logfmt = LogFormat::from_env()?;
let env_filter = EnvFilter::builder()
@@ -43,8 +43,8 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
.expect("this should be a valid filter directive"),
);
let otlp_layer =
tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default()).await;
let provider = tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default());
let otlp_layer = provider.as_ref().map(tracing_utils::layer);
let json_log_layer = if logfmt == LogFormat::Json {
Some(JsonLoggingLayer::new(
@@ -76,7 +76,7 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
.with(text_log_layer)
.try_init()?;
Ok(LoggingGuard)
Ok(LoggingGuard(provider))
}
/// Initialize logging for local_proxy with log prefix and no opentelemetry.
@@ -97,7 +97,7 @@ pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
.with(fmt_layer)
.try_init()?;
Ok(LoggingGuard)
Ok(LoggingGuard(None))
}
pub struct LocalProxyFormatter(Format<Full, SystemTime>);
@@ -118,14 +118,16 @@ where
}
}
pub struct LoggingGuard;
pub struct LoggingGuard(Option<tracing_utils::Provider>);
impl Drop for LoggingGuard {
fn drop(&mut self) {
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit.
tracing::info!("shutting down the tracing machinery");
tracing_utils::shutdown_tracing();
if let Some(p) = &self.0 {
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit.
tracing::info!("shutting down the tracing machinery");
drop(p.shutdown());
}
}
}

View File

@@ -161,9 +161,9 @@ pub async fn handle_request(
FileStorage::create_new(&tli_dir_path, new_state.clone(), conf.no_sync).await?;
// now we have a ready timeline in a temp directory
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path, None).await?;
global_timelines
.load_temp_timeline(request.destination_ttid, &tli_dir_path, true)
.load_temp_timeline(request.destination_ttid, &tli_dir_path, None)
.await?;
Ok(())

View File

@@ -193,7 +193,7 @@ pub async fn hcc_pull_timeline(
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
http_hosts: Vec::new(),
ignore_tombstone: None,
mconf: None,
};
for host in timeline.peers {
if host.0 == conf.my_id.0 {

View File

@@ -352,7 +352,7 @@ async fn timeline_exclude_handler(mut request: Request<Body>) -> Result<Response
// instead.
if data.mconf.contains(my_id) {
return Err(ApiError::Forbidden(format!(
"refused to switch into {}, node {} is member of it",
"refused to exclude timeline with {}, node {} is member of it",
data.mconf, my_id
)));
}

View File

@@ -13,8 +13,8 @@ use http_utils::error::ApiError;
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
use remote_storage::GenericRemoteStorage;
use reqwest::Certificate;
use safekeeper_api::Term;
use safekeeper_api::models::{PullTimelineRequest, PullTimelineResponse, TimelineStatus};
use safekeeper_api::{Term, membership};
use safekeeper_client::mgmt_api;
use safekeeper_client::mgmt_api::Client;
use serde::Deserialize;
@@ -453,12 +453,40 @@ pub async fn handle_request(
global_timelines: Arc<GlobalTimelines>,
wait_for_peer_timeline_status: bool,
) -> Result<PullTimelineResponse, ApiError> {
if let Some(mconf) = &request.mconf {
let sk_id = global_timelines.get_sk_id();
if !mconf.contains(sk_id) {
return Err(ApiError::BadRequest(anyhow!(
"refused to pull timeline with {mconf}, node {sk_id} is not member of it",
)));
}
}
let existing_tli = global_timelines.get(TenantTimelineId::new(
request.tenant_id,
request.timeline_id,
));
if existing_tli.is_ok() {
info!("Timeline {} already exists", request.timeline_id);
if let Ok(timeline) = existing_tli {
let cur_generation = timeline
.read_shared_state()
.await
.sk
.state()
.mconf
.generation;
info!(
"Timeline {} already exists with generation {cur_generation}",
request.timeline_id,
);
if let Some(mconf) = request.mconf {
timeline
.membership_switch(mconf)
.await
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
}
return Ok(PullTimelineResponse {
safekeeper_host: None,
});
@@ -495,6 +523,19 @@ pub async fn handle_request(
for (i, response) in responses.into_iter().enumerate() {
match response {
Ok(status) => {
if let Some(mconf) = &request.mconf {
if status.mconf.generation > mconf.generation {
// We probably raced with another timeline membership change with higher generation.
// Ignore this request.
return Err(ApiError::Conflict(format!(
"cannot pull timeline with generation {}: timeline {} already exists with generation {} on {}",
mconf.generation,
request.timeline_id,
status.mconf.generation,
http_hosts[i],
)));
}
}
statuses.push((status, i));
}
Err(e) => {
@@ -593,15 +634,13 @@ pub async fn handle_request(
assert!(status.tenant_id == request.tenant_id);
assert!(status.timeline_id == request.timeline_id);
let check_tombstone = !request.ignore_tombstone.unwrap_or_default();
match pull_timeline(
status,
safekeeper_host,
sk_auth_token,
http_client,
global_timelines,
check_tombstone,
request.mconf,
)
.await
{
@@ -611,6 +650,10 @@ pub async fn handle_request(
Some(TimelineError::AlreadyExists(_)) => Ok(PullTimelineResponse {
safekeeper_host: None,
}),
Some(TimelineError::Deleted(_)) => Err(ApiError::Conflict(format!(
"Timeline {}/{} deleted",
request.tenant_id, request.timeline_id
))),
Some(TimelineError::CreationInProgress(_)) => {
// We don't return success here because creation might still fail.
Err(ApiError::Conflict("Creation in progress".to_owned()))
@@ -627,7 +670,7 @@ async fn pull_timeline(
sk_auth_token: Option<SecretString>,
http_client: reqwest::Client,
global_timelines: Arc<GlobalTimelines>,
check_tombstone: bool,
mconf: Option<membership::Configuration>,
) -> Result<PullTimelineResponse> {
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
info!(
@@ -689,8 +732,11 @@ async fn pull_timeline(
// fsync temp timeline directory to remember its contents.
fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;
let generation = mconf.as_ref().map(|c| c.generation);
// Let's create timeline from temp directory and verify that it's correct
let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
let (commit_lsn, flush_lsn) =
validate_temp_timeline(conf, ttid, &tli_dir_path, generation).await?;
info!(
"finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
ttid, commit_lsn, flush_lsn
@@ -698,10 +744,20 @@ async fn pull_timeline(
assert!(status.commit_lsn <= status.flush_lsn);
// Finally, load the timeline.
let _tli = global_timelines
.load_temp_timeline(ttid, &tli_dir_path, check_tombstone)
let timeline = global_timelines
.load_temp_timeline(ttid, &tli_dir_path, generation)
.await?;
if let Some(mconf) = mconf {
// Switch to provided mconf to guarantee that the timeline will not
// be deleted by request with older generation.
// The generation might already be higer than the one in mconf, e.g.
// if another membership_switch request was executed between `load_temp_timeline`
// and `membership_switch`, but that's totaly fine. `membership_switch` will
// ignore switch to older generation.
timeline.membership_switch(mconf).await?;
}
Ok(PullTimelineResponse {
safekeeper_host: Some(host),
})

View File

@@ -1026,6 +1026,13 @@ where
self.state.finish_change(&state).await?;
}
if msg.mconf.generation > self.state.mconf.generation && !msg.mconf.contains(self.node_id) {
bail!(
"refused to switch into {}, node {} is not a member of it",
msg.mconf,
self.node_id,
);
}
// Switch into conf given by proposer conf if it is higher.
self.state.membership_switch(msg.mconf.clone()).await?;

View File

@@ -427,6 +427,9 @@ impl From<TimelineError> for ApiError {
TimelineError::NotFound(ttid) => {
ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
}
TimelineError::Deleted(ttid) => {
ApiError::NotFound(anyhow!("timeline {} deleted", ttid).into())
}
_ => ApiError::InternalServerError(anyhow!("{}", te)),
}
}
@@ -591,7 +594,7 @@ impl Timeline {
/// Cancel the timeline, requesting background activity to stop. Closing
/// the `self.gate` waits for that.
pub async fn cancel(&self) {
pub fn cancel(&self) {
info!("timeline {} shutting down", self.ttid);
self.cancel.cancel();
}
@@ -911,6 +914,13 @@ impl Timeline {
to: Configuration,
) -> Result<TimelineMembershipSwitchResponse> {
let mut state = self.write_shared_state().await;
// Ensure we don't race with exclude/delete requests by checking the cancellation
// token under the write_shared_state lock.
// Exclude/delete cancel the timeline under the shared state lock,
// so the timeline cannot be deleted in the middle of the membership switch.
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
state.sk.membership_switch(to).await
}

View File

@@ -10,13 +10,13 @@ use std::time::{Duration, Instant};
use anyhow::{Context, Result, bail};
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use safekeeper_api::membership::Configuration;
use safekeeper_api::membership::{Configuration, SafekeeperGeneration};
use safekeeper_api::models::{SafekeeperUtilization, TimelineDeleteResult};
use safekeeper_api::{ServerInfo, membership};
use tokio::fs;
use tracing::*;
use utils::crashsafe::{durable_rename, fsync_async_opt};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
@@ -40,10 +40,17 @@ enum GlobalMapTimeline {
struct GlobalTimelinesState {
timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,
// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
// this map is dropped on restart.
tombstones: HashMap<TenantTimelineId, Instant>,
/// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
/// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
/// this map is dropped on restart.
/// The timeline might also be locally deleted (excluded) via safekeeper migration algorithm. In that case,
/// the tombsone contains the corresponding safekeeper generation. The pull_timeline requests with
/// higher generation ignore such tombstones and can recreate the timeline.
timeline_tombstones: HashMap<TenantTimelineId, TimelineTombstone>,
/// A tombstone indicates that the tenant used to exist has been deleted.
/// These are created only by tenant_delete requests. They are always valid regardless of the
/// request generation.
/// This is only soft-enforced, as this map is dropped on restart.
tenant_tombstones: HashMap<TenantId, Instant>,
conf: Arc<SafeKeeperConf>,
@@ -79,7 +86,7 @@ impl GlobalTimelinesState {
Err(TimelineError::CreationInProgress(*ttid))
}
None => {
if self.has_tombstone(ttid) {
if self.has_tombstone(ttid, None) {
Err(TimelineError::Deleted(*ttid))
} else {
Err(TimelineError::NotFound(*ttid))
@@ -88,20 +95,46 @@ impl GlobalTimelinesState {
}
}
fn has_tombstone(&self, ttid: &TenantTimelineId) -> bool {
self.tombstones.contains_key(ttid) || self.tenant_tombstones.contains_key(&ttid.tenant_id)
fn has_timeline_tombstone(
&self,
ttid: &TenantTimelineId,
generation: Option<SafekeeperGeneration>,
) -> bool {
if let Some(generation) = generation {
self.timeline_tombstones
.get(ttid)
.is_some_and(|t| t.is_valid(generation))
} else {
self.timeline_tombstones.contains_key(ttid)
}
}
/// Removes all blocking tombstones for the given timeline ID.
fn has_tenant_tombstone(&self, tenant_id: &TenantId) -> bool {
self.tenant_tombstones.contains_key(tenant_id)
}
/// Check if the state has a tenant or a timeline tombstone.
/// If `generation` is provided, check only for timeline tombsotnes with same or higher generation.
/// If `generation` is `None`, check for any timeline tombstone.
/// Tenant tombstones are checked regardless of the generation.
fn has_tombstone(
&self,
ttid: &TenantTimelineId,
generation: Option<SafekeeperGeneration>,
) -> bool {
self.has_timeline_tombstone(ttid, generation) || self.has_tenant_tombstone(&ttid.tenant_id)
}
/// Removes timeline tombstone for the given timeline ID.
/// Returns `true` if there have been actual changes.
fn remove_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
self.tombstones.remove(ttid).is_some()
|| self.tenant_tombstones.remove(&ttid.tenant_id).is_some()
fn remove_timeline_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
self.timeline_tombstones.remove(ttid).is_some()
}
fn delete(&mut self, ttid: TenantTimelineId) {
fn delete(&mut self, ttid: TenantTimelineId, generation: Option<SafekeeperGeneration>) {
self.timelines.remove(&ttid);
self.tombstones.insert(ttid, Instant::now());
self.timeline_tombstones
.insert(ttid, TimelineTombstone::new(generation));
}
fn add_tenant_tombstone(&mut self, tenant_id: TenantId) {
@@ -120,7 +153,7 @@ impl GlobalTimelines {
Self {
state: Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
tombstones: HashMap::new(),
timeline_tombstones: HashMap::new(),
tenant_tombstones: HashMap::new(),
conf,
broker_active_set: Arc::new(TimelinesSet::default()),
@@ -261,6 +294,8 @@ impl GlobalTimelines {
start_lsn: Lsn,
commit_lsn: Lsn,
) -> Result<Arc<Timeline>> {
let generation = Some(mconf.generation);
let (conf, _, _, _) = {
let state = self.state.lock().unwrap();
if let Ok(timeline) = state.get(&ttid) {
@@ -268,8 +303,8 @@ impl GlobalTimelines {
return Ok(timeline);
}
if state.has_tombstone(&ttid) {
anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
if state.has_tombstone(&ttid, generation) {
anyhow::bail!(TimelineError::Deleted(ttid));
}
state.get_dependencies()
@@ -284,7 +319,9 @@ impl GlobalTimelines {
// immediately initialize first WAL segment as well.
let state = TimelinePersistentState::new(&ttid, mconf, server_info, start_lsn, commit_lsn)?;
control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
let timeline = self.load_temp_timeline(ttid, &tmp_dir_path, true).await?;
let timeline = self
.load_temp_timeline(ttid, &tmp_dir_path, generation)
.await?;
Ok(timeline)
}
@@ -303,7 +340,7 @@ impl GlobalTimelines {
&self,
ttid: TenantTimelineId,
tmp_path: &Utf8PathBuf,
check_tombstone: bool,
generation: Option<SafekeeperGeneration>,
) -> Result<Arc<Timeline>> {
// Check for existence and mark that we're creating it.
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
@@ -317,18 +354,18 @@ impl GlobalTimelines {
}
_ => {}
}
if check_tombstone {
if state.has_tombstone(&ttid) {
anyhow::bail!("timeline {ttid} is deleted, refusing to recreate");
}
} else {
// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
// It's also possible that we enter this when the tenant has been deleted, even if the timeline itself has never existed.
if state.remove_tombstone(&ttid) {
warn!("un-deleted timeline {ttid}");
}
if state.has_tombstone(&ttid, generation) {
// If the timeline is deleted, we refuse to recreate it.
// This is a safeguard against accidentally overwriting a timeline that was deleted
// by concurrent request.
anyhow::bail!(TimelineError::Deleted(ttid));
}
// We might have an outdated tombstone with the older generation.
// Remove it unconditionally.
state.remove_timeline_tombstone(&ttid);
state
.timelines
.insert(ttid, GlobalMapTimeline::CreationInProgress);
@@ -503,11 +540,16 @@ impl GlobalTimelines {
ttid: &TenantTimelineId,
action: DeleteOrExclude,
) -> Result<TimelineDeleteResult, DeleteOrExcludeError> {
let generation = match &action {
DeleteOrExclude::Delete | DeleteOrExclude::DeleteLocal => None,
DeleteOrExclude::Exclude(mconf) => Some(mconf.generation),
};
let tli_res = {
let state = self.state.lock().unwrap();
// Do NOT check tenant tombstones here: those were set earlier
if state.tombstones.contains_key(ttid) {
if state.has_timeline_tombstone(ttid, generation) {
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
info!("Timeline {ttid} was already deleted");
return Ok(TimelineDeleteResult { dir_existed: false });
@@ -528,6 +570,11 @@ impl GlobalTimelines {
// We would like to avoid holding the lock while waiting for the
// gate to finish as this is deadlock prone, so for actual
// deletion will take it second time.
//
// Canceling the timeline will block membership switch requests,
// ensuring that the timeline generation will not increase
// after this point, and we will not remove a timeline with a generation
// higher than the requested one.
if let DeleteOrExclude::Exclude(ref mconf) = action {
let shared_state = timeline.read_shared_state().await;
if shared_state.sk.state().mconf.generation > mconf.generation {
@@ -536,9 +583,9 @@ impl GlobalTimelines {
current: shared_state.sk.state().mconf.clone(),
});
}
timeline.cancel().await;
timeline.cancel();
} else {
timeline.cancel().await;
timeline.cancel();
}
timeline.close().await;
@@ -565,7 +612,7 @@ impl GlobalTimelines {
// Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
// are used to prevent still-running computes from re-creating the same timeline when they send data,
// and to speed up repeated deletion calls by avoiding re-listing objects.
self.state.lock().unwrap().delete(*ttid);
self.state.lock().unwrap().delete(*ttid, generation);
result
}
@@ -627,12 +674,16 @@ impl GlobalTimelines {
// may recreate a deleted timeline.
let now = Instant::now();
state
.tombstones
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
.timeline_tombstones
.retain(|_, v| now.duration_since(v.timestamp) < *tombstone_ttl);
state
.tenant_tombstones
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
}
pub fn get_sk_id(&self) -> NodeId {
self.state.lock().unwrap().conf.my_id
}
}
/// Action for delete_or_exclude.
@@ -673,6 +724,7 @@ pub async fn validate_temp_timeline(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
path: &Utf8PathBuf,
generation: Option<SafekeeperGeneration>,
) -> Result<(Lsn, Lsn)> {
let control_path = path.join("safekeeper.control");
@@ -681,6 +733,15 @@ pub async fn validate_temp_timeline(
bail!("wal_seg_size is not set");
}
if let Some(generation) = generation {
if control_store.mconf.generation > generation {
bail!(
"tmp timeline generation {} is higher than expected {generation}",
control_store.mconf.generation
);
}
}
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
let commit_lsn = control_store.commit_lsn;
@@ -688,3 +749,28 @@ pub async fn validate_temp_timeline(
Ok((commit_lsn, flush_lsn))
}
/// A tombstone for a deleted timeline.
/// The generation is passed with "exclude" request and stored in the tombstone.
/// We ignore the tombstone if the request generation is higher than
/// the tombstone generation.
/// If the tombstone doesn't have a generation, it's considered permanent,
/// e.g. after "delete" request.
struct TimelineTombstone {
timestamp: Instant,
generation: Option<SafekeeperGeneration>,
}
impl TimelineTombstone {
fn new(generation: Option<SafekeeperGeneration>) -> Self {
TimelineTombstone {
timestamp: Instant::now(),
generation,
}
}
/// Check if the timeline is still valid for the given generation.
fn is_valid(&self, generation: SafekeeperGeneration) -> bool {
self.generation.is_none_or(|g| g >= generation)
}
}

View File

@@ -0,0 +1 @@
ALTER TABLE timelines DROP sk_set_notified_generation;

View File

@@ -0,0 +1 @@
ALTER TABLE timelines ADD sk_set_notified_generation INTEGER NOT NULL DEFAULT 1;

View File

@@ -351,7 +351,7 @@ impl Node {
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
cancel: &CancellationToken,
cancel_new_retries: &CancellationToken,
) -> Option<mgmt_api::Result<T>>
where
O: FnMut(PageserverClient) -> F,
@@ -402,7 +402,7 @@ impl Node {
self.id,
self.base_url(),
),
cancel,
cancel_new_retries,
)
.await
}

View File

@@ -131,6 +131,8 @@ pub(crate) enum DatabaseOperation {
InsertTimeline,
UpdateTimeline,
UpdateTimelineMembership,
UpdateCplaneNotifiedGeneration,
UpdateSkSetNotifiedGeneration,
GetTimeline,
InsertTimelineReconcile,
RemoveTimelineReconcile,
@@ -1497,6 +1499,8 @@ impl Persistence {
/// Update timeline membership configuration in the database.
/// Perform a compare-and-swap (CAS) operation on the timeline's generation.
/// The `new_generation` must be the next (+1) generation after the one in the database.
/// Also inserts reconcile_requests to safekeeper_timeline_pending_ops table in the same
/// transaction.
pub(crate) async fn update_timeline_membership(
&self,
tenant_id: TenantId,
@@ -1504,8 +1508,11 @@ impl Persistence {
new_generation: SafekeeperGeneration,
sk_set: &[NodeId],
new_sk_set: Option<&[NodeId]>,
reconcile_requests: &[TimelinePendingOpPersistence],
) -> DatabaseResult<()> {
use crate::schema::timelines::dsl;
use crate::schema::safekeeper_timeline_pending_ops as stpo;
use crate::schema::timelines;
use diesel::query_dsl::methods::FilterDsl;
let prev_generation = new_generation.previous().unwrap();
@@ -1513,14 +1520,15 @@ impl Persistence {
let timeline_id = &timeline_id;
self.with_measured_conn(DatabaseOperation::UpdateTimelineMembership, move |conn| {
Box::pin(async move {
let updated = diesel::update(dsl::timelines)
.filter(dsl::tenant_id.eq(&tenant_id.to_string()))
.filter(dsl::timeline_id.eq(&timeline_id.to_string()))
.filter(dsl::generation.eq(prev_generation.into_inner() as i32))
let updated = diesel::update(timelines::table)
.filter(timelines::tenant_id.eq(&tenant_id.to_string()))
.filter(timelines::timeline_id.eq(&timeline_id.to_string()))
.filter(timelines::generation.eq(prev_generation.into_inner() as i32))
.set((
dsl::generation.eq(new_generation.into_inner() as i32),
dsl::sk_set.eq(sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>()),
dsl::new_sk_set.eq(new_sk_set
timelines::generation.eq(new_generation.into_inner() as i32),
timelines::sk_set
.eq(sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>()),
timelines::new_sk_set.eq(new_sk_set
.map(|set| set.iter().map(|id| id.0 as i64).collect::<Vec<_>>())),
))
.execute(conn)
@@ -1530,20 +1538,123 @@ impl Persistence {
0 => {
// TODO(diko): It makes sense to select the current generation
// and include it in the error message for better debuggability.
Err(DatabaseError::Cas(
return Err(DatabaseError::Cas(
"Failed to update membership configuration".to_string(),
))
));
}
1 => {}
_ => {
return Err(DatabaseError::Logical(format!(
"unexpected number of rows ({updated})"
)));
}
};
for req in reconcile_requests {
let inserted_updated = diesel::insert_into(stpo::table)
.values(req)
.on_conflict((stpo::tenant_id, stpo::timeline_id, stpo::sk_id))
.do_update()
.set(req)
.filter(stpo::generation.lt(req.generation))
.execute(conn)
.await?;
if inserted_updated > 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of rows ({inserted_updated})"
)));
}
1 => Ok(()),
_ => Err(DatabaseError::Logical(format!(
"unexpected number of rows ({updated})"
))),
}
Ok(())
})
})
.await
}
/// Update the cplane notified generation for a timeline.
/// Perform a compare-and-swap (CAS) operation on the timeline's cplane notified generation.
/// The update will fail if the specified generation is less than the cplane notified generation
/// in the database.
pub(crate) async fn update_cplane_notified_generation(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
generation: SafekeeperGeneration,
) -> DatabaseResult<()> {
use crate::schema::timelines::dsl;
let tenant_id = &tenant_id;
let timeline_id = &timeline_id;
self.with_measured_conn(
DatabaseOperation::UpdateCplaneNotifiedGeneration,
move |conn| {
Box::pin(async move {
let updated = diesel::update(dsl::timelines)
.filter(dsl::tenant_id.eq(&tenant_id.to_string()))
.filter(dsl::timeline_id.eq(&timeline_id.to_string()))
.filter(dsl::cplane_notified_generation.le(generation.into_inner() as i32))
.set(dsl::cplane_notified_generation.eq(generation.into_inner() as i32))
.execute(conn)
.await?;
match updated {
0 => Err(DatabaseError::Cas(
"Failed to update cplane notified generation".to_string(),
)),
1 => Ok(()),
_ => Err(DatabaseError::Logical(format!(
"unexpected number of rows ({updated})"
))),
}
})
},
)
.await
}
/// Update the sk set notified generation for a timeline.
/// Perform a compare-and-swap (CAS) operation on the timeline's sk set notified generation.
/// The update will fail if the specified generation is less than the sk set notified generation
/// in the database.
pub(crate) async fn update_sk_set_notified_generation(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
generation: SafekeeperGeneration,
) -> DatabaseResult<()> {
use crate::schema::timelines::dsl;
let tenant_id = &tenant_id;
let timeline_id = &timeline_id;
self.with_measured_conn(
DatabaseOperation::UpdateSkSetNotifiedGeneration,
move |conn| {
Box::pin(async move {
let updated = diesel::update(dsl::timelines)
.filter(dsl::tenant_id.eq(&tenant_id.to_string()))
.filter(dsl::timeline_id.eq(&timeline_id.to_string()))
.filter(dsl::sk_set_notified_generation.le(generation.into_inner() as i32))
.set(dsl::sk_set_notified_generation.eq(generation.into_inner() as i32))
.execute(conn)
.await?;
match updated {
0 => Err(DatabaseError::Cas(
"Failed to update sk set notified generation".to_string(),
)),
1 => Ok(()),
_ => Err(DatabaseError::Logical(format!(
"unexpected number of rows ({updated})"
))),
}
})
},
)
.await
}
/// Load timeline from db. Returns `None` if not present.
pub(crate) async fn get_timeline(
&self,
@@ -2493,6 +2604,7 @@ pub(crate) struct TimelinePersistence {
pub(crate) new_sk_set: Option<Vec<i64>>,
pub(crate) cplane_notified_generation: i32,
pub(crate) deleted_at: Option<chrono::DateTime<chrono::Utc>>,
pub(crate) sk_set_notified_generation: i32,
}
/// This is separate from [TimelinePersistence] only because postgres allows NULLs
@@ -2511,6 +2623,7 @@ pub(crate) struct TimelineFromDb {
pub(crate) new_sk_set: Option<Vec<Option<i64>>>,
pub(crate) cplane_notified_generation: i32,
pub(crate) deleted_at: Option<chrono::DateTime<chrono::Utc>>,
pub(crate) sk_set_notified_generation: i32,
}
impl TimelineFromDb {
@@ -2530,6 +2643,7 @@ impl TimelineFromDb {
new_sk_set,
cplane_notified_generation: self.cplane_notified_generation,
deleted_at: self.deleted_at,
sk_set_notified_generation: self.sk_set_notified_generation,
}
}
}

View File

@@ -110,7 +110,7 @@ impl Safekeeper {
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
cancel: &CancellationToken,
cancel_new_retries: &CancellationToken,
) -> mgmt_api::Result<T>
where
O: FnMut(SafekeeperClient) -> F,
@@ -161,7 +161,7 @@ impl Safekeeper {
self.id,
self.base_url(),
),
cancel,
cancel_new_retries,
)
.await
.unwrap_or(Err(mgmt_api::Error::Cancelled))

View File

@@ -118,6 +118,7 @@ diesel::table! {
new_sk_set -> Nullable<Array<Nullable<Int8>>>,
cplane_notified_generation -> Int4,
deleted_at -> Nullable<Timestamptz>,
sk_set_notified_generation -> Int4,
}
}

View File

@@ -364,7 +364,12 @@ impl SafekeeperReconcilerInner {
http_hosts,
tenant_id: req.tenant_id,
timeline_id,
ignore_tombstone: Some(false),
// TODO(diko): get mconf from "timelines" table and pass it here.
// Now we use pull_timeline reconciliation only for the timeline creation,
// so it's not critical right now.
// It could be fixed together with other reconciliation issues:
// https://github.com/neondatabase/neon/issues/12189
mconf: None,
};
success = self
.reconcile_inner(

View File

@@ -123,12 +123,17 @@ impl Service {
/// Perform an operation on a list of safekeepers in parallel with retries.
///
/// If desired_success_count is set, the remaining operations will be cancelled
/// when the desired number of successful responses is reached.
///
/// Return the results of the operation on each safekeeper in the input order.
async fn tenant_timeline_safekeeper_op<T, O, F>(
&self,
safekeepers: &[Safekeeper],
op: O,
max_retries: u32,
timeout: Duration,
desired_success_count: Option<usize>,
) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
where
O: FnMut(SafekeeperClient) -> F + Send + 'static,
@@ -136,6 +141,7 @@ impl Service {
F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
T: Sync + Send + 'static,
{
let warn_threshold = std::cmp::min(3, max_retries);
let jwt = self
.config
.safekeeper_jwt_token
@@ -143,23 +149,26 @@ impl Service {
.map(SecretString::from);
let mut joinset = JoinSet::new();
let cancel_new_retries = CancellationToken::new();
for (idx, sk) in safekeepers.iter().enumerate() {
let sk = sk.clone();
let http_client = self.http_client.clone();
let jwt = jwt.clone();
let op = op.clone();
let cancel_new_retries = cancel_new_retries.clone();
joinset.spawn(async move {
let res = sk
.with_client_retries(
op,
&http_client,
&jwt,
3,
3,
warn_threshold,
max_retries,
// TODO(diko): This is a wrong timeout.
// It should be scaled to the retry count.
timeout,
&CancellationToken::new(),
&cancel_new_retries,
)
.await;
(idx, res)
@@ -184,6 +193,7 @@ impl Service {
// Wait until all tasks finish or timeout is hit, whichever occurs
// first.
let mut result_count = 0;
let mut success_count = 0;
loop {
if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
{
@@ -198,6 +208,15 @@ impl Service {
// Only print errors, as there is no Debug trait for T.
res.as_ref().map(|_| ()),
);
if res.is_ok() {
success_count += 1;
if desired_success_count == Some(success_count) {
// We reached the desired number of successful responses, cancel new retries for
// the remaining safekeepers.
// It does not cancel already started requests, we will still wait for them.
cancel_new_retries.cancel();
}
}
results[idx] = res;
result_count += 1;
}
@@ -247,14 +266,14 @@ impl Service {
);
}
let quorum_size = target_sk_count / 2 + 1;
let max_retries = 3;
let results = self
.tenant_timeline_safekeeper_op(safekeepers, op, timeout)
.tenant_timeline_safekeeper_op(safekeepers, op, max_retries, timeout, Some(quorum_size))
.await?;
// Now check if quorum was reached in results.
let quorum_size = target_sk_count / 2 + 1;
let success_count = results.iter().filter(|res| res.is_ok()).count();
if success_count < quorum_size {
// Failure
@@ -312,6 +331,7 @@ impl Service {
new_sk_set: None,
cplane_notified_generation: 0,
deleted_at: None,
sk_set_notified_generation: 0,
};
let inserted = self
.persistence
@@ -461,6 +481,7 @@ impl Service {
new_sk_set: None,
cplane_notified_generation: 1,
deleted_at: None,
sk_set_notified_generation: 1,
};
let inserted = self
.persistence
@@ -894,17 +915,21 @@ impl Service {
/// If min_position is not None, validates that majority of safekeepers
/// reached at least min_position.
///
/// If update_notified_generation is set, also updates sk_set_notified_generation
/// in the timelines table.
///
/// Return responses from safekeepers in the input order.
async fn tenant_timeline_set_membership_quorum(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
safekeepers: &[Safekeeper],
config: &membership::Configuration,
mconf: &membership::Configuration,
min_position: Option<(Term, Lsn)>,
update_notified_generation: bool,
) -> Result<Vec<mgmt_api::Result<TimelineMembershipSwitchResponse>>, ApiError> {
let req = TimelineMembershipSwitchRequest {
mconf: config.clone(),
mconf: mconf.clone(),
};
const SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
@@ -945,28 +970,34 @@ impl Service {
.await?;
for res in results.iter().flatten() {
if res.current_conf.generation > config.generation {
if res.current_conf.generation > mconf.generation {
// Antoher switch_membership raced us.
return Err(ApiError::Conflict(format!(
"received configuration with generation {} from safekeeper, but expected {}",
res.current_conf.generation, config.generation
res.current_conf.generation, mconf.generation
)));
} else if res.current_conf.generation < config.generation {
} else if res.current_conf.generation < mconf.generation {
// Note: should never happen.
// If we get a response, it should be at least the sent generation.
tracing::error!(
"received configuration with generation {} from safekeeper, but expected {}",
res.current_conf.generation,
config.generation
mconf.generation
);
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"received configuration with generation {} from safekeeper, but expected {}",
res.current_conf.generation,
config.generation
mconf.generation
)));
}
}
if update_notified_generation {
self.persistence
.update_sk_set_notified_generation(tenant_id, timeline_id, mconf.generation)
.await?;
}
Ok(results)
}
@@ -979,6 +1010,7 @@ impl Service {
timeline_id: TimelineId,
to_safekeepers: &[Safekeeper],
from_safekeepers: &[Safekeeper],
mconf: membership::Configuration,
) -> Result<(), ApiError> {
let http_hosts = from_safekeepers
.iter()
@@ -997,17 +1029,15 @@ impl Service {
.collect::<Vec<_>>()
);
// TODO(diko): need to pass mconf/generation with the request
// to properly handle tombstones. Ignore tombstones for now.
// Worst case: we leave a timeline on a safekeeper which is not in the current set.
let req = PullTimelineRequest {
tenant_id,
timeline_id,
http_hosts,
ignore_tombstone: Some(true),
mconf: Some(mconf),
};
const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
let max_retries = 3;
let responses = self
.tenant_timeline_safekeeper_op(
@@ -1016,7 +1046,9 @@ impl Service {
let req = req.clone();
async move { client.pull_timeline(&req).await }
},
max_retries,
SK_PULL_TIMELINE_RECONCILE_TIMEOUT,
None,
)
.await?;
@@ -1035,20 +1067,28 @@ impl Service {
}
/// Exclude a timeline from safekeepers in parallel with retries.
/// If an exclude request is unsuccessful, it will be added to
/// the reconciler, and after that the function will succeed.
async fn tenant_timeline_safekeeper_exclude(
///
/// Assumes that the exclude requests are already persistent in the database.
///
/// The function does best effort: if an exclude request is unsuccessful,
/// it will be added to the in-memory reconciler, and the function will succeed anyway.
///
/// Might fail if there is error accessing the database.
async fn tenant_timeline_safekeeper_exclude_reconcile(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
safekeepers: &[Safekeeper],
config: &membership::Configuration,
mconf: &membership::Configuration,
) -> Result<(), ApiError> {
let req = TimelineMembershipSwitchRequest {
mconf: config.clone(),
mconf: mconf.clone(),
};
const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30);
// Do not retry failed requests to speed up the finishing phase.
// They will be retried in the reconciler.
let max_retries = 0;
let results = self
.tenant_timeline_safekeeper_op(
@@ -1057,31 +1097,40 @@ impl Service {
let req = req.clone();
async move { client.exclude_timeline(tenant_id, timeline_id, &req).await }
},
max_retries,
SK_EXCLUDE_TIMELINE_TIMEOUT,
None,
)
.await?;
let mut reconcile_requests = Vec::new();
for (idx, res) in results.iter().enumerate() {
if res.is_err() {
let sk_id = safekeepers[idx].skp.id;
let pending_op = TimelinePendingOpPersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: config.generation.into_inner() as i32,
op_kind: SafekeeperTimelineOpKind::Exclude,
sk_id,
};
tracing::info!("writing pending exclude op for sk id {sk_id}");
self.persistence.insert_pending_op(pending_op).await?;
fail::fail_point!("sk-migration-step-9-mid-exclude", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-step-9-mid-exclude"
)))
});
for (idx, res) in results.iter().enumerate() {
let sk_id = safekeepers[idx].skp.id;
let generation = mconf.generation.into_inner();
if res.is_ok() {
self.persistence
.remove_pending_op(
tenant_id,
Some(timeline_id),
NodeId(sk_id as u64),
generation,
)
.await?;
} else {
let req = ScheduleRequest {
safekeeper: Box::new(safekeepers[idx].clone()),
host_list: Vec::new(),
tenant_id,
timeline_id: Some(timeline_id),
generation: config.generation.into_inner(),
generation,
kind: SafekeeperTimelineOpKind::Exclude,
};
reconcile_requests.push(req);
@@ -1208,6 +1257,22 @@ impl Service {
}
// It it is the same new_sk_set, we can continue the migration (retry).
} else {
let prev_finished = timeline.cplane_notified_generation == timeline.generation
&& timeline.sk_set_notified_generation == timeline.generation;
if !prev_finished {
// The previous migration is committed, but the finish step failed.
// Safekeepers/cplane might not know about the last membership configuration.
// Retry the finish step to ensure smooth migration.
self.finish_safekeeper_migration_retry(tenant_id, timeline_id, &timeline)
.await?;
}
if cur_sk_set == new_sk_set {
tracing::info!("timeline is already at the desired safekeeper set");
return Ok(());
}
// 3. No active migration yet.
// Increment current generation and put desired_set to new_sk_set.
generation = generation.next();
@@ -1219,8 +1284,15 @@ impl Service {
generation,
&cur_sk_set,
Some(&new_sk_set),
&[],
)
.await?;
fail::fail_point!("sk-migration-after-step-3", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-after-step-3"
)))
});
}
let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
@@ -1249,6 +1321,7 @@ impl Service {
&cur_safekeepers,
&joint_config,
None, // no min position
true, // update notified generation
)
.await?;
@@ -1266,6 +1339,12 @@ impl Service {
"safekeepers set membership updated",
);
fail::fail_point!("sk-migration-after-step-4", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-after-step-4"
)))
});
// 5. Initialize timeline on safekeeper(s) from new_sk_set where it doesn't exist yet
// by doing pull_timeline from the majority of the current set.
@@ -1282,9 +1361,16 @@ impl Service {
timeline_id,
&pull_to_safekeepers,
&cur_safekeepers,
joint_config.clone(),
)
.await?;
fail::fail_point!("sk-migration-after-step-5", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-after-step-5"
)))
});
// 6. Call POST bump_term(sync_term) on safekeepers from the new set. Success on majority is enough.
// TODO(diko): do we need to bump timeline term?
@@ -1300,9 +1386,16 @@ impl Service {
&new_safekeepers,
&joint_config,
Some(sync_position),
false, // we're just waiting for sync position, don't update notified generation
)
.await?;
fail::fail_point!("sk-migration-after-step-7", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-after-step-7"
)))
});
// 8. Create new_conf: Configuration incrementing joint_conf generation and
// having new safekeeper set as sk_set and None new_sk_set.
@@ -1314,45 +1407,55 @@ impl Service {
new_members: None,
};
self.persistence
.update_timeline_membership(tenant_id, timeline_id, generation, &new_sk_set, None)
.await?;
// TODO(diko): at this point we have already updated the timeline in the database,
// but we still need to notify safekeepers and cplane about the new configuration,
// and put delition of the timeline from the old safekeepers into the reconciler.
// Ideally it should be done atomically, but now it's not.
// Worst case: the timeline is not deleted from old safekeepers,
// the compute may require both quorums till the migration is retried and completed.
self.tenant_timeline_set_membership_quorum(
tenant_id,
timeline_id,
&new_safekeepers,
&new_conf,
None, // no min position
)
.await?;
let new_ids: HashSet<NodeId> = new_safekeepers.iter().map(|sk| sk.get_id()).collect();
let exclude_safekeepers = cur_safekeepers
.into_iter()
.filter(|sk| !new_ids.contains(&sk.get_id()))
.collect::<Vec<_>>();
self.tenant_timeline_safekeeper_exclude(
let exclude_requests = exclude_safekeepers
.iter()
.map(|sk| TimelinePendingOpPersistence {
sk_id: sk.skp.id,
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: generation.into_inner() as i32,
op_kind: SafekeeperTimelineOpKind::Exclude,
})
.collect::<Vec<_>>();
self.persistence
.update_timeline_membership(
tenant_id,
timeline_id,
generation,
&new_sk_set,
None,
&exclude_requests,
)
.await?;
fail::fail_point!("sk-migration-after-step-8", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-after-step-8"
)))
});
// At this point we have already updated the timeline in the database, so the final
// membership configuration is commited and the migration is not abortable anymore.
// But safekeepers and cplane/compute still need to be notified about the new configuration.
// The [`Self::finish_safekeeper_migration`] does exactly that: notifies everyone about
// the new configuration and reconciles excluded safekeepers.
// If it fails, the safkeeper migration call should be retried.
self.finish_safekeeper_migration(
tenant_id,
timeline_id,
&exclude_safekeepers,
&new_safekeepers,
&new_conf,
&exclude_safekeepers,
)
.await?;
// Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
// This way the compute will stop talking to excluded safekeepers only after we stop requiring to
// collect a quorum from them.
self.cplane_notify_safekeepers(tenant_id, timeline_id, &new_conf)
.await?;
Ok(())
}
@@ -1396,6 +1499,130 @@ impl Service {
ApiError::InternalServerError(anyhow::anyhow!(
"failed to notify cplane about safekeeper membership change: {err}"
))
})
})?;
self.persistence
.update_cplane_notified_generation(tenant_id, timeline_id, mconf.generation)
.await?;
Ok(())
}
/// Finish safekeeper migration.
///
/// It is the last step of the safekeeper migration.
///
/// Notifies safekeepers and cplane about the final membership configuration,
/// reconciles excluded safekeepers and updates *_notified_generation in the database.
async fn finish_safekeeper_migration(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
new_safekeepers: &[Safekeeper],
new_conf: &membership::Configuration,
exclude_safekeepers: &[Safekeeper],
) -> Result<(), ApiError> {
// 9. Call PUT configuration on safekeepers from the new set, delivering them new_conf.
// Also try to exclude safekeepers and notify cplane about the membership change.
self.tenant_timeline_set_membership_quorum(
tenant_id,
timeline_id,
new_safekeepers,
new_conf,
None, // no min position
true, // update notified generation
)
.await?;
fail::fail_point!("sk-migration-step-9-after-set-membership", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-step-9-after-set-membership"
)))
});
self.tenant_timeline_safekeeper_exclude_reconcile(
tenant_id,
timeline_id,
exclude_safekeepers,
new_conf,
)
.await?;
fail::fail_point!("sk-migration-step-9-after-exclude", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-step-9-after-exclude"
)))
});
// Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
// This way the compute will stop talking to excluded safekeepers only after we stop requiring to
// collect a quorum from them.
self.cplane_notify_safekeepers(tenant_id, timeline_id, new_conf)
.await?;
fail::fail_point!("sk-migration-after-step-9", |_| {
Err(ApiError::BadRequest(anyhow::anyhow!(
"failpoint sk-migration-after-step-9"
)))
});
Ok(())
}
/// Same as [`Self::finish_safekeeper_migration`], but restores the migration state from the database.
/// It's used when the migration failed during the finish step and we need to retry it.
async fn finish_safekeeper_migration_retry(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
timeline: &TimelinePersistence,
) -> Result<(), ApiError> {
if timeline.new_sk_set.is_some() {
// Logical error, should never happen.
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"can't finish timeline migration for {tenant_id}/{timeline_id}: new_sk_set is not None"
)));
}
let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
let cur_sk_member_set =
Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?;
let mconf = membership::Configuration {
generation: SafekeeperGeneration::new(timeline.generation as u32),
members: cur_sk_member_set,
new_members: None,
};
// We might have failed between commiting reconciliation requests and adding them to the in-memory reconciler.
// Reload them from the database.
let pending_ops = self
.persistence
.list_pending_ops_for_timeline(tenant_id, timeline_id)
.await?;
let mut exclude_sk_ids = Vec::new();
for op in pending_ops {
if op.op_kind == SafekeeperTimelineOpKind::Exclude
&& op.generation == timeline.generation
{
exclude_sk_ids.push(op.sk_id);
}
}
let exclude_safekeepers = self.get_safekeepers(&exclude_sk_ids)?;
self.finish_safekeeper_migration(
tenant_id,
timeline_id,
&cur_safekeepers,
&mconf,
&exclude_safekeepers,
)
.await?;
Ok(())
}
}

View File

@@ -24,6 +24,7 @@ def connection_parameters_to_env(params: dict[str, str]) -> dict[str, str]:
# Some API calls not yet implemented.
# You may want to copy not-yet-implemented methods from the PR https://github.com/neondatabase/neon/pull/11305
@final
class NeonAPI:
def __init__(self, neon_api_key: str, neon_api_base_url: str):
self.__neon_api_key = neon_api_key
@@ -170,7 +171,7 @@ class NeonAPI:
protected: bool | None = None,
archived: bool | None = None,
init_source: str | None = None,
add_endpoint=True,
add_endpoint: bool = True,
) -> dict[str, Any]:
data: dict[str, Any] = {}
if add_endpoint:
@@ -226,6 +227,16 @@ class NeonAPI:
)
return cast("dict[str, Any]", resp.json())
def reset_to_parent(self, project_id: str, branch_id: str) -> dict[str, Any]:
resp = self.__request(
"POST",
f"/projects/{project_id}/branches/{branch_id}/reset_to_parent",
headers={
"Accept": "application/json",
},
)
return cast("dict[str, Any]", resp.json())
def restore_branch(
self,
project_id: str,

View File

@@ -1540,6 +1540,17 @@ class NeonEnv:
raise RuntimeError(f"Pageserver with ID {id} not found")
def get_safekeeper(self, id: int) -> Safekeeper:
"""
Look up a safekeeper by its ID.
"""
for sk in self.safekeepers:
if sk.id == id:
return sk
raise RuntimeError(f"Safekeeper with ID {id} not found")
def get_tenant_pageserver(self, tenant_id: TenantId | TenantShardId):
"""
Get the NeonPageserver where this tenant shard is currently attached, according
@@ -5391,15 +5402,24 @@ class Safekeeper(LogUtils):
return timeline_status.commit_lsn
def pull_timeline(
self, srcs: list[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId
self,
srcs: list[Safekeeper],
tenant_id: TenantId,
timeline_id: TimelineId,
mconf: MembershipConfiguration | None = None,
) -> dict[str, Any]:
"""
pull_timeline from srcs to self.
"""
src_https = [f"http://localhost:{sk.port.http}" for sk in srcs]
res = self.http_client().pull_timeline(
{"tenant_id": str(tenant_id), "timeline_id": str(timeline_id), "http_hosts": src_https}
)
body: dict[str, Any] = {
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"http_hosts": src_https,
}
if mconf is not None:
body["mconf"] = mconf.__dict__
res = self.http_client().pull_timeline(body)
src_ids = [sk.id for sk in srcs]
log.info(f"finished pulling timeline from {src_ids} to {self.id}")
return res

View File

@@ -152,6 +152,8 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
".*reconciler.*neon_local error.*",
# Tenant rate limits may fire in tests that submit lots of API requests.
".*tenant \\S+ is rate limited.*",
# Reconciliations may get stuck/delayed e.g. in chaos tests.
".*background_reconcile: Shard reconciliation is stuck.*",
]

View File

@@ -96,6 +96,11 @@ class NeonBranch:
)
self.benchmark: subprocess.Popen[Any] | None = None
self.updated_at: datetime = datetime.fromisoformat(branch["branch"]["updated_at"])
self.parent_timestamp: datetime = (
datetime.fromisoformat(branch["branch"]["parent_timestamp"])
if "parent_timestamp" in branch["branch"]
else datetime.fromtimestamp(0, tz=UTC)
)
self.connect_env: dict[str, str] | None = None
if self.connection_parameters:
self.connect_env = {
@@ -113,8 +118,18 @@ class NeonBranch:
"""
return f"{self.id}{'(r)' if self.id in self.project.reset_branches else ''}, parent: {self.parent}"
def create_child_branch(self) -> NeonBranch | None:
return self.project.create_branch(self.id)
def random_time(self) -> datetime:
min_time = max(
self.updated_at + timedelta(seconds=1),
self.project.min_time,
self.parent_timestamp + timedelta(seconds=1),
)
max_time = datetime.now(UTC) - timedelta(seconds=1)
log.info("min_time: %s, max_time: %s", min_time, max_time)
return (min_time + (max_time - min_time) * random.random()).replace(microsecond=0)
def create_child_branch(self, parent_timestamp: datetime | None = None) -> NeonBranch | None:
return self.project.create_branch(self.id, parent_timestamp)
def create_ro_endpoint(self) -> NeonEndpoint | None:
if not self.project.check_limit_endpoints():
@@ -136,21 +151,33 @@ class NeonBranch:
def terminate_benchmark(self) -> None:
self.project.terminate_benchmark(self.id)
def reset_to_parent(self) -> None:
for ep in self.project.endpoints.values():
if ep.type == "read_only":
ep.terminate_benchmark()
self.terminate_benchmark()
res = self.neon_api.reset_to_parent(self.project_id, self.id)
self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"])
self.parent_timestamp = datetime.fromisoformat(res["branch"]["parent_timestamp"])
self.project.wait()
self.start_benchmark()
for ep in self.project.endpoints.values():
if ep.type == "read_only":
ep.start_benchmark()
def restore_random_time(self) -> None:
"""
Does PITR, i.e. calls the reset API call on the same branch to the random time in the past
"""
min_time = self.updated_at + timedelta(seconds=1)
max_time = datetime.now(UTC) - timedelta(seconds=1)
target_time = (min_time + (max_time - min_time) * random.random()).replace(microsecond=0)
res = self.restore(
self.id,
source_timestamp=target_time.isoformat().replace("+00:00", "Z"),
source_timestamp=self.random_time().isoformat().replace("+00:00", "Z"),
preserve_under_name=self.project.gen_restore_name(),
)
if res is None:
return
self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"])
self.parent_timestamp = datetime.fromisoformat(res["branch"]["parent_timestamp"])
parent_id: str = res["branch"]["parent_id"]
# Creates an object for the parent branch
# After the reset operation a new parent branch is created
@@ -225,6 +252,7 @@ class NeonProject:
self.restart_pgbench_on_console_errors: bool = False
self.limits: dict[str, Any] = self.get_limits()["limits"]
self.read_only_endpoints_total: int = 0
self.min_time: datetime = datetime.now(UTC)
def get_limits(self) -> dict[str, Any]:
return self.neon_api.get_project_limits(self.id)
@@ -251,11 +279,20 @@ class NeonProject:
)
return False
def create_branch(self, parent_id: str | None = None) -> NeonBranch | None:
def create_branch(
self, parent_id: str | None = None, parent_timestamp: datetime | None = None
) -> NeonBranch | None:
self.wait()
if not self.check_limit_branches():
return None
branch_def = self.neon_api.create_branch(self.id, parent_id=parent_id)
if parent_timestamp:
log.info("Timestamp: %s", parent_timestamp)
parent_timestamp_str: str | None = None
if parent_timestamp:
parent_timestamp_str = parent_timestamp.isoformat().replace("+00:00", "Z")
branch_def = self.neon_api.create_branch(
self.id, parent_id=parent_id, parent_timestamp=parent_timestamp_str
)
new_branch = NeonBranch(self, branch_def)
self.wait()
return new_branch
@@ -288,6 +325,14 @@ class NeonProject:
if parent.id in self.reset_branches:
parent.delete()
def get_random_leaf_branch(self) -> NeonBranch | None:
target: NeonBranch | None = None
if self.leaf_branches:
target = random.choice(list(self.leaf_branches.values()))
else:
log.info("No leaf branches found")
return target
def delete_endpoint(self, endpoint_id: str) -> None:
self.terminate_benchmark(endpoint_id)
self.neon_api.delete_endpoint(self.id, endpoint_id)
@@ -390,24 +435,22 @@ def do_action(project: NeonProject, action: str) -> bool:
Runs the action
"""
log.info("Action: %s", action)
if action == "new_branch":
log.info("Trying to create a new branch")
if action == "new_branch" or action == "new_branch_random_time":
use_random_time: bool = action == "new_branch_random_time"
log.info("Trying to create a new branch %s", "random time" if use_random_time else "")
parent = project.branches[
random.choice(list(set(project.branches.keys()) - project.reset_branches))
]
child = parent.create_child_branch()
child = parent.create_child_branch(parent.random_time() if use_random_time else None)
if child is None:
return False
log.info("Created branch %s", child)
child.start_benchmark()
elif action == "delete_branch":
if project.leaf_branches:
target: NeonBranch = random.choice(list(project.leaf_branches.values()))
log.info("Trying to delete branch %s", target)
target.delete()
else:
log.info("Leaf branches not found, skipping")
if (target := project.get_random_leaf_branch()) is None:
return False
log.info("Trying to delete branch %s", target)
target.delete()
elif action == "new_ro_endpoint":
ep = random.choice(
[br for br in project.branches.values() if br.id not in project.reset_branches]
@@ -427,13 +470,15 @@ def do_action(project: NeonProject, action: str) -> bool:
target_ep.delete()
log.info("endpoint %s deleted", target_ep.id)
elif action == "restore_random_time":
if project.leaf_branches:
br: NeonBranch = random.choice(list(project.leaf_branches.values()))
log.info("Restore %s", br)
br.restore_random_time()
else:
log.info("No leaf branches found")
if (target := project.get_random_leaf_branch()) is None:
return False
log.info("Restore %s", target)
target.restore_random_time()
elif action == "reset_to_parent":
if (target := project.get_random_leaf_branch()) is None:
return False
log.info("Reset to parent %s", target)
target.reset_to_parent()
else:
raise ValueError(f"The action {action} is unknown")
return True
@@ -460,17 +505,22 @@ def test_api_random(
pg_bin, project = setup_class
# Here we can assign weights
ACTIONS = (
("new_branch", 1.5),
("new_branch", 1.2),
("new_branch_random_time", 0.5),
("new_ro_endpoint", 1.4),
("delete_ro_endpoint", 0.8),
("delete_branch", 1.0),
("restore_random_time", 1.2),
("delete_branch", 1.2),
("restore_random_time", 0.9),
("reset_to_parent", 0.3),
)
if num_ops_env := os.getenv("NUM_OPERATIONS"):
num_operations = int(num_ops_env)
else:
num_operations = 250
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env)
# To not go to the past where pgbench tables do not exist
time.sleep(1)
project.min_time = datetime.now(UTC)
for _ in range(num_operations):
log.info("Starting action #%s", _ + 1)
while not do_action(

View File

@@ -50,11 +50,15 @@ def test_feature_flag(neon_env_builder: NeonEnvBuilder):
)["result"]
)
env.endpoints.create_start("main") # trigger basebackup
env.pageserver.http_client().force_refresh_feature_flag(env.initial_tenant)
# Check if the properties exist
result = env.pageserver.http_client().evaluate_feature_flag_multivariate(
env.initial_tenant, "test-feature-flag"
)
assert "tenant_remote_size_mb" in result["properties"]
assert "tenant_db_count_max" in result["properties"]
assert "tenant_rel_count_max" in result["properties"]
assert "tenant_id" in result["properties"]

View File

@@ -1,13 +1,25 @@
from __future__ import annotations
import re
from typing import TYPE_CHECKING
import pytest
import requests
from fixtures.log_helper import log
from fixtures.neon_fixtures import StorageControllerApiException
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder
# TODO(diko): pageserver spams with various errors during safekeeper migration.
# Fix the code so it handles the migration better.
PAGESERVER_ALLOWED_ERRORS = [
".*Timeline .* was cancelled and cannot be used anymore.*",
".*Timeline .* has been deleted.*",
".*Timeline .* was not found in global map.*",
".*wal receiver task finished with an error.*",
]
def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
"""
@@ -24,16 +36,7 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
"timeline_safekeeper_count": 1,
}
env = neon_env_builder.init_start()
# TODO(diko): pageserver spams with various errors during safekeeper migration.
# Fix the code so it handles the migration better.
env.pageserver.allowed_errors.extend(
[
".*Timeline .* was cancelled and cannot be used anymore.*",
".*Timeline .* has been deleted.*",
".*Timeline .* was not found in global map.*",
".*wal receiver task finished with an error.*",
]
)
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
@@ -42,15 +45,23 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
assert len(mconf["sk_set"]) == 1
assert mconf["generation"] == 1
current_sk = mconf["sk_set"][0]
ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"])
ep.safe_psql("CREATE EXTENSION neon_test_utils;")
ep.safe_psql("CREATE TABLE t(a int)")
expected_gen = 1
for active_sk in range(1, 4):
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, [active_sk]
)
if active_sk != current_sk:
expected_gen += 2
current_sk = active_sk
other_sks = [sk for sk in range(1, 4) if sk != active_sk]
for sk in other_sks:
@@ -65,9 +76,6 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)]
# 1 initial generation + 2 migrations on each loop iteration.
expected_gen = 1 + 2 * 3
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["generation"] == expected_gen
@@ -113,3 +121,198 @@ def test_new_sk_set_validation(neon_env_builder: NeonEnvBuilder):
env.storage_controller.safekeeper_scheduling_policy(decom_sk, "Decomissioned")
expect_fail([sk_set[0], decom_sk], "decomissioned")
def test_safekeeper_migration_common_set_failpoints(neon_env_builder: NeonEnvBuilder):
"""
Test that safekeeper migration handles failures well.
Two main conditions are checked:
1. safekeeper migration handler can be retried on different failures.
2. writes do not stuck if sk_set and new_sk_set have a quorum in common.
"""
neon_env_builder.num_safekeepers = 4
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
"timeline_safekeeper_count": 3,
}
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert len(mconf["sk_set"]) == 3
assert mconf["generation"] == 1
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"])
ep.safe_psql("CREATE EXTENSION neon_test_utils;")
ep.safe_psql("CREATE TABLE t(a int)")
excluded_sk = mconf["sk_set"][-1]
added_sk = [sk.id for sk in env.safekeepers if sk.id not in mconf["sk_set"]][0]
new_sk_set = mconf["sk_set"][:-1] + [added_sk]
log.info(f"migrating sk set from {mconf['sk_set']} to {new_sk_set}")
failpoints = [
"sk-migration-after-step-3",
"sk-migration-after-step-4",
"sk-migration-after-step-5",
"sk-migration-after-step-7",
"sk-migration-after-step-8",
"sk-migration-step-9-after-set-membership",
"sk-migration-step-9-mid-exclude",
"sk-migration-step-9-after-exclude",
"sk-migration-after-step-9",
]
for i, fp in enumerate(failpoints):
env.storage_controller.configure_failpoints((fp, "return(1)"))
with pytest.raises(StorageControllerApiException, match=f"failpoint {fp}"):
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, new_sk_set
)
ep.safe_psql(f"INSERT INTO t VALUES ({i})")
env.storage_controller.configure_failpoints((fp, "off"))
# No failpoints, migration should succeed.
env.storage_controller.migrate_safekeepers(env.initial_tenant, env.initial_timeline, new_sk_set)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["new_sk_set"] is None
assert mconf["sk_set"] == new_sk_set
assert mconf["generation"] == 3
ep.clear_buffers()
assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(len(failpoints))]
assert ep.safe_psql("SHOW neon.safekeepers")[0][0].startswith("g#3:")
# Check that we didn't forget to remove the timeline on the excluded safekeeper.
with pytest.raises(requests.exceptions.HTTPError) as exc:
env.safekeepers[excluded_sk - 1].http_client().timeline_status(
env.initial_tenant, env.initial_timeline
)
assert exc.value.response.status_code == 404
assert (
f"timeline {env.initial_tenant}/{env.initial_timeline} deleted" in exc.value.response.text
)
def test_sk_generation_aware_tombstones(neon_env_builder: NeonEnvBuilder):
"""
Test that safekeeper respects generations:
1. Check that migration back and forth between two safekeepers works.
2. Check that sk refuses to execute requests with stale generation.
"""
neon_env_builder.num_safekeepers = 3
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
"timeline_safekeeper_count": 1,
}
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["new_sk_set"] is None
assert len(mconf["sk_set"]) == 1
cur_sk = mconf["sk_set"][0]
second_sk, third_sk = [sk.id for sk in env.safekeepers if sk.id != cur_sk]
cur_gen = 1
# Pull the timeline manually to third_sk, so the timeline exists there with stale generation.
# This is needed for the test later.
env.get_safekeeper(third_sk).pull_timeline(
[env.get_safekeeper(cur_sk)], env.initial_tenant, env.initial_timeline
)
def expect_deleted(sk_id: int):
with pytest.raises(requests.exceptions.HTTPError, match="Not Found") as exc:
env.get_safekeeper(sk_id).http_client().timeline_status(
env.initial_tenant, env.initial_timeline
)
assert exc.value.response.status_code == 404
assert re.match(r".*timeline .* deleted.*", exc.value.response.text)
def get_mconf(sk_id: int):
status = (
env.get_safekeeper(sk_id)
.http_client()
.timeline_status(env.initial_tenant, env.initial_timeline)
)
assert status.mconf is not None
return status.mconf
def migrate():
nonlocal cur_sk, second_sk, cur_gen
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, [second_sk]
)
cur_sk, second_sk = second_sk, cur_sk
cur_gen += 2
# Migrate the timeline back and forth between cur_sk and second_sk.
for _i in range(3):
migrate()
# Timeline should exist on cur_sk.
assert get_mconf(cur_sk).generation == cur_gen
# Timeline should be deleted on second_sk.
expect_deleted(second_sk)
# Remember current mconf.
mconf = get_mconf(cur_sk)
# Migrate the timeline one more time.
# It increases the generation by 2.
migrate()
# Check that sk refuses to execute the exclude request with the old mconf.
with pytest.raises(requests.exceptions.HTTPError, match="Conflict") as exc:
env.get_safekeeper(cur_sk).http_client().timeline_exclude(
env.initial_tenant, env.initial_timeline, mconf
)
assert re.match(r".*refused to switch into excluding mconf.*", exc.value.response.text)
# We shouldn't have deleted the timeline.
assert get_mconf(cur_sk).generation == cur_gen
# Check that sk refuses to execute the pull_timeline request with the old mconf.
# Note: we try to pull from third_sk, which has a timeline with stale generation.
# Thus, we bypass some preliminary generation checks and actually test tombstones.
with pytest.raises(requests.exceptions.HTTPError, match="Conflict") as exc:
env.get_safekeeper(second_sk).pull_timeline(
[env.get_safekeeper(third_sk)], env.initial_tenant, env.initial_timeline, mconf
)
assert re.match(r".*Timeline .* deleted.*", exc.value.response.text)
# The timeline should remain deleted.
expect_deleted(second_sk)
def test_migrate_from_unavailable_sk(neon_env_builder: NeonEnvBuilder):
"""
Test that we can migrate from an unavailable safekeeper
if the quorum is still alive.
"""
neon_env_builder.num_safekeepers = 4
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
"timeline_safekeeper_count": 3,
}
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert len(mconf["sk_set"]) == 3
another_sk = [sk.id for sk in env.safekeepers if sk.id not in mconf["sk_set"]][0]
unavailable_sk = mconf["sk_set"][0]
env.get_safekeeper(unavailable_sk).stop()
new_sk_set = mconf["sk_set"][1:] + [another_sk]
env.storage_controller.migrate_safekeepers(env.initial_tenant, env.initial_timeline, new_sk_set)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["sk_set"] == new_sk_set
assert mconf["generation"] == 3

View File

@@ -0,0 +1,9 @@
-- Test unlogged build of SPGIST index (no "Page evicted with zero LSN" error)
create table spgist_point_tbl(id int4, p point);
create index spgist_point_idx on spgist_point_tbl using spgist(p) with (fillfactor = 25);
insert into spgist_point_tbl (id, p) select g, point(g*10, g*10) from generate_series(1, 10000) g;
insert into spgist_point_tbl (id, p) select g, point(g*10, g*10) from generate_series(1, 10000) g;
insert into spgist_point_tbl (id, p) select g+100000, point(g*10+1, g*10+1) from generate_series(1, 10000) g;
vacuum spgist_point_tbl;
insert into spgist_point_tbl (id, p) select g+100000, point(g*10+1, g*10+1) from generate_series(1, 10000) g;
checkpoint;

View File

@@ -9,5 +9,6 @@ test: neon-rel-truncate
test: neon-clog
test: neon-test-utils
test: neon-vacuum-full
test: neon-event-triggers
test: neon-subxacts
test: neon-spgist
test: neon-event-triggers

View File

@@ -0,0 +1,10 @@
-- Test unlogged build of SPGIST index (no "Page evicted with zero LSN" error)
create table spgist_point_tbl(id int4, p point);
create index spgist_point_idx on spgist_point_tbl using spgist(p) with (fillfactor = 25);
insert into spgist_point_tbl (id, p) select g, point(g*10, g*10) from generate_series(1, 10000) g;
insert into spgist_point_tbl (id, p) select g, point(g*10, g*10) from generate_series(1, 10000) g;
insert into spgist_point_tbl (id, p) select g+100000, point(g*10+1, g*10+1) from generate_series(1, 10000) g;
vacuum spgist_point_tbl;
insert into spgist_point_tbl (id, p) select g+100000, point(g*10+1, g*10+1) from generate_series(1, 10000) g;
checkpoint;

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.5",
"ba750903a90dded8098f2f56d0b2a9012e6166af"
"fa1788475e3146cc9c7c6a1b74f48fd296898fcd"
],
"v16": [
"16.9",
"ad2b69b58230290fc44c08fbe0c97981c64f6c7d"
"9b9cb4b3e33347aea8f61e606bb6569979516de5"
],
"v15": [
"15.13",
"e5ee23d99874ea9f5b62f8acc7d076162ae95d6c"
"aaaeff2550d5deba58847f112af9b98fa3a58b00"
],
"v14": [
"14.18",
"4cacada8bde7f6424751a0727a657783c6a1d20b"
"c9f9fdd0113b52c0bd535afdb09d3a543aeee25f"
]
}

View File

@@ -74,7 +74,7 @@ once_cell = { version = "1" }
p256 = { version = "0.13", features = ["jwk"] }
parquet = { version = "53", default-features = false, features = ["zstd"] }
prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] }
rand = { version = "0.8", features = ["small_rng"] }
rand = { version = "0.9" }
regex = { version = "1" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.8" }
@@ -93,6 +93,7 @@ spki = { version = "0.7", default-features = false, features = ["pem", "std"] }
stable_deref_trait = { version = "1" }
subtle = { version = "2" }
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
thiserror = { version = "2" }
tikv-jemalloc-ctl = { version = "0.6", features = ["stats", "use_std"] }
tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
time = { version = "0.3", features = ["macros", "serde-well-known"] }
@@ -101,6 +102,7 @@ tokio-rustls = { version = "0.26", default-features = false, features = ["loggin
tokio-stream = { version = "0.1", features = ["net", "sync"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io-util", "rt"] }
toml_edit = { version = "0.22", features = ["serde"] }
tonic = { version = "0.13", default-features = false, features = ["codegen", "gzip", "prost", "router", "server", "tls-native-roots", "tls-ring", "zstd"] }
tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }