Compare commits

...

76 Commits

Author SHA1 Message Date
Heikki Linnakangas
fcc3acfd7e Add a tracing span to where we open connection to Postgres.
Local testing shows that this can take up to 100 ms, so it'd be nice
to include it explicitly in the trace.
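A minimal sketch of how such a span could look with the `tracing` crate; the helper function and its arguments are hypothetical, not the actual pageserver code:

```
use tracing::{info_span, Instrument};

// Hypothetical helper; the real connection code lives in the pageserver.
async fn connect_to_postgres(connstr: &str) -> std::io::Result<tokio::net::TcpStream> {
    tokio::net::TcpStream::connect(connstr).await
}

async fn open_connection(connstr: &str) -> std::io::Result<tokio::net::TcpStream> {
    // Wrap the potentially slow connection setup in its own span so the
    // ~100 ms it can take shows up explicitly in the trace.
    connect_to_postgres(connstr)
        .instrument(info_span!("connect_to_postgres"))
        .await
}
```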
2023-05-08 01:59:43 +03:00
Christian Schwarz
411c71b486 document current tenant attach API semantics (#4151)
We currently return 202 as soon as the tenant is allocated in memory
before we've written out the marker file. So, the /attach API currently
does not have a transactional character. For example, it can happen that
we respond with a 202 and then crash before writing out the marker file.
In such a case, it is important that the client

1. observes the lost attach (by polling tenant status and observing 404)
2. and consequently retries the attach.

It has to do it in this loop until it observes the tenant as "Active" in
the tenant status. If the client doesn't follow this protocol and
instead goes to another pageserver to attach the tenant, we risk a
split-brain situation where both the first and second pageserver write
to the tenant's S3 state.
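A sketch of the retry/poll loop a client is expected to follow; the URLs, the "Active" state string, and the use of `reqwest` are illustrative assumptions, not the exact API:

```
use std::time::Duration;

// Hypothetical client-side loop illustrating the protocol described above.
async fn attach_until_active(base: &str, tenant_id: &str) -> anyhow::Result<()> {
    let client = reqwest::Client::new();
    loop {
        // Issue (or re-issue) the attach; 202 only means "accepted", not "persisted".
        client
            .post(format!("{base}/v1/tenant/{tenant_id}/attach"))
            .send()
            .await?
            .error_for_status()?;

        // Poll tenant status until it is Active; a 404 means the attach was
        // lost (e.g. pageserver crashed before writing the marker file) and
        // must be retried against the SAME pageserver.
        loop {
            let resp = client
                .get(format!("{base}/v1/tenant/{tenant_id}"))
                .send()
                .await?;
            match resp.status().as_u16() {
                404 => break, // attach lost, retry the outer loop
                200 => {
                    let body: serde_json::Value = resp.json().await?;
                    if body["state"] == "Active" {
                        return Ok(());
                    }
                }
                _ => {}
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
}
```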

The improved description highlights the consequences of this behavior
for clients that use the /attach endpoint.

The tenant relocation that is currently being implemented in cloud#4740
implements retries of Attach, and it does poll afterwards, but it polls
`has_in_progress_downloads`, which is incorrect, as described in the
patch body.

The motivation for this write-up is that, in a future PR, we'll extend
the /attach endpoint with an option to provide the tenant config. If we
decide to leave the non-transactional behavior of /attach unmodified, we
will be able to avoid persisting the tenant config. Conversely, if we
decide that the /attach API should become transactional, we'll need to
persist the tenant config in the attach-marker-file before acknowledging
receipt of the /attach operation.

refs https://github.com/neondatabase/cloud/pull/4740
refs https://github.com/neondatabase/neon/issues/2238
refs https://github.com/neondatabase/neon/issues/1555
2023-05-05 19:32:41 +03:00
Alexey Kondratov
dd4fd89dc6 [compute_ctl] Do not initialize last_active on start (#4137)
Our scale-to-zero logic was optimized for short auto-suspend intervals,
e.g. minutes or hours. In this case, if the compute was restarted by k8s
for some reason (OOM, k8s node went down, pod relocation, etc.),
`last_active` got bumped and we started counting the auto-suspend timeout
again. That's not a big deal: we suspend a completely idle compute after
10 minutes or so instead of 5.

Yet, some clients may want days or even weeks. The chance that a compute
gets restarted during such an interval is pretty high, and in that case
we might be unable to suspend some computes for weeks.

After this commit, we won't initialize `last_active` on start, so
`/status` may return an unset attribute, which means there has been no
user activity since start. The control plane should deal with it by
taking the `max()` of all available activity timestamps: `started_at`,
`last_active`, etc.
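A rough sketch of that idea; the struct and function below are illustrative stand-ins, not compute_ctl's actual types:

```
use chrono::{DateTime, Utc};

// Sketch only: field and function names are illustrative.
struct ComputeStatus {
    started_at: DateTime<Utc>,
    // No longer initialized on start; `None` means "no user activity yet".
    last_active: Option<DateTime<Utc>>,
}

// What the control plane is expected to do: take the max of all available
// activity timestamps instead of trusting `last_active` alone.
fn effective_last_activity(s: &ComputeStatus) -> DateTime<Utc> {
    s.last_active
        .map_or(s.started_at, |la| la.max(s.started_at))
}
```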

compute_ctl part of neondatabase/cloud#4853
2023-05-05 11:45:37 +02:00
Alexander Bayandin
653e633c59 test_runner: add --pg-version pytest argument (#4037)
- allows setting the Postgres version for testing via the --pg-version argument
- fixes tests for the non-default Postgres version.
2023-05-05 02:57:47 +03:00
Alexander Bayandin
291b4f0d41 Update client libs for test_runner/pg_clients to their latest versions (#4092)
Also, use Workaround D for `swift/PostgresClientKitExample`,
which supports neither SNI nor connection options
2023-05-04 18:22:04 +01:00
Christian Schwarz
88f39c11d4 refactor: the code that builds TenantConfOpt from mgmt API requests (#4152)
- extract code that builds TenantConfOpt from requests into a From<>
impl
- move map_err(ApiError::BadRequest) into callers
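The shape of such a conversion, sketched with simplified stand-in types (the real code converts the mgmt API request into TenantConfOpt; the sketch uses a `TryFrom` because parsing can fail, and the field names are assumptions):

```
// Simplified stand-ins for the real mgmt API request and TenantConfOpt types.
#[derive(Default)]
struct TenantConfigRequest {
    gc_period: Option<String>,
    compaction_threshold: Option<usize>,
}

#[derive(Default)]
struct TenantConfOpt {
    gc_period: Option<std::time::Duration>,
    compaction_threshold: Option<usize>,
}

// Building the config lives in one conversion impl; callers attach their own
// error mapping (e.g. ApiError::BadRequest) instead of it being baked in here.
impl TryFrom<&TenantConfigRequest> for TenantConfOpt {
    type Error = anyhow::Error;

    fn try_from(req: &TenantConfigRequest) -> Result<Self, Self::Error> {
        let mut conf = TenantConfOpt::default();
        if let Some(gc_period) = &req.gc_period {
            // Parse e.g. "3600" (seconds); the real code uses a richer parser.
            let secs: u64 = gc_period.parse()?;
            conf.gc_period = Some(std::time::Duration::from_secs(secs));
        }
        conf.compaction_threshold = req.compaction_threshold;
        Ok(conf)
    }
}
```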
2023-05-04 18:10:40 +03:00
Heikki Linnakangas
b627fa71e4 Make read-only replicas explicit in compute spec (#4136)
This builds on top of PR #4058, and supersedes #4018
2023-05-04 17:41:42 +03:00
Christian Schwarz
7dd9553bbb eviction: regression test + distinguish layer write from map insert (#4005)
This patch adds a regression test for the threshold-based layer
eviction.
The test asserts the basic invariant that, if left alone, the residence
statuses will stabilize, with some layers resident and some layers
evicted.
Thereby, we cover both the aspect of last-access-time-threshold-based
eviction, and the "imitate access" hacks that we put in recently.

The aggressive `period` and `threshold` values revealed a subtle bug
which is also fixed in this patch.
The symptom was that, without the Rust changes of this patch, there
would be occasional test failures due to `WARN... unexpectedly
downloading` log messages.
These log messages were caused by the "imitate access" calls of the
eviction task.
But, the whole point of the "imitate access" hack was to prevent
eviction of the layers that we access there.
After some digging, I found the root cause, which is the following race
condition:

1. Compact: Write out an L1 layer from several L0 layers. This records
residence event `LayerCreate` with the current timestamp.
2. Eviction: imitate access logical size calculation. This accesses the
L0 layers because the L1 layer is not yet in the layer map.
3. Compact: Grab layer map lock, add the new L1 to layer map and remove
the L0s, release layer map lock.
4. Eviction: observes the new L1 layer whose only activity timestamp is
the `LayerCreate` event.

The L1 layer had no chance of being accessed until after (3).
So, if enough time passes between (1) and (3), then (4) will observe a
layer with `now-last_activity > threshold` and evict it.

The fix is to require the first `record_residence_event` to happen while
we already hold the layer map lock.
The API requires a ref to a `BatchedUpdates` as a witness that we are
inside a layer map lock.
That is not fool-proof, e.g., new call sites for `insert_historic` could
just completely forget to record the residence event.
It would be nice to prevent this at the type level.
In the meantime, we have a rate-limited log message to warn us if such
an implementation error sneaks in in the future.
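A condensed sketch of the witness pattern described above, with much simplified types; in the real code the witness is the `BatchedUpdates` value produced inside the layer map lock:

```
use std::sync::Mutex;

struct LayerMap {
    layers: Vec<String>,
}

// Only constructed while the layer map lock is held, so holding a reference
// to it acts as a (weak) witness that we are inside the lock.
struct BatchedUpdates<'a> {
    map: &'a mut LayerMap,
}

struct Layer {
    name: String,
}

impl Layer {
    // The first residence event must be recorded while the layer map lock is
    // held; the `_witness` parameter turns that into an API requirement
    // (although callers can still simply forget to call this at all).
    fn record_residence_event(&self, _witness: &BatchedUpdates<'_>) {
        println!("layer {} became resident", self.name);
    }
}

fn insert_historic(map: &Mutex<LayerMap>, layer: Layer) {
    let mut guard = map.lock().unwrap();
    let batch = BatchedUpdates { map: &mut *guard };
    layer.record_residence_event(&batch);
    batch.map.layers.push(layer.name);
}
```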

fixes https://github.com/neondatabase/neon/issues/3593
fixes https://github.com/neondatabase/neon/issues/3942

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-05-04 16:16:48 +02:00
Heikki Linnakangas
b5d64a1e32 Rename field, to match field name in XLogData struct and in rust-postgres (#4149)
The field means the same thing as the `wal_end` field in the XLogData
struct. And in the postgres-protocol crate's corresponding
PrimaryKeepAlive struct, it's also called `wal_end`. Let's be
consistent.

As noted by Arthur at
https://github.com/neondatabase/neon/pull/4144#pullrequestreview-1411031881
2023-05-04 14:41:15 +03:00
Christian Schwarz
f9839a0dd9 import_basebackup_from_tar: don't load local layers twice (#4111)
PR #4104 removed these bits as part of a revert of a larger change.

follow-up to
https://github.com/neondatabase/neon/pull/4104#discussion_r1180444952

---

Let's not merge this before the release.
2023-05-04 09:23:49 +02:00
Arthur Petukhovsky
ce1bbc9fa7 Always send the latest commit_lsn in send_wal (#4150)
When a new connection is established to the safekeeper, the 'end_pos'
field is initially set to Lsn::INVALID (i.e. 0/0). If there is no WAL to
send to the client, we send KeepAlive messages with Lsn::INVALID. That
confuses the pageserver: it thinks that the safekeeper is lagging far
behind the tip of the branch, and will reconnect to a different
safekeeper. Then the same thing happens with the new safekeeper, until
some WAL is streamed which sets 'end_pos' to a valid value.

This fix always sets `end_pos` to the most recent `commit_lsn` value.
This is useful for sending the latest `commit_lsn` to the receiver, so it
will know how far along this safekeeper is compared to the others.
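An illustrative sketch of the rule; the types are far simpler than the real safekeeper code:

```
// Simplified sketch; the real safekeeper code has richer types.
#[derive(Clone, Copy, PartialEq)]
struct Lsn(u64);

impl Lsn {
    const INVALID: Lsn = Lsn(0);
}

struct WalSenderState {
    end_pos: Lsn,
    commit_lsn: Lsn,
}

impl WalSenderState {
    // Before, a fresh connection kept end_pos at Lsn::INVALID until WAL was
    // streamed, so keepalives advertised 0/0 and confused the pageserver.
    // Now the advertised position is always at least the current commit_lsn.
    fn keepalive_lsn(&self) -> Lsn {
        if self.end_pos == Lsn::INVALID || self.end_pos.0 < self.commit_lsn.0 {
            self.commit_lsn
        } else {
            self.end_pos
        }
    }
}
```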

Fixes https://github.com/neondatabase/neon/issues/3972
Supersedes https://github.com/neondatabase/neon/pull/4144
2023-05-04 00:07:45 +03:00
Alexander Bayandin
b114ef26c2 GitHub Autocomment: add a note if no tests were run (#4109)
- Always (if not cancelled) add a comment to a PR
- Mention in the comment if no tests were run / reports were not
generated.
2023-05-03 15:38:49 +01:00
Arthur Petukhovsky
3ceef7b17a Add more safekeeper and walreceiver metrics (#4142)
Add essential safekeeper and pageserver::walreceiver metrics. Mostly
counters, such as the number of received queries, broker messages,
removed WAL segments, or connection switch events in walreceiver.
Also logs the broker push loop duration.
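For illustration, a counter of this kind might be declared with the `prometheus` crate roughly like this; the metric name is made up, not one of the metrics added here:

```
use once_cell::sync::Lazy;
use prometheus::{register_int_counter, IntCounter};

// Illustrative metric name, not the exact one added by this commit.
static RECEIVED_QUERIES: Lazy<IntCounter> = Lazy::new(|| {
    register_int_counter!(
        "safekeeper_received_queries_total",
        "Number of queries received by the safekeeper"
    )
    .expect("failed to register metric")
});

fn handle_query() {
    RECEIVED_QUERIES.inc();
    // ... actual query handling ...
}
```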
2023-05-03 17:07:41 +03:00
Kirill Bulatov
586e6e55f8 Print WalReceiver context on WAL waiting timeout (#4090)
Closes https://github.com/neondatabase/neon/issues/2106

Before:
```
Extracting base backup to create postgres instance: path=/Users/someonetoignore/work/neon/neon_main/test_output/test_pageserver_lsn_wait_error_safekeeper_stop/repo/endpoints/ep-2/pgdata port=15017

              stderr: command failed: page server 'basebackup' command failed

Caused by:
    0: db error: ERROR: Timed out while waiting for WAL record at LSN 0/FFFFFFFF to arrive, last_record_lsn 0/A2C3F58 disk consistent LSN=0/16B5A50
    1: ERROR: Timed out while waiting for WAL record at LSN 0/FFFFFFFF to arrive, last_record_lsn 0/A2C3F58 disk consistent LSN=0/16B5A50

Stack backtrace:
```

After:
```
Extracting base backup to create postgres instance: path=/Users/someonetoignore/work/neon/neon/test_output/test_pageserver_lsn_wait_error_safekeeper_stop/repo/endpoints/ep-2/pgdata port=15011

              stderr: command failed: page server 'basebackup' command failed

Caused by:
    0: db error: ERROR: Timed out while waiting for WAL record at LSN 0/FFFFFFFF to arrive, last_record_lsn 0/A2C3F58 disk consistent LSN=0/16B5A50, WalReceiver status (update 2023-04-26 14:20:39): streaming WAL from node 12346, commit|streaming Lsn: 0/A2C3F58|0/A2C3F58, safekeeper candidates (id|update_time|commit_lsn): [(12348|14:20:40|0/A2C3F58), (12346|14:20:40|0/A2C3F58), (12347|14:20:40|0/A2C3F58)]
    1: ERROR: Timed out while waiting for WAL record at LSN 0/FFFFFFFF to arrive, last_record_lsn 0/A2C3F58 disk consistent LSN=0/16B5A50, WalReceiver status (update 2023-04-26 14:20:39): streaming WAL from node 12346, commit|streaming Lsn: 0/A2C3F58|0/A2C3F58, safekeeper candidates (id|update_time|commit_lsn): [(12348|14:20:40|0/A2C3F58), (12346|14:20:40|0/A2C3F58), (12347|14:20:40|0/A2C3F58)]

Stack backtrace:
```

As the issue requests, the PR adds the context in logs only, but I think
we should expose the context via the HTTP management API in a similar way.
It should be simple with the new API, but is better done in a separate PR.

Co-authored-by: Kirill Bulatov <kirill@neon.tech>
2023-05-03 16:25:19 +03:00
Anton Chaporgin
db81242f4a add debug to pg-sni-router install (#4143) 2023-05-03 16:14:16 +03:00
dependabot[bot]
39ca7c7c09 Bump flask from 2.1.3 to 2.2.5 (#4138) 2023-05-03 10:40:35 +01:00
Heikki Linnakangas
ecc0cf8cd6 Treat EPIPE as an expected error. (#4141)
If the other end of a TCP connection closes its read end of the socket,
you get an EPIPE when you try to send. I saw that happen in the CI once:


https://neon-github-public-dev.s3.amazonaws.com/reports/pr-4136/release/4869464644/index.html#suites/c19bc2126511ef8cb145cca25c438215/7ec87b016c0b4b50/

```
2023-05-03T07:53:22.394152Z ERROR Task 'serving compute connection task' tenant_id: Some(c204447079e02e7ba8f593cb8bc57e76), timeline_id: Some(b666f26600e6deaa9f43e1aeee5bacb7) exited with error: Postgres connection error

Caused by:
    Broken pipe (os error 32)

Stack backtrace:
   0: pageserver::page_service::page_service_conn_main::{{closure}}
             at /__w/neon/neon/pageserver/src/page_service.rs:282:17
      <core::panic::unwind_safe::AssertUnwindSafe<F> as core::future::future::Future>::poll
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/panic/unwind_safe.rs:296:9
      <futures_util::future::future::catch_unwind::CatchUnwind<Fut> as core::future::future::Future>::poll::{{closure}}
             at /__w/neon/neon/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.28/src/future/future/catch_unwind.rs:36:42
      <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/panic/unwind_safe.rs:271:9
   ...
```

In passing, add a comment to explain what the "expected" in the
`is_expected_io_error` function means.
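The idea, sketched as a predicate over `std::io::ErrorKind`; the exact set of kinds in the real function may differ:

```
use std::io;

/// "Expected" here means: an error that normally happens when the client
/// goes away mid-connection, and therefore should not be logged as an
/// unexpected failure of the pageserver itself.
fn is_expected_io_error(e: &io::Error) -> bool {
    matches!(
        e.kind(),
        // Client closed its read end of the socket, then we tried to send.
        io::ErrorKind::BrokenPipe
            // Client reset or dropped the connection.
            | io::ErrorKind::ConnectionReset
            | io::ErrorKind::ConnectionAborted
    )
}
```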
2023-05-03 12:03:13 +03:00
Joonas Koivunen
faebe3177b wal_craft: cleanup to keep editable (#4121)
wal_craft had accumulated some trouble by using `use anyhow::*;`. Fixes
that and removes redundant conversions (there is never a need to convert
a Path to an OsStr), especially in the `Process` args.

Originally in #4100 but we merged a later PR instead for the fixes. I
dropped the `postmaster.pid` polling in favor of just having a longer
connect timeout.
2023-05-03 11:11:55 +03:00
Joonas Koivunen
474f69c1c0 fix: omit cancellation logging when panicking (#4125)
Noticed while describing `RequestSpan`: this fix omits the otherwise
logged message about the request being cancelled when the request
handler panics. This was missed in #4064.
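A sketch of the general pattern, not the actual `RequestSpan` code: a drop guard that logs cancellation, but stays silent if the drop happens while unwinding from a panic:

```
// Sketch: a guard that logs if the request future is dropped (cancelled),
// but stays silent when the drop happens during a panic, so the panic
// itself is the only thing reported.
struct CancellationLogGuard {
    armed: bool,
}

impl CancellationLogGuard {
    fn disarm(&mut self) {
        self.armed = false;
    }
}

impl Drop for CancellationLogGuard {
    fn drop(&mut self) {
        if self.armed && !std::thread::panicking() {
            eprintln!("request was cancelled before completing");
        }
    }
}

fn handle_request() {
    let mut guard = CancellationLogGuard { armed: true };
    // ... request handling; may be cancelled (dropped) or panic ...
    guard.disarm(); // reached only on normal completion
}
```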
2023-05-03 10:56:49 +03:00
Konstantin Knizhnik
47521693ed Fix file_cache build warnings (#4118)
2023-05-02 22:28:21 +03:00
Heikki Linnakangas
4d55d61807 Store basic endpoint info in endpoint.json file. (#4058)
It's more convenient than parsing the postgresql.conf file.

Extracted from PR #3886. I started working on another patch (to make it
safe to run two "neon_local endpoint create" commands concurrently), and
realized that this change will make that simpler too.
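For illustration, a minimal serde struct written to such a file might look like this; the field names are assumptions, not the actual neon_local schema:

```
use serde::{Deserialize, Serialize};

// Illustrative only: the real endpoint.json schema is defined by neon_local.
#[derive(Serialize, Deserialize)]
struct EndpointInfo {
    endpoint_id: String,
    tenant_id: String,
    timeline_id: String,
    pg_port: u16,
    http_port: u16,
}

fn save(info: &EndpointInfo, dir: &std::path::Path) -> anyhow::Result<()> {
    // Reading this back is much simpler than parsing postgresql.conf.
    let json = serde_json::to_string_pretty(info)?;
    std::fs::write(dir.join("endpoint.json"), json)?;
    Ok(())
}
```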
2023-05-02 20:36:11 +03:00
Sergey Melnikov
093fafd6bd Deploy pg-sni-router (#4132) 2023-05-01 17:18:45 +02:00
Shany Pozin
e3ae2661ee Add 2 new sets of safekeepers to us-west2 (#4130)
## Describe your changes
TF output:
module.safekeeper-us-west-2.aws_instance.this["3"]: Creation complete after 13s [id=i-089f6b9ef426dff76]
module.safekeeper-us-west-2.aws_instance.this["4"]: Creation complete after 13s [id=i-0fe6bf912c4710c82]
module.safekeeper-us-west-2.aws_instance.this["5"]: Creation complete after 13s [id=i-0a83c1c46d2b4e409]
module.safekeeper-us-west-2.aws_instance.this["6"]: Creation complete after 13s [id=i-0fef5317b8fdc9f8d]
module.safekeeper-us-west-2.aws_instance.this["7"]: Creation complete after 13s [id=i-0be739190d4289bf9]
module.safekeeper-us-west-2.aws_instance.this["8"]: Creation complete after 13s [id=i-00e851803669e5cfe]
2023-05-01 14:22:59 +03:00
Anton Chaporgin
7e368f3edf build pg-sni-router binary (#4129)
## Describe your changes
This adds the pg-sni-router binary to the build pipeline and the neon image.

## Issue ticket number and link

https://github.com/neondatabase/cloud/issues/1461
2023-05-01 12:14:31 +02:00
Joonas Koivunen
138bc028ed fix: quick and dirty panic avoidance on drop path (#4128)
Sentry caught a panic on a load-testing server related to metric removals:
https://neondatabase.sentry.io/issues/4142396994

Turn the `expect` into logging, but also add logging for each removal,
so we can identify in which cases we do a double-remove. The cause of
the double-removal (or of never adding) is not obvious or expected.

Originally added in #3837.
2023-05-01 11:54:09 +03:00
Stas Kelvich
d53f81b449 Add one more pageserver to staging 2023-04-30 22:39:34 +03:00
Joonas Koivunen
6f472df0d0 fix: restore not logging ignored io errors as errors (#4120)
The fix is rather indirect, due to the accidental over-application of
`anyhow`: if handle_pagerequests returns a `QueryError`, it will now be
bubbled up as-is as a `QueryError`. `QueryError` allows the inner
`std::io::Error` to be inspected, so we can filter out certain error
kinds that are perfectly normal, without a huge log message.

For a very long time (b2f5102) the errors were converted to `anyhow` by
mistake, which made this difficult or impossible, even though from the
types it would *appear* that we propagate wrapped `std::io::Error`s and
can filter them.

Fixes #4113; most likely filters some other errors as well.
2023-04-30 14:34:55 +03:00
Rahul Patil
21eb944b5e Staging: Add safekeeper nodes [3-8] to eu-west-1 (#4123) 2023-04-29 15:25:57 +03:00
Arthur Petukhovsky
95244912c5 Override sharded-slab to increase MAX_THREADS (#4122)
Add patch directive to Cargo.toml to use patched version of
sharded-slab:
98d16753ab

The patch changes the MAX_THREADS limit from 4096 to 32768. This is a
temporary workaround for using tracing from many threads in the
safekeeper code, until the async safekeepers patch is merged to main.

Note that the patch can affect other Rust services, not only the
safekeeper binary.
2023-04-29 13:31:04 +03:00
Shany Pozin
2617e70008 Add 4 new Pageservers for retool launch (#4115)
## Describe your changes
Adding 4 new pageservers to us-west

TF apply output:
module.pageserver-us-west-2.aws_instance.this["7"]: Creation complete after 21s [id=i-02eec9b40617db5bc]
module.pageserver-us-west-2.aws_instance.this["5"]: Creation complete after 21s [id=i-00ca6417c7bf96820]
module.pageserver-us-west-2.aws_instance.this["4"]: Creation complete after 21s [id=i-013263dd1c239adcc]
module.pageserver-us-west-2.aws_instance.this["6"]: Creation complete after 22s [id=i-01cdf7d2bc1433b6a]
2023-04-29 11:42:52 +02:00
Arthur Petukhovsky
8543485e92 Pull clone timeline from peer safekeepers (#4089)
Add an HTTP endpoint to initialize a safekeeper timeline from peer
safekeepers. This is useful for initializing a new safekeeper to replace
a failed one. Not fully "correct" in all cases, but should work in most.

This code is not suitable for production workloads but can be tested on
staging to get started. The new endpoint is separate from the usual code
paths and should not affect anything unless someone explicitly uses it.
We can roll back this commit in case of issues.
2023-04-28 14:20:46 +00:00
Joonas Koivunen
ec53c5ca2e revert: "Add check for duplicates of generated image layers" (#4104)
This reverts commit 732acc5.

Reverted PR: #3869

As noted in PR #4094, we do in fact try to insert duplicates to the
layer map, if L0->L1 compaction is interrupted. We do not have a proper
fix for that right now, and we are in a hurry to make a release to
production, so revert the changes related to this to the state that we
have in production currently. We know that we have a bug here, but
better to live with the bug that we've had in production for a long
time, than rush a fix to production without testing it in staging first.

Cc: #4094, #4088
2023-04-28 17:20:18 +03:00
Stas Kelvich
94d612195a bump rust-postgres version, after merging PR in rust-postgres 2023-04-28 17:15:43 +03:00
Stas Kelvich
b1329db495 fix sigterm handling 2023-04-28 17:15:43 +03:00
Stas Kelvich
5bb971d64e fix more python tests 2023-04-28 17:15:43 +03:00
Stas Kelvich
0364f77b9a fix python styling 2023-04-28 17:15:43 +03:00
Stas Kelvich
4ac6a9f089 add backward compatibility to proxy 2023-04-28 17:15:43 +03:00
Stas Kelvich
9486d76b2a Add tests for link auth to compute connection 2023-04-28 17:15:43 +03:00
Stas Kelvich
040f736909 remove changes in main proxy that are now not needed 2023-04-28 17:15:43 +03:00
Stas Kelvich
645e4f6ab9 use TLS in link proxy 2023-04-28 17:15:43 +03:00
Heikki Linnakangas
e947cc119b Add a small test case for pg_sni_router 2023-04-28 17:15:43 +03:00
Heikki Linnakangas
53e5d18da5 Start passthrough earlier
As soon as we have received the SSLRequest packet, and have figured
out the hostname to connect to from the SNI, we can start passing
through data. We don't need to parse the StartupPacket that the client
will send next.
2023-04-28 17:15:43 +03:00
Heikki Linnakangas
3813c703c9 Add an option for destination port.
Makes it easier to test locally.
2023-04-28 17:15:43 +03:00
Heikki Linnakangas
b15204fa8c Fix --help, and required args 2023-04-28 17:15:43 +03:00
Alexey Kondratov
81c75586ab Take port from SNI, formatting, make clippy happy 2023-04-28 17:15:43 +03:00
Anton Chaporgin
556fb1642a fixed the way hostname is parsed 2023-04-28 17:15:43 +03:00
Stas Kelvich
23aca81943 Add SNI-based proxy router
In order not to create NodePorts for each compute, we can set up
services that accept connections on wildcard domains and then use
information from the domain name to route the connection to some
internal service. There are ready-made solutions for HTTPS and TLS
connections, but the PostgreSQL protocol uses opportunistic TLS and we
haven't found any ready-made solutions for it.

This patch introduces `pg_sni_router`, which routes connections from
`aaa--bbb--123.external.domain` to `aaa.bbb.123.internal.domain`.

In the long run we can avoid console -> compute psql communications,
but for now this router seems to be the easier way forward.
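A sketch of the hostname rewrite rule (the domain suffixes are the placeholder ones from the example above):

```
// Sketch of the routing rule described above: the external hostname uses
// "--" where the internal one uses "."; domain names here are illustrative.
fn external_to_internal(sni_host: &str) -> Option<String> {
    const EXTERNAL_SUFFIX: &str = ".external.domain";
    const INTERNAL_SUFFIX: &str = ".internal.domain";

    let prefix = sni_host.strip_suffix(EXTERNAL_SUFFIX)?;
    Some(format!("{}{}", prefix.replace("--", "."), INTERNAL_SUFFIX))
}

fn main() {
    assert_eq!(
        external_to_internal("aaa--bbb--123.external.domain").as_deref(),
        Some("aaa.bbb.123.internal.domain")
    );
}
```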
2023-04-28 17:15:43 +03:00
Arseny Sher
42798e6adc Increase connection_timeout to PG in find end of WAL test.
And log postgres to stdout.

Probably fixes https://github.com/neondatabase/neon/issues/3778
2023-04-28 16:17:23 +04:00
Arthur Petukhovsky
b03143dfc8 Use serde_as DisplayFromStr everywhere (#4103)
We used `display_serialize` previously, but it works only for Serialize.
`DisplayFromStr` does the same, but also works for Deserialize.
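Usage looks roughly like this; the struct and field are illustrative stand-ins (the real code applies it to Lsn-like fields):

```
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};

// The field is serialized via Display and deserialized via FromStr,
// unlike a Serialize-only display helper.
#[serde_as]
#[derive(Serialize, Deserialize)]
struct TimelineInfo {
    #[serde_as(as = "DisplayFromStr")]
    flush_lsn: u64, // stand-in for an Lsn-like type with Display + FromStr
}
```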
2023-04-28 13:55:07 +03:00
Arseny Sher
fdacfaabfd Move PageserverFeedback to utils.
It allows replacing u64 with a proper Lsn and pretty-printing
PageserverFeedback with serde(_json). Now walsenders on safekeepers,
queried with debug_dump, look like:

"walsenders": [
  {
    "ttid": "fafe0cf39a99c608c872706149de9d2a/b4fb3be6f576935e7f0fcb84bdb909a1",
    "addr": "127.0.0.1:48774",
    "conn_id": 3,
    "appname": "pageserver",
    "feedback": {
      "Pageserver": {
	"current_timeline_size": 32096256,
	"last_received_lsn": "0/2415298",
	"disk_consistent_lsn": "0/1696628",
	"remote_consistent_lsn": "0/0",
	"replytime": "2023-04-12T13:54:53.958856+00:00"
      }
    }
  }
],
2023-04-28 06:22:13 +04:00
Arseny Sher
b2a3981ead Move tracking of walsenders out of Timeline.
Refactors walsenders out of timeline.rs into a separate WalSenders with
its own lock, to make timeline.rs less convoluted, but otherwise keeps
the same structure. Tracking of the in-memory remote_consistent_lsn is
also moved there, as it is mainly received from the pageserver.

State of walsender (feedback) is also restructured to be cleaner; now it is
either PageserverFeedback or StandbyFeedback(StandbyReply, HotStandbyFeedback),
but not both.
2023-04-28 06:22:13 +04:00
Joonas Koivunen
fe0b616299 feat(page_service): read timeouts (#4093)
Introduce read timeouts to our `page_service` connections. Without read
timeouts, we essentially leak connections.

This is a port of #3995. Split the refactorings to the other PR: #4097.

Fixes #4028.
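A sketch of the technique, assuming a tokio-based connection loop; the timeout value and error handling are illustrative:

```
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;

// Wrap each read from the client in a timeout so idle connections
// cannot be leaked forever. The duration here is illustrative.
async fn read_with_timeout(stream: &mut TcpStream, buf: &mut [u8]) -> anyhow::Result<usize> {
    match tokio::time::timeout(Duration::from_secs(600), stream.read(buf)).await {
        Ok(res) => Ok(res?),
        Err(_elapsed) => anyhow::bail!("client connection idle for too long, closing"),
    }
}
```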
2023-04-27 17:55:35 +00:00
Alexander Bayandin
c4e1cafb63 scripts/flaky_tests.py: handle connection error (#4096)
- Increase `connect_timeout` to 30s, which should be enough for
most of the cases
- If the script cannot connect to the DB (or any other
`psycopg2.OperationalError` occurs), do not fail the script: log
the error and proceed. Problems with fetching flaky tests shouldn't
block the PR
2023-04-27 17:08:00 +01:00
Joonas Koivunen
fdf5e4db5e refactor: Cleanup page service (#4097)
Refactoring part of #4093.

Numerous `Send + Sync` bounds were a distraction that was not needed
at all. The proper `Bytes` usage and one `"error_message".to_string()`
are just drive-by fixes.

Not using the `PostgresBackendTCP` allows us to start setting read
timeouts (and more). `PostgresBackendTCP` is still used from proxy, so
it cannot be removed.
2023-04-27 18:51:57 +03:00
Heikki Linnakangas
d1e86d65dc Run rustfmt to fix whitespace.
Commit e6ec2400fc introduced some trivial whitespace issues.
2023-04-27 18:45:22 +03:00
Arseny Sher
f5b4697c90 Log session_id when proxy per client task errors out. 2023-04-27 19:08:22 +04:00
Christian Schwarz
3be81dd36b fix clippy --release failure introduced in #4030 (#4095)
PR `build: run clippy for powerset of features (#4077)` brought us a
`clippy --release` pass.

It was merged after #4030, which fails under `clippy --release` with

```
error: static `TENANT_ID_EXTRACTOR` is never used
    --> pageserver/src/tenant/timeline.rs:4270:16
     |
4270 |     pub static TENANT_ID_EXTRACTOR: once_cell::sync::Lazy<
     |                ^^^^^^^^^^^^^^^^^^^
     |
     = note: `-D dead-code` implied by `-D warnings`

error: static `TIMELINE_ID_EXTRACTOR` is never used
    --> pageserver/src/tenant/timeline.rs:4276:16
     |
4276 |     pub static TIMELINE_ID_EXTRACTOR: once_cell::sync::Lazy<
     |                ^^^^^^^^^^^^^^^^^^^^^
```

A merge queue would have prevented this.
2023-04-27 17:07:25 +03:00
MMeent
e6ec2400fc Enable hot standby PostgreSQL replicas.
Notes:
 - This still needs UI support from the Console
 - I've not tuned any GUCs for PostgreSQL to make this work better
 - The safekeeper has gotten a tweak to which WAL is sent and how: it now
sends zeroed WAL data from the start of the timeline's first segment up to
the first byte of the timeline, to be compatible with normal PostgreSQL
WAL streaming.
 - This includes the commits of #3714 

Fixes one part of https://github.com/neondatabase/neon/issues/769

Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
2023-04-27 15:26:44 +02:00
Christian Schwarz
5b911e1f9f build: run clippy for powerset of features (#4077)
This will catch compiler & clippy warnings in all feature combinations.

We should probably use cargo hack for build and test as well, but
that's quite expensive and would add to overall CI wait times.

obsoletes https://github.com/neondatabase/neon/pull/4073
refs https://github.com/neondatabase/neon/pull/4070
2023-04-27 15:01:27 +03:00
Christian Schwarz
9ea7b5dd38 clean up logging around on-demand downloads (#4030)
- Remove repeated tenant & timeline from span
- Demote logging of the path to debug level
- Log completion at info level, in the same function where we log errors
- distinguish between layer file download success & on-demand download
succeeding as a whole in the log message wording
- Assert that the span contains a tenant id and a timeline id

fixes https://github.com/neondatabase/neon/issues/3945

Before:

```
  INFO compaction_loop{tenant_id=$TENANT_ID}:compact_timeline{timeline=$TIMELINE_ID}:download_remote_layer{tenant_id=$TENANT_ID timeline_id=$TIMELINE_ID layer=000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000020C8A71-00000000020CAF91}: download complete: /storage/pageserver/data/tenants/$TENANT_ID/timelines/$TIMELINE_ID/000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000020C8A71-00000000020CAF91
  INFO compaction_loop{tenant_id=$TENANT_ID}:compact_timeline{timeline=$TIMELINE_ID}:download_remote_layer{tenant_id=$TENANT_ID timeline_id=$TIMELINE_ID layer=000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000020C8A71-00000000020CAF91}: Rebuilt layer map. Did 9 insertions to process a batch of 1 updates.
```

After:

```
  INFO compaction_loop{tenant_id=$TENANT_ID}:compact_timeline{timeline=$TIMELINE_ID}:download_remote_layer{layer=000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000020C8A71-00000000020CAF91}: layer file download finished
  INFO compaction_loop{tenant_id=$TENANT_ID}:compact_timeline{timeline=$TIMELINE_ID}:download_remote_layer{layer=000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000020C8A71-00000000020CAF91}: Rebuilt layer map. Did 9 insertions to process a batch of 1 updates.
  INFO compaction_loop{tenant_id=$TENANT_ID}:compact_timeline{timeline=$TIMELINE_ID}:download_remote_layer{layer=000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000020C8A71-00000000020CAF91}: on-demand download successful
```
2023-04-27 11:54:48 +02:00
Arseny Sher
0112a602e1 Add timeout on proxy -> compute connection establishment.
Otherwise we sit for up to the default tcp_syn_retries (about 2+ min)
before getting OS error 110 if the compute has been migrated to another pod.
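Roughly the following pattern, with an illustrative timeout value:

```
use std::time::Duration;
use tokio::net::TcpStream;

// Sketch: bound the TCP connect so we fail fast instead of waiting for the
// kernel's tcp_syn_retries when the compute has moved to another pod.
async fn connect_to_compute(addr: &str) -> anyhow::Result<TcpStream> {
    let stream = tokio::time::timeout(Duration::from_secs(10), TcpStream::connect(addr))
        .await
        .map_err(|_| anyhow::anyhow!("timed out connecting to compute at {addr}"))??;
    Ok(stream)
}
```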
2023-04-27 09:50:52 +04:00
Anastasia Lubennikova
92214578af Fix proxy_io_bytes_per_client metric: use branch_id identifier properly. (#4084)
It fixes the miscalculation of the metric for projects that use multiple
branches for the same endpoint.
We were under-billing users with such projects, so we need to
communicate the change in the Release Notes.
2023-04-26 17:47:54 +03:00
Christian Schwarz
6861259be7 add global metric for unexpected on-demand downloads (#4069)
Until we have toned down the prod logs to zero WARN and ERROR, we want a
dedicated metric for which we can have a dedicated alert.

fixes https://github.com/neondatabase/neon/issues/3924
2023-04-26 15:18:26 +02:00
Sergey Melnikov
11df2ee5d7 Add safekeeper-3.us-east-2.aws.neon.build (#4085) 2023-04-26 14:40:36 +03:00
Arseny Sher
31a3910fd9 Remove wait_for_sk_commit_lsn_to_reach_remote_storage.
It had a couple of inherent races:

1) Even if the compute is killed before the call, some more data might still
arrive at the safekeepers after their commit_lsn is polled, advancing it.
Then a checkpoint on the pageserver might not include this tail, and so the
upload of the expected LSN won't happen until one more checkpoint.

2) commit_lsn is updated asynchronously -- the compute can commit a
transaction before communicating commit_lsn to even a single safekeeper
(sync-safekeepers can be used to force the advancement). This makes the
semantics of wait_for_sk_commit_lsn_to_reach_remote_storage quite complicated.

Replace it with last_flush_lsn_upload, which
1) Learns the last flush LSN on the compute;
2) Waits for it to arrive at the pageserver;
3) Checkpoints it;
4) Waits for the upload.

In some tests this keeps compute alive longer than before, but this doesn't seem
to be important.

There is a chance this fixes https://github.com/neondatabase/neon/issues/3209
2023-04-26 13:46:33 +04:00
Joonas Koivunen
381c8fca4f feat: log how long tenant activation takes (#4080)
Adds just a counter counting up from the creation of the tenant, logged
after activation. Might help guide us in the investigation of #4025.
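Roughly this mechanism, with illustrative types:

```
use std::time::Instant;

// Sketch: remember when the Tenant object was created and log the elapsed
// time once it becomes Active.
struct Tenant {
    created_at: Instant,
}

impl Tenant {
    fn new() -> Self {
        Tenant { created_at: Instant::now() }
    }

    fn activate(&self) {
        // ... load timelines, etc. ...
        println!(
            "tenant activation took {} ms",
            self.created_at.elapsed().as_millis()
        );
    }
}
```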
2023-04-26 12:39:17 +03:00
Joonas Koivunen
4625da3164 build: remove busted sk-1.us-east-2 from staging hosts (#4082)
this should give us complete deployments while a new one is being
brought up.
2023-04-26 09:07:45 +00:00
Joonas Koivunen
850f6b1cb9 refactor: drop pageserver_ondisk_layers (#4071)
I didn't get through #3775 fast enough so we wanted to remove this
metric.

Fixes #3705.
2023-04-26 11:49:29 +03:00
Sergey Melnikov
f19b70b379 Configure extra domain for us-east-1 (#4078) 2023-04-26 09:36:26 +02:00
Sergey Melnikov
9d0cf08d5f Fix new storage-broker deploy for eu-central-1 (#4079) 2023-04-26 10:29:44 +03:00
Alexander Bayandin
2d6fd72177 GitHub Workflows: Fix crane for several registries (#4076)
Follow-up fix after https://github.com/neondatabase/neon/pull/4067

```
+ crane tag neondatabase/vm-compute-node-v14:3064 latest
Error: fetching "neondatabase/vm-compute-node-v14:3064": GET https://index.docker.io/v2/neondatabase/vm-compute-node-v14/manifests/3064: MANIFEST_UNKNOWN: manifest unknown; unknown tag=3064
```

I reverted to the previous approach for promoting images
(log in to one registry, save images to the local fs, log out and log in to
another registry, and push images from the local fs). It turns out that what
works for one Google project (kaniko) doesn't work for another (crane)
[sigh]
2023-04-25 23:58:59 +01:00
Heikki Linnakangas
8945fbdb31 Enable OpenTelemetry tracing in proxy in staging. (#4065)
Depends on https://github.com/neondatabase/helm-charts/pull/32

Co-authored-by: Lassi Pölönen <lassi.polonen@iki.fi>
2023-04-25 20:45:36 +03:00
Alexander Bayandin
05ac0e2493 Login to ECR and Docker Hub at once (#4067)
- Update kaniko to 1.9.2 (from 1.7.0); the problem with reproducible builds is fixed
- Log in to ECR and Docker Hub at once, so we can push to several
registries; this makes the `push-docker-hub` job unneeded
- `push-docker-hub` replaced with `promote-images` in the `needs:` clause;
pushing images to production ECR moved to the `promote-images` job
2023-04-25 17:54:10 +01:00
Joonas Koivunen
bfd45dd671 test_tenant_config: allow ERROR from eviction task (#4074) 2023-04-25 18:41:09 +03:00
Joonas Koivunen
7f80230fd2 fix: stop dead_code rustc lint (#4070)
The lint only fires without `--all-features`, which is what `./run_clippy.sh`
uses.
2023-04-25 17:07:04 +02:00
Sergey Melnikov
78bbbccadb Deploy proxies for preview environments (#4052)
## Describe your changes
Deploy `main` proxies to the preview environments
We don't deploy storage there yet, as it's tricky.

## Issue ticket number and link
https://github.com/neondatabase/cloud/issues/4737
2023-04-25 16:46:52 +02:00
168 changed files with 5800 additions and 1671 deletions

View File

@@ -41,6 +41,14 @@ storage:
ansible_host: i-051642d372c0a4f32
pageserver-3.us-west-2.aws.neon.tech:
ansible_host: i-00c3844beb9ad1c6b
pageserver-4.us-west-2.aws.neon.tech:
ansible_host: i-013263dd1c239adcc
pageserver-5.us-west-2.aws.neon.tech:
ansible_host: i-00ca6417c7bf96820
pageserver-6.us-west-2.aws.neon.tech:
ansible_host: i-01cdf7d2bc1433b6a
pageserver-7.us-west-2.aws.neon.tech:
ansible_host: i-02eec9b40617db5bc
safekeepers:
hosts:
@@ -50,4 +58,15 @@ storage:
ansible_host: i-074682f9d3c712e7c
safekeeper-2.us-west-2.aws.neon.tech:
ansible_host: i-042b7efb1729d7966
safekeeper-3.us-west-2.aws.neon.tech:
ansible_host: i-089f6b9ef426dff76
safekeeper-4.us-west-2.aws.neon.tech:
ansible_host: i-0fe6bf912c4710c82
safekeeper-5.us-west-2.aws.neon.tech:
ansible_host: i-0a83c1c46d2b4e409
safekeeper-6.us-west-2.aws.neon.tech:
ansible_host: i-0fef5317b8fdc9f8d
safekeeper-7.us-west-2.aws.neon.tech:
ansible_host: i-0be739190d4289bf9
safekeeper-8.us-west-2.aws.neon.tech:
ansible_host: i-00e851803669e5cfe

View File

@@ -0,0 +1,47 @@
storage:
vars:
bucket_name: neon-dev-storage-eu-central-1
bucket_region: eu-central-1
# We only register/update storage in one preview console and manually copy to other instances
console_mgmt_base_url: http://neon-internal-api.helium.aws.neon.build
broker_endpoint: http://storage-broker-lb.alpha.eu-central-1.internal.aws.neon.build:50051
pageserver_config_stub:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.helium.aws.neon.build/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "20m"
threshold: &default_eviction_threshold "20m"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "pageserver/v1"
safekeeper_s3_prefix: safekeeper/v1/wal
hostname_suffix: ""
remote_user: ssm-user
ansible_aws_ssm_region: eu-central-1
ansible_aws_ssm_bucket_name: neon-dev-storage-eu-central-1
console_region_id: aws-eu-central-1
sentry_environment: staging
children:
pageservers:
hosts:
pageserver-0.eu-central-1.aws.neon.build:
ansible_host: i-011f93ec26cfba2d4
safekeepers:
hosts:
safekeeper-0.eu-central-1.aws.neon.build:
ansible_host: i-0ff026d27babf8ddd
safekeeper-1.eu-central-1.aws.neon.build:
ansible_host: i-03983a49ee54725d9
safekeeper-2.eu-central-1.aws.neon.build:
ansible_host: i-0bd025ecdb61b0db3

View File

@@ -35,6 +35,8 @@ storage:
hosts:
pageserver-0.eu-west-1.aws.neon.build:
ansible_host: i-01d496c5041c7f34c
pageserver-1.eu-west-1.aws.neon.build:
ansible_host: i-0e8013e239ce3928c
safekeepers:
hosts:
@@ -44,3 +46,15 @@ storage:
ansible_host: i-06969ee1bf2958bfc
safekeeper-2.eu-west-1.aws.neon.build:
ansible_host: i-087892e9625984a0b
safekeeper-3.eu-west-1.aws.neon.build:
ansible_host: i-0a6f91660e99e8891
safekeeper-4.eu-west-1.aws.neon.build:
ansible_host: i-0012e309e28e7c249
safekeeper-5.eu-west-1.aws.neon.build:
ansible_host: i-085a2b1193287b32e
safekeeper-6.eu-west-1.aws.neon.build:
ansible_host: i-0c713248465ed0fbd
safekeeper-7.eu-west-1.aws.neon.build:
ansible_host: i-02ad231aed2a80b7a
safekeeper-8.eu-west-1.aws.neon.build:
ansible_host: i-0dbbd8ffef66efda8

View File

@@ -48,9 +48,9 @@ storage:
hosts:
safekeeper-0.us-east-2.aws.neon.build:
ansible_host: i-027662bd552bf5db0
safekeeper-1.us-east-2.aws.neon.build:
ansible_host: i-0171efc3604a7b907
safekeeper-2.us-east-2.aws.neon.build:
ansible_host: i-0de0b03a51676a6ce
safekeeper-3.us-east-2.aws.neon.build:
ansible_host: i-05f8ba2cda243bd18
safekeeper-99.us-east-2.aws.neon.build:
ansible_host: i-0d61b6a2ea32028d5

View File

@@ -0,0 +1,52 @@
# Helm chart values for neon-storage-broker
podLabels:
neon_env: staging
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.alpha.eu-central-1.internal.aws.neon.build
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
metrics:
enabled: false
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-storage-broker.fullname\" . }}"
labels:
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
app.kubernetes.io/name: neon-storage-broker
app.kubernetes.io/instance: neon-storage-broker
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-storage-broker"
endpoints:
- port: broker
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "staging"

View File

@@ -0,0 +1,19 @@
useCertManager: true
replicaCount: 3
exposedService:
# exposedService.port -- Exposed Service proxy port
port: 4432
annotations:
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.alpha.eu-central-1.internal.aws.neon.build"
settings:
domain: "*.snirouter.alpha.eu-central-1.internal.aws.neon.build"
sentryEnvironment: "staging"
imagePullSecrets:
- name: docker-hub-neon
metrics:
enabled: false

View File

@@ -23,6 +23,7 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
domain: "*.eu-west-1.aws.neon.build"
otelExporterOtlpEndpoint: "https://otel-collector.zeta.eu-west-1.internal.aws.neon.build"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"

View File

@@ -0,0 +1,19 @@
useCertManager: true
replicaCount: 3
exposedService:
# exposedService.port -- Exposed Service proxy port
port: 4432
annotations:
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.zeta.eu-west-1.internal.aws.neon.build"
settings:
domain: "*.snirouter.zeta.eu-west-1.internal.aws.neon.build"
sentryEnvironment: "staging"
imagePullSecrets:
- name: docker-hub-neon
metrics:
enabled: false

View File

@@ -9,6 +9,7 @@ settings:
authEndpoint: "https://console.stage.neon.tech/authenticate_proxy_request/"
uri: "https://console.stage.neon.tech/psql_session/"
domain: "pg.neon.build"
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
sentryEnvironment: "staging"
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"
metricCollectionInterval: "1min"

View File

@@ -24,6 +24,7 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
domain: "*.cloud.stage.neon.tech"
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"

View File

@@ -25,6 +25,7 @@ settings:
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
domain: "*.us-east-2.aws.neon.build"
extraDomains: ["*.us-east-2.postgres.zenith.tech", "*.us-east-2.retooldb-staging.com"]
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"

View File

@@ -0,0 +1,19 @@
useCertManager: true
replicaCount: 3
exposedService:
# exposedService.port -- Exposed Service proxy port
port: 4432
annotations:
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.beta.us-east-2.internal.aws.neon.build"
settings:
domain: "*.snirouter.beta.us-east-2.internal.aws.neon.build"
sentryEnvironment: "staging"
imagePullSecrets:
- name: docker-hub-neon
metrics:
enabled: false

View File

@@ -0,0 +1,67 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.${PREVIEW_NAME}.aws.neon.build/management/api/v2"
domain: "*.cloud.${PREVIEW_NAME}.aws.neon.build"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.${PREVIEW_NAME}.aws.neon.build/billing/api/v1/usage_events"
metricCollectionInterval: "1min"
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: test
neon_region: ${PREVIEW_NAME}.eu-central-1
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: cloud.${PREVIEW_NAME}.aws.neon.build
httpsPort: 443
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -0,0 +1,19 @@
useCertManager: true
replicaCount: 3
exposedService:
# exposedService.port -- Exposed Service proxy port
port: 4432
annotations:
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.epsilon.ap-southeast-1.internal.aws.neon.tech"
settings:
domain: "*.snirouter.epsilon.ap-southeast-1.internal.aws.neon.tech"
sentryEnvironment: "production"
imagePullSecrets:
- name: docker-hub-neon
metrics:
enabled: false

View File

@@ -0,0 +1,19 @@
useCertManager: true
replicaCount: 3
exposedService:
# exposedService.port -- Exposed Service proxy port
port: 4432
annotations:
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.gamma.eu-central-1.internal.aws.neon.tech"
settings:
domain: "*.snirouter.gamma.eu-central-1.internal.aws.neon.tech"
sentryEnvironment: "production"
imagePullSecrets:
- name: docker-hub-neon
metrics:
enabled: false

View File

@@ -23,8 +23,8 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.us-east-1.aws.neon.tech"
# These domains haven't been delegated yet.
# extraDomains: ["*.us-east-1.retooldb.com", "*.us-east-1.postgres.vercel-storage.com"]
# *.us-east-1.retooldb.com hasn't been delegated yet.
extraDomains: ["*.us-east-1.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"

View File

@@ -0,0 +1,19 @@
useCertManager: true
replicaCount: 3
exposedService:
# exposedService.port -- Exposed Service proxy port
port: 4432
annotations:
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.theta.us-east-1.internal.aws.neon.tech"
settings:
domain: "*.snirouter.theta.us-east-1.internal.aws.neon.tech"
sentryEnvironment: "production"
imagePullSecrets:
- name: docker-hub-neon
metrics:
enabled: false

View File

@@ -0,0 +1,19 @@
useCertManager: true
replicaCount: 3
exposedService:
# exposedService.port -- Exposed Service proxy port
port: 4432
annotations:
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.delta.us-east-2.internal.aws.neon.tech"
settings:
domain: "*.snirouter.delta.us-east-2.internal.aws.neon.tech"
sentryEnvironment: "production"
imagePullSecrets:
- name: docker-hub-neon
metrics:
enabled: false

View File

@@ -0,0 +1,19 @@
useCertManager: true
replicaCount: 3
exposedService:
# exposedService.port -- Exposed Service proxy port
port: 4432
annotations:
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.eta.us-west-2.internal.aws.neon.tech"
settings:
domain: "*.snirouter.eta.us-west-2.internal.aws.neon.tech"
sentryEnvironment: "production"
imagePullSecrets:
- name: docker-hub-neon
metrics:
enabled: false

View File

@@ -111,8 +111,21 @@ jobs:
- name: Get postgres headers
run: make postgres-headers -j$(nproc)
- name: Run cargo clippy
run: ./run_clippy.sh
# cargo hack runs the given cargo subcommand (clippy in this case) for all feature combinations.
# This will catch compiler & clippy warnings in all feature combinations.
# TODO: use cargo hack for build and test as well, but, that's quite expensive.
# NB: keep clippy args in sync with ./run_clippy.sh
- run: |
CLIPPY_COMMON_ARGS="$( source .neon_clippy_args; echo "$CLIPPY_COMMON_ARGS")"
if [ "$CLIPPY_COMMON_ARGS" = "" ]; then
echo "No clippy args found in .neon_clippy_args"
exit 1
fi
echo "CLIPPY_COMMON_ARGS=${CLIPPY_COMMON_ARGS}" >> $GITHUB_ENV
- name: Run cargo clippy (debug)
run: cargo hack --feature-powerset clippy $CLIPPY_COMMON_ARGS
- name: Run cargo clippy (release)
run: cargo hack --feature-powerset clippy --release $CLIPPY_COMMON_ARGS
# Use `${{ !cancelled() }}` to run quck tests after the longer clippy run
- name: Check formatting
@@ -405,10 +418,7 @@ jobs:
- uses: actions/github-script@v6
if: >
!cancelled() &&
github.event_name == 'pull_request' && (
steps.create-allure-report-debug.outputs.report-url ||
steps.create-allure-report-release.outputs.report-url
)
github.event_name == 'pull_request'
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
@@ -541,7 +551,7 @@ jobs:
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
options: --init
needs: [ push-docker-hub, tag ]
needs: [ promote-images, tag ]
steps:
- name: Set PR's status to pending and request a remote CI test
run: |
@@ -584,8 +594,7 @@ jobs:
neon-image:
runs-on: [ self-hosted, gen3, large ]
needs: [ tag ]
# https://github.com/GoogleContainerTools/kaniko/issues/2005
container: gcr.io/kaniko-project/executor:v1.7.0-debug
container: gcr.io/kaniko-project/executor:v1.9.2-debug
defaults:
run:
shell: sh -eu {0}
@@ -597,11 +606,32 @@ jobs:
submodules: true
fetch-depth: 0
- name: Configure ECR login
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Configure ECR and Docker Hub login
run: |
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
echo "::add-mask::${DOCKERHUB_AUTH}"
cat <<-EOF > /kaniko/.docker/config.json
{
"auths": {
"https://index.docker.io/v1/": {
"auth": "${DOCKERHUB_AUTH}"
}
},
"credHelpers": {
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
}
}
EOF
- name: Kaniko build neon
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
run:
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
--destination neondatabase/neon:${{needs.tag.outputs.build-tag}}
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
- name: Cleanup ECR folder
@@ -652,7 +682,7 @@ jobs:
compute-tools-image:
runs-on: [ self-hosted, gen3, large ]
needs: [ tag ]
container: gcr.io/kaniko-project/executor:v1.7.0-debug
container: gcr.io/kaniko-project/executor:v1.9.2-debug
defaults:
run:
shell: sh -eu {0}
@@ -661,18 +691,41 @@ jobs:
- name: Checkout
uses: actions/checkout@v1 # v3 won't work with kaniko
- name: Configure ECR login
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Configure ECR and Docker Hub login
run: |
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
echo "::add-mask::${DOCKERHUB_AUTH}"
cat <<-EOF > /kaniko/.docker/config.json
{
"auths": {
"https://index.docker.io/v1/": {
"auth": "${DOCKERHUB_AUTH}"
}
},
"credHelpers": {
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
}
}
EOF
- name: Kaniko build compute tools
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
run:
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--dockerfile Dockerfile.compute-tools
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
--destination neondatabase/compute-tools:${{needs.tag.outputs.build-tag}}
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
compute-node-image:
runs-on: [ self-hosted, gen3, large ]
container: gcr.io/kaniko-project/executor:v1.7.0-debug
container: gcr.io/kaniko-project/executor:v1.9.2-debug
needs: [ tag ]
strategy:
fail-fast: false
@@ -689,12 +742,36 @@ jobs:
submodules: true
fetch-depth: 0
- name: Configure ECR login
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Configure ECR and Docker Hub login
run: |
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
echo "::add-mask::${DOCKERHUB_AUTH}"
cat <<-EOF > /kaniko/.docker/config.json
{
"auths": {
"https://index.docker.io/v1/": {
"auth": "${DOCKERHUB_AUTH}"
}
},
"credHelpers": {
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
}
}
EOF
- name: Kaniko build compute node with extensions
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --build-arg PG_VERSION=${{ matrix.version }} --dockerfile Dockerfile.compute-node --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
run:
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--build-arg PG_VERSION=${{ matrix.version }}
--dockerfile Dockerfile.compute-node
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
--destination neondatabase/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
@@ -786,13 +863,11 @@ jobs:
runs-on: [ self-hosted, gen3, small ]
needs: [ tag, test-images, vm-compute-node-image ]
container: golang:1.19-bullseye
if: github.event_name != 'workflow_dispatch'
# Don't add if-condition here.
# The job should always be run because we have dependant other jobs that shouldn't be skipped
steps:
- name: Install Crane & ECR helper
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
run: |
go install github.com/google/go-containerregistry/cmd/crane@31786c6cbb82d6ec4fb8eb79cd9387905130534e # v0.11.0
go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0
@@ -802,10 +877,15 @@ jobs:
mkdir /github/home/.docker/
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
- name: Copy vm-compute-node images to Docker Hub
run: |
crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} vm-compute-node-v14
crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} vm-compute-node-v15
- name: Add latest tag to images
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
github.event_name != 'workflow_dispatch'
run: |
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} latest
@@ -814,50 +894,10 @@ jobs:
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} latest
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
push-docker-hub:
runs-on: [ self-hosted, dev, x64 ]
needs: [ promote-images, tag ]
container: golang:1.19-bullseye
steps:
- name: Install Crane & ECR helper
run: |
go install github.com/google/go-containerregistry/cmd/crane@31786c6cbb82d6ec4fb8eb79cd9387905130534e # v0.11.0
go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0
- name: Configure ECR login
run: |
mkdir /github/home/.docker/
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
- name: Pull neon image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} neon
- name: Pull compute tools image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} compute-tools
- name: Pull compute node v14 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} compute-node-v14
- name: Pull vm compute node v14 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} vm-compute-node-v14
- name: Pull compute node v15 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} compute-node-v15
- name: Pull vm compute node v15 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} vm-compute-node-v15
- name: Pull rust image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
- name: Push images to production ECR
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
run: |
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/neon:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:latest
@@ -872,28 +912,12 @@ jobs:
echo "" > /github/home/.docker/config.json
crane auth login -u ${{ secrets.NEON_DOCKERHUB_USERNAME }} -p ${{ secrets.NEON_DOCKERHUB_PASSWORD }} index.docker.io
- name: Push neon image to Docker Hub
run: crane push neon neondatabase/neon:${{needs.tag.outputs.build-tag}}
- name: Push vm-compute-node to Docker Hub
run: |
crane push vm-compute-node-v14 neondatabase/vm-compute-node-v14:${{needs.tag.outputs.build-tag}}
crane push vm-compute-node-v15 neondatabase/vm-compute-node-v15:${{needs.tag.outputs.build-tag}}
- name: Push compute tools image to Docker Hub
run: crane push compute-tools neondatabase/compute-tools:${{needs.tag.outputs.build-tag}}
- name: Push compute node v14 image to Docker Hub
run: crane push compute-node-v14 neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}}
- name: Push vm compute node v14 image to Docker Hub
run: crane push vm-compute-node-v14 neondatabase/vm-compute-node-v14:${{needs.tag.outputs.build-tag}}
- name: Push compute node v15 image to Docker Hub
run: crane push compute-node-v15 neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}}
- name: Push vm compute node v15 image to Docker Hub
run: crane push vm-compute-node-v15 neondatabase/vm-compute-node-v15:${{needs.tag.outputs.build-tag}}
- name: Push rust image to Docker Hub
run: crane push rust neondatabase/rust:pinned
- name: Add latest tag to images in Docker Hub
- name: Push latest tags to Docker Hub
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
@@ -913,7 +937,7 @@ jobs:
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
# We need both storage **and** compute images for deploy, because control plane picks the compute version based on the storage version.
# If it notices fresh storage, it may bump the compute version; and if the compute image failed to build, that may break things badly
needs: [ push-docker-hub, tag, regress-tests ]
needs: [ promote-images, tag, regress-tests ]
if: |
contains(github.event.pull_request.labels.*.name, 'deploy-test-storage') &&
github.event_name != 'workflow_dispatch'
@@ -947,7 +971,7 @@ jobs:
deploy:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
needs: [ push-docker-hub, tag, regress-tests ]
needs: [ promote-images, tag, regress-tests ]
if: ( github.ref_name == 'main' || github.ref_name == 'release' ) && github.event_name != 'workflow_dispatch'
steps:
- name: Fix git ownership
@@ -984,7 +1008,7 @@ jobs:
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
needs: [ push-docker-hub, tag, regress-tests ]
needs: [ promote-images, tag, regress-tests ]
if: github.ref_name == 'release' && github.event_name != 'workflow_dispatch'
steps:
- name: Promote compatibility snapshot for the release

View File

@@ -27,6 +27,11 @@ on:
required: true
type: boolean
default: true
deployPgSniRouter:
description: 'Deploy pg-sni-router'
required: true
type: boolean
default: true
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
@@ -48,7 +53,8 @@ jobs:
shell: bash
strategy:
matrix:
target_region: [ eu-west-1, us-east-2 ]
# TODO(sergey): Fix storage deploy in eu-central-1
target_region: [ eu-west-1, us-east-2 ]
environment:
name: dev-${{ matrix.target_region }}
steps:
@@ -133,6 +139,53 @@ jobs:
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-preview-proxy-new:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
if: inputs.deployProxy
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: eu-central-1
target_cluster: dev-eu-central-1-alpha
environment:
name: dev-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1-node16
with:
role-to-assume: arn:aws:iam::369495373322:role/github-runner
aws-region: eu-central-1
role-skip-session-tagging: true
role-duration-seconds: 1800
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Re-deploy preview proxies
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
for PREVIEW_NAME in helium argon krypton xenon radon oganesson hydrogen nitrogen oxygen fluorine chlorine; do
export PREVIEW_NAME
envsubst <.github/helm-values/preview-template.neon-proxy-scram.yaml >preview-${PREVIEW_NAME}.neon-proxy-scram.yaml
helm upgrade neon-proxy-scram-${PREVIEW_NAME} neondatabase/neon-proxy --namespace neon-proxy-${PREVIEW_NAME} --create-namespace --install --atomic -f preview-${PREVIEW_NAME}.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
done
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-storage-broker-new:
runs-on: [ self-hosted, gen3, small ]
@@ -148,6 +201,8 @@ jobs:
target_cluster: dev-us-east-2-beta
- target_region: eu-west-1
target_cluster: dev-eu-west-1-zeta
- target_region: eu-central-1
target_cluster: dev-eu-central-1-alpha
environment:
name: dev-${{ matrix.target_region }}
steps:
@@ -177,3 +232,49 @@ jobs:
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-pg-sni-router:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
if: inputs.deployPgSniRouter
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: us-east-2
target_cluster: dev-us-east-2-beta
- target_region: eu-west-1
target_cluster: dev-eu-west-1-zeta
- target_region: eu-central-1
target_cluster: dev-eu-central-1-alpha
environment:
name: dev-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1-node16
with:
role-to-assume: arn:aws:iam::369495373322:role/github-runner
aws-region: eu-central-1
role-skip-session-tagging: true
role-duration-seconds: 1800
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Deploy pg-sni-router
run:
helm upgrade neon-pg-sni-router neondatabase/neon-pg-sni-router --namespace neon-pg-sni-router --create-namespace --install --debug --atomic -f .github/helm-values/${{ matrix.target_cluster }}.pg-sni-router.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 15m0s
- name: Cleanup helm folder
run: rm -rf ~/.cache

View File

@@ -27,6 +27,11 @@ on:
required: true
type: boolean
default: true
deployPgSniRouter:
description: 'Deploy pg-sni-router'
required: true
type: boolean
default: true
disclamerAcknowledged:
description: 'I confirm that there is an emergency and I cannot use the regular release workflow'
required: true
@@ -171,3 +176,42 @@ jobs:
- name: Deploy storage-broker
run:
helm upgrade neon-storage-broker-lb neondatabase/neon-storage-broker --namespace neon-storage-broker-lb --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
deploy-pg-sni-router:
runs-on: prod
container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
if: inputs.deployPgSniRouter && inputs.disclamerAcknowledged
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: us-east-2
target_cluster: prod-us-east-2-delta
- target_region: us-west-2
target_cluster: prod-us-west-2-eta
- target_region: eu-central-1
target_cluster: prod-eu-central-1-gamma
- target_region: ap-southeast-1
target_cluster: prod-ap-southeast-1-epsilon
- target_region: us-east-1
target_cluster: prod-us-east-1-theta
environment:
name: prod-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Deploy pg-sni-router
run:
helm upgrade neon-pg-sni-router neondatabase/neon-pg-sni-router --namespace neon-pg-sni-router --create-namespace --install --debug --atomic -f .github/helm-values/${{ matrix.target_cluster }}.pg-sni-router.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 15m0s

.neon_clippy_args Normal file
View File

@@ -0,0 +1,4 @@
# * `-A unknown_lints` do not warn about unknown lint suppressions
# that people with newer toolchains might use
# * `-D warnings` - fail on any warnings (`cargo` returns non-zero exit status)
export CLIPPY_COMMON_ARGS="--locked --workspace --all-targets -- -A unknown_lints -D warnings"

Cargo.lock generated
View File

@@ -1032,6 +1032,7 @@ dependencies = [
"serde",
"serde_json",
"serde_with",
"utils",
"workspace_hack",
]
@@ -1105,6 +1106,7 @@ dependencies = [
"anyhow",
"clap 4.2.2",
"comfy-table",
"compute_api",
"git-version",
"nix",
"once_cell",
@@ -1574,6 +1576,21 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "form_urlencoded"
version = "1.1.0"
@@ -2361,6 +2378,24 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "native-tls"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]]
name = "nix"
version = "0.26.2"
@@ -2483,12 +2518,50 @@ version = "11.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "openssl"
version = "0.10.52"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01b8574602df80f7b85fdfc5392fa884a4e3b3f4f35402c070ab34c3d3f78d56"
dependencies = [
"bitflags",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.15",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e17f59264b2809d77ae94f0e1ebabc434773f370d6ca667bd223ea10e06cc7e"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "opentelemetry"
version = "0.18.0"
@@ -2681,6 +2754,7 @@ dependencies = [
"tenant_size_model",
"thiserror",
"tokio",
"tokio-io-timeout",
"tokio-postgres",
"tokio-tar",
"tokio-util",
@@ -2815,6 +2889,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160"
[[package]]
name = "plotters"
version = "0.3.4"
@@ -2846,7 +2926,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f"
dependencies = [
"bytes",
"fallible-iterator",
@@ -2856,10 +2936,21 @@ dependencies = [
"tokio-postgres",
]
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f"
dependencies = [
"native-tls",
"tokio",
"tokio-native-tls",
"tokio-postgres",
]
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -2877,7 +2968,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f"
dependencies = [
"bytes",
"fallible-iterator",
@@ -2958,7 +3049,6 @@ dependencies = [
"pin-project-lite",
"postgres-protocol",
"rand",
"serde",
"thiserror",
"tokio",
"tracing",
@@ -3109,10 +3199,12 @@ dependencies = [
"itertools",
"md5",
"metrics",
"native-tls",
"once_cell",
"opentelemetry",
"parking_lot",
"pin-project-lite",
"postgres-native-tls",
"postgres_backend",
"pq_proto",
"prometheus",
@@ -3567,6 +3659,7 @@ dependencies = [
"const_format",
"crc32c",
"fs2",
"futures",
"git-version",
"hex",
"humantime",
@@ -3581,7 +3674,9 @@ dependencies = [
"pq_proto",
"regex",
"remote_storage",
"reqwest",
"safekeeper_api",
"scopeguard",
"serde",
"serde_json",
"serde_with",
@@ -3868,8 +3963,7 @@ dependencies = [
[[package]]
name = "sharded-slab"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
source = "git+https://github.com/neondatabase/sharded-slab.git?rev=98d16753ab01c61f0a028de44167307a00efea00#98d16753ab01c61f0a028de44167307a00efea00"
dependencies = [
"lazy_static",
]
@@ -4319,10 +4413,20 @@ dependencies = [
"syn 2.0.15",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f"
dependencies = [
"async-trait",
"byteorder",
@@ -4629,6 +4733,16 @@ dependencies = [
"valuable",
]
[[package]]
name = "tracing-error"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d686ec1c0f384b1277f097b2f279a2ecc11afe8c133c1aabf036a27cb4cd206e"
dependencies = [
"tracing",
"tracing-subscriber",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
@@ -4854,6 +4968,7 @@ dependencies = [
"bincode",
"byteorder",
"bytes",
"chrono",
"criterion",
"futures",
"heapless",
@@ -4865,6 +4980,7 @@ dependencies = [
"nix",
"once_cell",
"pin-project-lite",
"pq_proto",
"rand",
"regex",
"routerify",
@@ -4879,6 +4995,7 @@ dependencies = [
"thiserror",
"tokio",
"tracing",
"tracing-error",
"tracing-subscriber",
"url",
"uuid",
@@ -4901,6 +5018,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.4"
@@ -5279,13 +5402,11 @@ name = "workspace_hack"
version = "0.1.0"
dependencies = [
"anyhow",
"byteorder",
"bytes",
"chrono",
"clap 4.2.2",
"clap_builder",
"crossbeam-utils",
"digest",
"either",
"fail",
"futures",

View File

@@ -62,6 +62,7 @@ jsonwebtoken = "8"
libc = "0.2"
md5 = "0.7.0"
memoffset = "0.8"
native-tls = "0.2"
nix = "0.26"
notify = "5.0.0"
num_cpus = "1.15"
@@ -110,6 +111,7 @@ toml = "0.7"
toml_edit = "0.19"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
tracing = "0.1"
tracing-error = "0.2.0"
tracing-opentelemetry = "0.18.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.2"
@@ -123,10 +125,11 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" }
## Other git libraries
@@ -158,10 +161,16 @@ rstest = "0.17"
tempfile = "3.4"
tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
[patch.crates-io]
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
# Changes the MAX_THREADS limit from 4096 to 32768.
# This is a temporary workaround for using tracing from many threads in safekeepers code,
# until async safekeepers patch is merged to the main.
sharded-slab = { git = "https://github.com/neondatabase/sharded-slab.git", rev="98d16753ab01c61f0a028de44167307a00efea00" }
################# Binary contents sections

View File

@@ -44,7 +44,15 @@ COPY --chown=nonroot . .
# Show build caching stats to check if it was used in the end.
# Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats.
RUN set -e \
&& mold -run cargo build --bin pageserver --bin pageserver_binutils --bin draw_timeline_dir --bin safekeeper --bin storage_broker --bin proxy --locked --release \
&& mold -run cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pageserver_binutils \
--bin draw_timeline_dir \
--bin safekeeper \
--bin storage_broker \
--bin proxy \
--locked --release \
&& cachepot -s
# Build final image
@@ -63,6 +71,7 @@ RUN set -e \
&& useradd -d /data neon \
&& chown -R neon:neon /data
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pg_sni_router /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver_binutils /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/draw_timeline_dir /usr/local/bin

View File

@@ -30,7 +30,7 @@ use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::ComputeSpec;
use compute_api::spec::{ComputeMode, ComputeSpec};
use crate::config;
use crate::pg_helpers::*;
@@ -67,8 +67,9 @@ pub struct ComputeNode {
pub struct ComputeState {
pub start_time: DateTime<Utc>,
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity
pub last_active: DateTime<Utc>,
/// Timestamp of the last Postgres activity. It could be `None` if
/// compute wasn't used since start.
pub last_active: Option<DateTime<Utc>>,
pub error: Option<String>,
pub pspec: Option<ParsedSpec>,
pub metrics: ComputeMetrics,
@@ -79,7 +80,7 @@ impl ComputeState {
Self {
start_time: Utc::now(),
status: ComputeStatus::Empty,
last_active: Utc::now(),
last_active: None,
error: None,
pspec: None,
metrics: ComputeMetrics::default(),
@@ -250,17 +251,34 @@ impl ComputeNode {
#[instrument(skip(self, compute_state))]
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
let pgdata_path = Path::new(&self.pgdata);
// Remove/create an empty pgdata directory and put configuration there.
self.create_pgdata()?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
info!("starting safekeepers syncing");
let lsn = self
.sync_safekeepers(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", lsn);
// Syncing safekeepers is only safe with primary nodes: if a primary
// is already connected it will be kicked out, so a secondary (standby)
// cannot sync safekeepers.
let lsn = match spec.mode {
ComputeMode::Primary => {
info!("starting safekeepers syncing");
let lsn = self
.sync_safekeepers(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", lsn);
lsn
}
ComputeMode::Static(lsn) => {
info!("Starting read-only node at static LSN {}", lsn);
lsn
}
ComputeMode::Replica => {
info!("Initializing standby from latest Pageserver LSN");
Lsn(0)
}
};
info!(
"getting basebackup@{} from pageserver {}",
@@ -276,6 +294,13 @@ impl ComputeNode {
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path)?;
match spec.mode {
ComputeMode::Primary | ComputeMode::Static(..) => {}
ComputeMode::Replica => {
add_standby_signal(pgdata_path)?;
}
}
Ok(())
}
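
Taken together with the standby.signal handling just above, the per-mode startup decisions can be checked in isolation. Below is a minimal, self-contained sketch; startup_plan is an illustrative name (not a real compute_ctl function) and Lsn is stood in by a plain u64:

#[derive(Debug)]
enum ComputeMode {
    Primary,
    Static(u64), // stand-in for the real Lsn type
    Replica,
}

// Returns (basebackup LSN if pinned, whether standby.signal is needed),
// mirroring the two match statements above.
fn startup_plan(mode: &ComputeMode) -> (Option<u64>, bool) {
    match mode {
        // Primary: sync safekeepers first; the basebackup LSN comes from there.
        ComputeMode::Primary => (None, false),
        // Static: pin the basebackup at the requested LSN.
        ComputeMode::Static(lsn) => (Some(*lsn), false),
        // Replica: take the latest pageserver snapshot and run as a standby.
        ComputeMode::Replica => (None, true),
    }
}

fn main() {
    assert_eq!(startup_plan(&ComputeMode::Static(42)), (Some(42), false));
    assert_eq!(startup_plan(&ComputeMode::Replica), (None, true));
    assert_eq!(startup_plan(&ComputeMode::Primary), (None, false));
}
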
@@ -313,27 +338,30 @@ impl ComputeNode {
// In this case we need to connect with old `zenith_admin` name
// and create new user. We cannot simply rename connected user,
// but we can create a new one and grant it all privileges.
let mut client = match Client::connect(self.connstr.as_str(), NoTls) {
Err(e) => {
info!(
"cannot connect to postgres: {}, retrying with `zenith_admin` username",
e
);
let mut zenith_admin_connstr = self.connstr.clone();
let mut client = {
let _span = tracing::info_span!("connect").entered();
match Client::connect(self.connstr.as_str(), NoTls) {
Err(e) => {
info!(
"cannot connect to postgres: {}, retrying with `zenith_admin` username",
e
);
let mut zenith_admin_connstr = self.connstr.clone();
zenith_admin_connstr
.set_username("zenith_admin")
.map_err(|_| anyhow::anyhow!("invalid connstr"))?;
let mut client = Client::connect(zenith_admin_connstr.as_str(), NoTls)?;
client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?;
client.simple_query("GRANT zenith_admin TO cloud_admin")?;
drop(client);
// reconnect with connstring with expected name
Client::connect(self.connstr.as_str(), NoTls)?
}
Ok(client) => client,
}
};
// Proceed with post-startup configuration. Note, that order of operations is important.
@@ -378,11 +406,13 @@ impl ComputeNode {
self.pg_reload_conf(&mut client)?;
// Proceed with post-startup configuration. Note, that order of operations is important.
if spec.mode == ComputeMode::Primary {
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec, self.connstr.as_str(), &mut client)?;
handle_extensions(&spec, &mut client)?;
}
// 'Close' connection
drop(client);
@@ -415,7 +445,9 @@ impl ComputeNode {
let pg = self.start_postgres(spec.storage_auth_token.clone())?;
self.apply_config(&compute_state)?;
if spec.spec.mode == ComputeMode::Primary {
self.apply_config(&compute_state)?;
}
let startup_end_time = Utc::now();
{

View File

@@ -6,7 +6,7 @@ use std::path::Path;
use anyhow::Result;
use crate::pg_helpers::PgOptionsSerialize;
use compute_api::spec::ComputeSpec;
use compute_api::spec::{ComputeMode, ComputeSpec};
/// Check that `line` is inside a text file and put it there if it is not.
/// Create file if it doesn't exist.
@@ -34,17 +34,25 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
/// Create or completely rewrite configuration file specified by `path`
pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
// File::create() destroys the file content if it exists.
let mut postgres_conf = File::create(path)?;
let mut file = File::create(path)?;
write_auto_managed_block(&mut postgres_conf, &spec.cluster.settings.as_pg_settings())?;
Ok(())
}
// Write Postgres config block wrapped with generated comment section
fn write_auto_managed_block(file: &mut File, buf: &str) -> Result<()> {
writeln!(file, "# Managed by compute_ctl: begin")?;
writeln!(file, "{}", buf)?;
write!(file, "{}", &spec.cluster.settings.as_pg_settings())?;
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Static(lsn) => {
// hot_standby is 'on' by default, but let's be explicit
writeln!(file, "hot_standby=on")?;
writeln!(file, "recovery_target_lsn='{lsn}'")?;
}
ComputeMode::Replica => {
// hot_standby is 'on' by default, but let's be explicit
writeln!(file, "hot_standby=on")?;
}
}
writeln!(file, "# Managed by compute_ctl: end")?;
Ok(())
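
For a concrete picture, the Static branch would leave a managed block roughly like the following in postgresql.conf (the LSN value here is purely illustrative):

# Managed by compute_ctl: begin
...settings rendered from the cluster spec...
hot_standby=on
recovery_target_lsn='0/16960E8'
# Managed by compute_ctl: end
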

View File

@@ -181,8 +181,8 @@ components:
ComputeState:
type: object
required:
- start_time
- status
- last_active
properties:
start_time:
type: string
@@ -195,11 +195,13 @@ components:
$ref: '#/components/schemas/ComputeStatus'
last_active:
type: string
description: The last detected compute activity timestamp in UTC and RFC3339 format.
description: |
The last detected compute activity timestamp in UTC and RFC3339 format.
It could be empty if compute was never used by user since start.
example: "2022-10-12T07:20:50.52Z"
error:
type: string
description: Text of the error during compute startup, if any.
description: Text of the error during compute startup or reconfiguration, if any.
example: ""
tenant:
type: string
@@ -222,9 +224,12 @@ components:
ComputeStatus:
type: string
enum:
- empty
- init
- failed
- running
- configuration_pending
- configuration
example: running
#

View File

@@ -74,7 +74,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
// Found non-idle backend, so the last activity is NOW.
// Save it and exit the for loop. Also clear the idle backend
// `state_change` timestamps array as it doesn't matter now.
last_active = Utc::now();
last_active = Some(Utc::now());
idle_backs.clear();
break;
}
@@ -82,15 +82,16 @@ fn watch_compute_activity(compute: &ComputeNode) {
// Get idle backend `state_change` with the max timestamp.
if let Some(last) = idle_backs.iter().max() {
last_active = *last;
last_active = Some(*last);
}
}
// Update the last activity in the shared state if we got a more recent one.
let mut state = compute.state.lock().unwrap();
// NB: `Some(<DateTime>)` is always greater than `None`.
if last_active > state.last_active {
state.last_active = last_active;
debug!("set the last compute activity time to: {}", last_active);
debug!("set the last compute activity time to: {:?}", last_active);
}
}
Err(e) => {

View File

@@ -94,6 +94,7 @@ impl PgOptionsSerialize for GenericOptions {
pub trait GenericOptionsSearch {
fn find(&self, name: &str) -> Option<String>;
fn find_ref(&self, name: &str) -> Option<&GenericOption>;
}
impl GenericOptionsSearch for GenericOptions {
@@ -103,6 +104,12 @@ impl GenericOptionsSearch for GenericOptions {
let op = ops.iter().find(|s| s.name == name)?;
op.value.clone()
}
/// Lookup option by name, returning ref
fn find_ref(&self, name: &str) -> Option<&GenericOption> {
let ops = self.as_ref()?;
ops.iter().find(|s| s.name == name)
}
}
pub trait RoleExt {

View File

@@ -1,3 +1,4 @@
use std::fs::File;
use std::path::Path;
use std::str::FromStr;
@@ -145,6 +146,21 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
Ok(())
}
/// Create a standby.signal file
pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
// XXX: consider making it a part of spec.json
info!("adding standby.signal");
let signalfile = pgdata_path.join("standby.signal");
if !signalfile.exists() {
info!("created standby.signal");
File::create(signalfile)?;
} else {
info!("reused pre-existing standby.signal");
}
Ok(())
}
/// Given a cluster spec json and open transaction it handles roles creation,
/// deletion and update.
#[instrument(skip_all)]

View File

@@ -30,4 +30,5 @@ postgres_connection.workspace = true
storage_broker.workspace = true
utils.workspace = true
compute_api.workspace = true
workspace_hack.workspace = true

View File

@@ -7,6 +7,7 @@
//!
use anyhow::{anyhow, bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use compute_api::spec::ComputeMode;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::local_env::LocalEnv;
use control_plane::pageserver::PageServerNode;
@@ -474,7 +475,14 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
println!("Creating endpoint for imported timeline ...");
cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?;
cplane.new_endpoint(
tenant_id,
name,
timeline_id,
None,
pg_version,
ComputeMode::Primary,
)?;
println!("Done");
}
Some(("branch", branch_match)) => {
@@ -560,20 +568,20 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.iter()
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
{
let lsn_str = match endpoint.lsn {
None => {
// -> primary endpoint
let lsn_str = match endpoint.mode {
ComputeMode::Static(lsn) => {
// -> read-only endpoint
// Use the node's LSN.
lsn.to_string()
}
_ => {
// -> primary endpoint or hot replica
// Use the LSN at the end of the timeline.
timeline_infos
.get(&endpoint.timeline_id)
.map(|bi| bi.last_record_lsn.to_string())
.unwrap_or_else(|| "?".to_string())
}
Some(lsn) => {
// -> read-only endpoint
// Use the endpoint's LSN.
lsn.to_string()
}
};
let branch_name = timeline_name_mappings
@@ -619,7 +627,19 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.copied()
.context("Failed to parse postgres version from the argument string")?;
cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, lsn, port, pg_version)?;
let hot_standby = sub_args
.get_one::<bool>("hot-standby")
.copied()
.unwrap_or(false);
let mode = match (lsn, hot_standby) {
(Some(lsn), false) => ComputeMode::Static(lsn),
(None, true) => ComputeMode::Replica,
(None, false) => ComputeMode::Primary,
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
};
cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, port, pg_version, mode)?;
}
"start" => {
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
@@ -637,7 +657,21 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
None
};
let hot_standby = sub_args
.get_one::<bool>("hot-standby")
.copied()
.unwrap_or(false);
if let Some(endpoint) = endpoint {
match (&endpoint.mode, hot_standby) {
(ComputeMode::Static(_), true) => {
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
}
(ComputeMode::Primary, true) => {
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
}
_ => {}
}
println!("Starting existing endpoint {endpoint_id}...");
endpoint.start(&auth_token)?;
} else {
@@ -659,6 +693,14 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.get_one::<u32>("pg-version")
.copied()
.context("Failed to `pg-version` from the argument string")?;
let mode = match (lsn, hot_standby) {
(Some(lsn), false) => ComputeMode::Static(lsn),
(None, true) => ComputeMode::Replica,
(None, false) => ComputeMode::Primary,
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
};
// when used with a custom port this results in non-obvious behaviour:
// the port is remembered from the first start command, i.e.
// start --port X
@@ -670,9 +712,9 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
tenant_id,
endpoint_id,
timeline_id,
lsn,
port,
pg_version,
mode,
)?;
ep.start(&auth_token)?;
}
@@ -928,6 +970,12 @@ fn cli() -> Command {
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
.required(false);
let hot_standby_arg = Arg::new("hot-standby")
.value_parser(value_parser!(bool))
.long("hot-standby")
.help("If set, the node will be a hot replica on the specified timeline")
.required(false);
Command::new("Neon CLI")
.arg_required_else_help(true)
.version(GIT_VERSION)
@@ -1052,6 +1100,7 @@ fn cli() -> Command {
.long("config-only")
.required(false))
.arg(pg_version_arg.clone())
.arg(hot_standby_arg.clone())
)
.subcommand(Command::new("start")
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
@@ -1062,6 +1111,7 @@ fn cli() -> Command {
.arg(lsn_arg)
.arg(port_arg)
.arg(pg_version_arg)
.arg(hot_standby_arg)
)
.subcommand(
Command::new("stop")

View File

@@ -11,15 +11,33 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::local_env::{LocalEnv, DEFAULT_PG_VERSION};
use crate::local_env::LocalEnv;
use crate::pageserver::PageServerNode;
use crate::postgresql_conf::PostgresConf;
use compute_api::spec::ComputeMode;
// contents of an endpoint.json file
#[serde_as]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EndpointConf {
name: String,
#[serde_as(as = "DisplayFromStr")]
tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
timeline_id: TimelineId,
mode: ComputeMode,
port: u16,
pg_version: u32,
}
//
// ComputeControlPlane
//
@@ -68,23 +86,34 @@ impl ComputeControlPlane {
tenant_id: TenantId,
name: &str,
timeline_id: TimelineId,
lsn: Option<Lsn>,
port: Option<u16>,
pg_version: u32,
mode: ComputeMode,
) -> Result<Arc<Endpoint>> {
let port = port.unwrap_or_else(|| self.get_port());
let ep = Arc::new(Endpoint {
name: name.to_owned(),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
env: self.env.clone(),
pageserver: Arc::clone(&self.pageserver),
timeline_id,
lsn,
mode,
tenant_id,
pg_version,
});
ep.create_pgdata()?;
std::fs::write(
ep.endpoint_path().join("endpoint.json"),
serde_json::to_string_pretty(&EndpointConf {
name: name.to_string(),
tenant_id,
timeline_id,
mode,
port,
pg_version,
})?,
)?;
ep.setup_pg_conf()?;
self.endpoints.insert(ep.name.clone(), Arc::clone(&ep));
@@ -101,8 +130,7 @@ pub struct Endpoint {
name: String,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
// Some(lsn) if this is a read-only endpoint anchored at 'lsn'. None for the primary.
pub lsn: Option<Lsn>,
pub mode: ComputeMode,
// port and address of the Postgres server
pub address: SocketAddr,
@@ -131,42 +159,20 @@ impl Endpoint {
let fname = entry.file_name();
let name = fname.to_str().unwrap().to_string();
// Read config file into memory
let cfg_path = entry.path().join("pgdata").join("postgresql.conf");
let cfg_path_str = cfg_path.to_string_lossy();
let mut conf_file = File::open(&cfg_path)
.with_context(|| format!("failed to open config file in {}", cfg_path_str))?;
let conf = PostgresConf::read(&mut conf_file)
.with_context(|| format!("failed to read config file in {}", cfg_path_str))?;
// Read a few options from the config file
let context = format!("in config file {}", cfg_path_str);
let port: u16 = conf.parse_field("port", &context)?;
let timeline_id: TimelineId = conf.parse_field("neon.timeline_id", &context)?;
let tenant_id: TenantId = conf.parse_field("neon.tenant_id", &context)?;
// Read postgres version from PG_VERSION file to determine which postgres version binary to use.
// If it doesn't exist, assume broken data directory and use default pg version.
let pg_version_path = entry.path().join("PG_VERSION");
let pg_version_str =
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
let pg_version = u32::from_str(&pg_version_str)?;
// parse recovery_target_lsn, if any
let recovery_target_lsn: Option<Lsn> =
conf.parse_field_optional("recovery_target_lsn", &context)?;
// Read the endpoint.json file
let conf: EndpointConf =
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
// ok now
Ok(Endpoint {
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.port),
name,
env: env.clone(),
pageserver: Arc::clone(pageserver),
timeline_id,
lsn: recovery_target_lsn,
tenant_id,
pg_version,
timeline_id: conf.timeline_id,
mode: conf.mode,
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
})
}
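
With this change the endpoint's identity no longer has to be scraped back out of postgresql.conf; it round-trips through a small JSON file instead. A hypothetical endpoint.json (all values made up for illustration) would look like:

{
  "name": "main",
  "tenant_id": "de200bd42b49cc1814412c7e592dd6e9",
  "timeline_id": "9e8a76386b44c133ba8e60b9aaf0df22",
  "mode": "Primary",
  "port": 55432,
  "pg_version": 15
}
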
@@ -299,50 +305,83 @@ impl Endpoint {
conf.append("neon.pageserver_connstring", &pageserver_connstr);
conf.append("neon.tenant_id", &self.tenant_id.to_string());
conf.append("neon.timeline_id", &self.timeline_id.to_string());
if let Some(lsn) = self.lsn {
conf.append("recovery_target_lsn", &lsn.to_string());
}
conf.append_line("");
// Replication-related configurations, such as WAL sending
match &self.mode {
ComputeMode::Primary => {
// Configure backpressure
// - Replication write lag depends on how fast the walreceiver can process incoming WAL.
// This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
// so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
// Actually latency should be much smaller (better if < 1sec). But we assume that recently
// updated pages are not requested from the pageserver.
// - Replication flush lag depends on speed of persisting data by checkpointer (creation of
// delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
// remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
// recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
// - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
// To be able to restore database in case of pageserver node crash, safekeeper should not
// remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
// (if they are not able to upload WAL to S3).
conf.append("max_replication_write_lag", "15MB");
conf.append("max_replication_flush_lag", "10GB");
if !self.env.safekeepers.is_empty() {
// Configure Postgres to connect to the safekeepers
conf.append("synchronous_standby_names", "walproposer");
let safekeepers = self
.env
.safekeepers
.iter()
.map(|sk| format!("localhost:{}", sk.pg_port))
.collect::<Vec<String>>()
.join(",");
conf.append("neon.safekeepers", &safekeepers);
} else {
// We only use setup without safekeepers for tests,
// and don't care about data durability on pageserver,
// so set more relaxed synchronous_commit.
conf.append("synchronous_commit", "remote_write");
// Configure the node to stream WAL directly to the pageserver
// This isn't really a supported configuration, but can be useful for
// testing.
conf.append("synchronous_standby_names", "pageserver");
}
}
ComputeMode::Static(lsn) => {
conf.append("recovery_target_lsn", &lsn.to_string());
}
ComputeMode::Replica => {
assert!(!self.env.safekeepers.is_empty());
// TODO: use future host field from safekeeper spec
// Pass the list of safekeepers to the replica so that it can connect to any of them,
// whichever is available.
let sk_ports = self
.env
.safekeepers
.iter()
.map(|x| x.pg_port.to_string())
.collect::<Vec<_>>()
.join(",");
let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
let connstr = format!(
"host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
sk_hosts,
sk_ports,
&self.timeline_id.to_string(),
&self.tenant_id.to_string(),
);
let slot_name = format!("repl_{}_", self.timeline_id);
conf.append("primary_conninfo", connstr.as_str());
conf.append("primary_slot_name", slot_name.as_str());
conf.append("hot_standby", "on");
}
}
let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
@@ -355,21 +394,27 @@ impl Endpoint {
}
fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
let backup_lsn = if let Some(lsn) = self.lsn {
Some(lsn)
} else if !self.env.safekeepers.is_empty() {
// LSN 0 means that it is bootstrap and we need to download just
// the latest data from the pageserver. That is a bit clumsy, but the whole bootstrap
// procedure evolves quite actively right now, so let's think about it again
// when things are more stable (TODO).
let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
if lsn == Lsn(0) {
None
} else {
Some(lsn)
let backup_lsn = match &self.mode {
ComputeMode::Primary => {
if !self.env.safekeepers.is_empty() {
// LSN 0 means that it is bootstrap and we need to download just
// the latest data from the pageserver. That is a bit clumsy, but the whole bootstrap
// procedure evolves quite actively right now, so let's think about it again
// when things are more stable (TODO).
let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
if lsn == Lsn(0) {
None
} else {
Some(lsn)
}
} else {
None
}
}
ComputeMode::Static(lsn) => Some(*lsn),
ComputeMode::Replica => {
None // Take the latest snapshot available to start with
}
} else {
None
};
self.do_basebackup(backup_lsn)?;
@@ -466,7 +511,7 @@ impl Endpoint {
// 3. Load basebackup
self.load_basebackup(auth_token)?;
if self.lsn.is_some() {
if self.mode != ComputeMode::Primary {
File::create(self.pgdata().join("standby.signal"))?;
}

View File

@@ -13,7 +13,7 @@ use std::io::BufRead;
use std::str::FromStr;
/// In-memory representation of a postgresql.conf file
#[derive(Default)]
#[derive(Default, Debug)]
pub struct PostgresConf {
lines: Vec<String>,
hash: HashMap<String, String>,

View File

@@ -28,11 +28,6 @@
"value": "replica",
"vartype": "enum"
},
{
"name": "hot_standby",
"value": "on",
"vartype": "bool"
},
{
"name": "wal_log_hints",
"value": "on",

View File

@@ -11,4 +11,5 @@ serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
utils = { path = "../utils" }
workspace_hack.workspace = true

View File

@@ -19,7 +19,7 @@ pub struct ComputeStatusResponse {
pub timeline: Option<String>,
pub status: ComputeStatus,
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub last_active: Option<DateTime<Utc>>,
pub error: Option<String>,
}
@@ -29,7 +29,7 @@ pub struct ComputeState {
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub last_active: Option<DateTime<Utc>>,
pub error: Option<String>,
}
@@ -54,11 +54,15 @@ pub enum ComputeStatus {
Failed,
}
fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
fn rfc3339_serialize<S>(x: &Option<DateTime<Utc>>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
x.to_rfc3339().serialize(s)
if let Some(x) = x {
x.to_rfc3339().serialize(s)
} else {
s.serialize_none()
}
}
/// Response of the /metrics.json API
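
As a self-contained illustration of the new serialization behavior (a sketch assuming the chrono, serde, and serde_json crates): Some(ts) renders as an RFC3339 string, None renders as JSON null, and Option's derived ordering, where Some(_) compares greater than None, is what makes the `last_active > state.last_active` check in the activity watcher do the right thing on first use:

use chrono::{DateTime, Utc};
use serde::{Serialize, Serializer};

#[derive(Serialize)]
struct Status {
    #[serde(serialize_with = "rfc3339_serialize")]
    last_active: Option<DateTime<Utc>>,
}

fn rfc3339_serialize<S: Serializer>(x: &Option<DateTime<Utc>>, s: S) -> Result<S::Ok, S::Error> {
    match x {
        Some(x) => s.serialize_str(&x.to_rfc3339()),
        None => s.serialize_none(),
    }
}

fn main() {
    let fresh = Status { last_active: None };
    let used = Status { last_active: Some(Utc::now()) };
    println!("{}", serde_json::to_string(&fresh).unwrap()); // {"last_active":null}
    println!("{}", serde_json::to_string(&used).unwrap()); // {"last_active":"2023-...+00:00"}
    // Some(_) always compares greater than None, so a first activity wins.
    assert!(used.last_active > fresh.last_active);
}
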

View File

@@ -3,8 +3,10 @@
//! The spec.json file is used to pass information to 'compute_ctl'. It contains
//! all the information needed to start up the right version of PostgreSQL,
//! and connect it to the storage nodes.
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
use utils::lsn::Lsn;
/// String type alias representing Postgres identifier and
/// intended to be used for DB / role names.
@@ -12,6 +14,7 @@ pub type PgIdent = String;
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[serde_as]
#[derive(Clone, Debug, Default, Deserialize)]
pub struct ComputeSpec {
pub format_version: f32,
@@ -24,11 +27,29 @@ pub struct ComputeSpec {
pub cluster: Cluster,
pub delta_operations: Option<Vec<DeltaOp>>,
#[serde(default)]
pub mode: ComputeMode,
pub storage_auth_token: Option<String>,
pub startup_tracing_context: Option<HashMap<String, String>>,
}
#[serde_as]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
pub enum ComputeMode {
/// A read-write node
#[default]
Primary,
/// A read-only node, pinned at a particular LSN
Static(#[serde_as(as = "DisplayFromStr")] Lsn),
/// A read-only node that follows the tip of the branch in hot standby mode
///
/// Future versions may want to distinguish between replicas with hot standby
/// feedback and other kinds of replication configurations.
Replica,
}
#[derive(Clone, Debug, Default, Deserialize)]
pub struct Cluster {
pub cluster_id: String,
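
For reference, a sketch (assuming the serde and serde_json crates; Lsn is stood in by a String here, since the real field serializes via DisplayFromStr) of how this externally tagged enum round-trips through JSON:

use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
enum ComputeMode {
    Primary,
    Static(String), // the real code holds an Lsn, serialized via its Display form
    Replica,
}

fn main() {
    // Unit variants become bare strings, the tuple variant a one-key object.
    assert_eq!(
        serde_json::to_string(&ComputeMode::Primary).unwrap(),
        r#""Primary""#
    );
    let pinned = ComputeMode::Static("0/16B5A50".to_string());
    let json = serde_json::to_string(&pinned).unwrap();
    assert_eq!(json, r#"{"Static":"0/16B5A50"}"#);
    let back: ComputeMode = serde_json::from_str(&json).unwrap();
    assert_eq!(back, pinned);
}
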

View File

@@ -50,11 +50,14 @@ impl QueryError {
}
}
/// Returns true if the given error is a normal consequence of a network issue,
/// or the client closing the connection. These errors can happen during normal
/// operations, and don't indicate a bug in our code.
pub fn is_expected_io_error(e: &io::Error) -> bool {
use io::ErrorKind::*;
matches!(
e.kind(),
ConnectionRefused | ConnectionAborted | ConnectionReset | TimedOut
BrokenPipe | ConnectionRefused | ConnectionAborted | ConnectionReset | TimedOut
)
}
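
A hypothetical usage sketch of this predicate (self-contained; on_connection_error is an illustrative name, not part of the crate): expected network errors get demoted to debug-level noise, while anything else is surfaced as a real problem:

use std::io;

fn is_expected_io_error(e: &io::Error) -> bool {
    use io::ErrorKind::*;
    matches!(
        e.kind(),
        BrokenPipe | ConnectionRefused | ConnectionAborted | ConnectionReset | TimedOut
    )
}

fn on_connection_error(e: &io::Error) {
    if is_expected_io_error(e) {
        eprintln!("debug: client went away: {e}"); // normal connection churn
    } else {
        eprintln!("error: unexpected I/O failure: {e}"); // worth investigating
    }
}

fn main() {
    on_connection_error(&io::Error::from(io::ErrorKind::BrokenPipe));
    on_connection_error(&io::Error::from(io::ErrorKind::PermissionDenied));
}
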

View File

@@ -95,10 +95,13 @@ pub fn generate_wal_segment(
segno: u64,
system_id: u64,
pg_version: u32,
lsn: Lsn,
) -> Result<Bytes, SerializeError> {
assert_eq!(segno, lsn.segment_number(WAL_SEGMENT_SIZE));
match pg_version {
14 => v14::xlog_utils::generate_wal_segment(segno, system_id),
15 => v15::xlog_utils::generate_wal_segment(segno, system_id),
14 => v14::xlog_utils::generate_wal_segment(segno, system_id, lsn),
15 => v15::xlog_utils::generate_wal_segment(segno, system_id, lsn),
_ => Err(SerializeError::BadInput),
}
}

View File

@@ -195,6 +195,7 @@ pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
pub const XLP_LONG_HEADER: u16 = 0x0002;
/* From fsm_internals.h */

View File

@@ -270,6 +270,11 @@ impl XLogPageHeaderData {
use utils::bin_ser::LeSer;
XLogPageHeaderData::des_from(&mut buf.reader())
}
pub fn encode(&self) -> Result<Bytes, SerializeError> {
use utils::bin_ser::LeSer;
self.ser().map(|b| b.into())
}
}
impl XLogLongPageHeaderData {
@@ -328,22 +333,32 @@ impl CheckPoint {
}
}
//
// Generate new, empty WAL segment.
// We need this segment to start compute node.
//
pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result<Bytes, SerializeError> {
/// Generate new, empty WAL segment, with correct block headers at the first
/// page of the segment and the page that contains the given LSN.
/// We need this segment to start compute node.
pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);
let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
let page_off = lsn.block_offset();
let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
let first_page_only = seg_off < XLOG_BLCKSZ;
let (shdr_rem_len, infoflags) = if first_page_only {
(seg_off, pg_constants::XLP_FIRST_IS_CONTRECORD)
} else {
(0, 0)
};
let hdr = XLogLongPageHeaderData {
std: {
XLogPageHeaderData {
xlp_magic: XLOG_PAGE_MAGIC as u16,
xlp_info: pg_constants::XLP_LONG_HEADER,
xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
xlp_tli: PG_TLI,
xlp_pageaddr: pageaddr,
xlp_rem_len: 0,
xlp_rem_len: shdr_rem_len as u32,
..Default::default() // Put 0 in padding fields.
}
},
@@ -357,6 +372,33 @@ pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result<Bytes, Seriali
//zero out the rest of the file
seg_buf.resize(WAL_SEGMENT_SIZE, 0);
if !first_page_only {
let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
let header = XLogPageHeaderData {
xlp_magic: XLOG_PAGE_MAGIC as u16,
xlp_info: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
pg_constants::XLP_FIRST_IS_CONTRECORD
} else {
0
},
xlp_tli: PG_TLI,
xlp_pageaddr: lsn.page_lsn().0,
xlp_rem_len: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
page_off as u32
} else {
0u32
},
..Default::default() // Put 0 in padding fields.
};
let hdr_bytes = header.encode()?;
debug_assert!(seg_buf.len() > block_offset + hdr_bytes.len());
debug_assert_ne!(block_offset, 0);
seg_buf[block_offset..block_offset + hdr_bytes.len()].copy_from_slice(&hdr_bytes[..]);
}
Ok(seg_buf.freeze())
}
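
The offset arithmetic above is easy to sanity-check in isolation. Below is a standalone sketch with the usual Postgres constants (WAL_SEGMENT_SIZE = 16 MiB, XLOG_BLCKSZ = 8192), reimplementing the Lsn helpers on a plain u64 and ignoring the SIZE_OF_PAGE_HEADER corner case for brevity:

const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024;
const XLOG_BLCKSZ: u64 = 8192;

// Returns (needs a second page header, page start offset within the segment,
// bytes on that page treated as a continuation record).
fn header_placement(lsn: u64) -> (bool, u64, u64) {
    let seg_off = lsn % WAL_SEGMENT_SIZE; // offset within the 16 MiB segment
    let page_off = lsn % XLOG_BLCKSZ; // offset within the 8 KiB page
    if seg_off < XLOG_BLCKSZ {
        // LSN is on the first page: only the long header needs patching,
        // with xlp_rem_len covering everything up to the LSN.
        (false, 0, seg_off)
    } else {
        // LSN is on a later page: a short header goes at that page's start,
        // marked as a continuation of the bytes before the LSN.
        (true, seg_off - page_off, page_off)
    }
}

fn main() {
    // On the first page of the segment.
    assert_eq!(header_placement(0x0100), (false, 0, 0x0100));
    // Deeper into the segment: page 3, 100 bytes in.
    assert_eq!(header_placement(3 * 8192 + 100), (true, 3 * 8192, 100));
}
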

View File

@@ -1,15 +1,13 @@
use anyhow::*;
use core::time::Duration;
use anyhow::{bail, ensure};
use log::*;
use postgres::types::PgLsn;
use postgres::Client;
use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
use postgres_ffi::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
use std::cmp::Ordering;
use std::fs;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::Instant;
use std::process::Command;
use std::time::{Duration, Instant};
use tempfile::{tempdir, TempDir};
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -56,7 +54,7 @@ impl Conf {
self.datadir.join("pg_wal")
}
fn new_pg_command(&self, command: impl AsRef<Path>) -> Result<Command> {
fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> {
let path = self.pg_bin_dir()?.join(command);
ensure!(path.exists(), "Command {:?} does not exist", path);
let mut cmd = Command::new(path);
@@ -66,7 +64,7 @@ impl Conf {
Ok(cmd)
}
pub fn initdb(&self) -> Result<()> {
pub fn initdb(&self) -> anyhow::Result<()> {
if let Some(parent) = self.datadir.parent() {
info!("Pre-creating parent directory {:?}", parent);
// Tests may be run concurrently and there may be a race to create `test_output/`.
@@ -80,7 +78,7 @@ impl Conf {
let output = self
.new_pg_command("initdb")?
.arg("-D")
.arg(self.datadir.as_os_str())
.arg(&self.datadir)
.args(["-U", "postgres", "--no-instructions", "--no-sync"])
.output()?;
debug!("initdb output: {:?}", output);
@@ -93,26 +91,18 @@ impl Conf {
Ok(())
}
pub fn start_server(&self) -> Result<PostgresServer> {
pub fn start_server(&self) -> anyhow::Result<PostgresServer> {
info!("Starting Postgres server in {:?}", self.datadir);
let log_file = fs::File::create(self.datadir.join("pg.log")).with_context(|| {
format!(
"Failed to create pg.log file in directory {}",
self.datadir.display()
)
})?;
let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
let unix_socket_dir_path = unix_socket_dir.path().to_owned();
let server_process = self
.new_pg_command("postgres")?
.args(["-c", "listen_addresses="])
.arg("-k")
.arg(unix_socket_dir_path.as_os_str())
.arg(&unix_socket_dir_path)
.arg("-D")
.arg(self.datadir.as_os_str())
.args(["-c", "logging_collector=on"]) // stderr will mess up with tests output
.arg(&self.datadir)
.args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
.stderr(Stdio::from(log_file))
.spawn()?;
let server = PostgresServer {
process: server_process,
@@ -121,7 +111,7 @@ impl Conf {
let mut c = postgres::Config::new();
c.host_path(&unix_socket_dir_path);
c.user("postgres");
c.connect_timeout(Duration::from_millis(1000));
c.connect_timeout(Duration::from_millis(10000));
c
},
};
@@ -132,7 +122,7 @@ impl Conf {
&self,
first_segment_name: &str,
last_segment_name: &str,
) -> Result<std::process::Output> {
) -> anyhow::Result<std::process::Output> {
let first_segment_file = self.datadir.join(first_segment_name);
let last_segment_file = self.datadir.join(last_segment_name);
info!(
@@ -142,10 +132,7 @@ impl Conf {
);
let output = self
.new_pg_command("pg_waldump")?
.args([
&first_segment_file.as_os_str(),
&last_segment_file.as_os_str(),
])
.args([&first_segment_file, &last_segment_file])
.output()?;
debug!("waldump output: {:?}", output);
Ok(output)
@@ -153,10 +140,9 @@ impl Conf {
}
impl PostgresServer {
pub fn connect_with_timeout(&self) -> Result<Client> {
pub fn connect_with_timeout(&self) -> anyhow::Result<Client> {
let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
while Instant::now() < retry_until {
use std::result::Result::Ok;
if let Ok(client) = self.client_config.connect(postgres::NoTls) {
return Ok(client);
}
@@ -173,7 +159,6 @@ impl PostgresServer {
impl Drop for PostgresServer {
fn drop(&mut self) {
use std::result::Result::Ok;
match self.process.try_wait() {
Ok(Some(_)) => return,
Ok(None) => {
@@ -188,12 +173,12 @@ impl Drop for PostgresServer {
}
pub trait PostgresClientExt: postgres::GenericClient {
fn pg_current_wal_insert_lsn(&mut self) -> Result<PgLsn> {
fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> {
Ok(self
.query_one("SELECT pg_current_wal_insert_lsn()", &[])?
.get(0))
}
fn pg_current_wal_flush_lsn(&mut self) -> Result<PgLsn> {
fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> {
Ok(self
.query_one("SELECT pg_current_wal_flush_lsn()", &[])?
.get(0))
@@ -202,7 +187,7 @@ pub trait PostgresClientExt: postgres::GenericClient {
impl<C: postgres::GenericClient> PostgresClientExt for C {}
pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> Result<()> {
pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> {
client.execute("create extension if not exists neon_test_utils", &[])?;
let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
@@ -236,13 +221,13 @@ pub trait Crafter {
/// * A vector of some valid "interesting" intermediate LSNs which one may start reading from.
/// May include or exclude Lsn(0) and the end-of-wal.
/// * The expected end-of-wal LSN.
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)>;
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)>;
}
fn craft_internal<C: postgres::GenericClient>(
client: &mut C,
f: impl Fn(&mut C, PgLsn) -> Result<(Vec<PgLsn>, Option<PgLsn>)>,
) -> Result<(Vec<PgLsn>, PgLsn)> {
f: impl Fn(&mut C, PgLsn) -> anyhow::Result<(Vec<PgLsn>, Option<PgLsn>)>,
) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
ensure_server_config(client)?;
let initial_lsn = client.pg_current_wal_insert_lsn()?;
@@ -274,7 +259,7 @@ fn craft_internal<C: postgres::GenericClient>(
pub struct Simple;
impl Crafter for Simple {
const NAME: &'static str = "simple";
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
craft_internal(client, |client, _| {
client.execute("CREATE table t(x int)", &[])?;
Ok((Vec::new(), None))
@@ -285,7 +270,7 @@ impl Crafter for Simple {
pub struct LastWalRecordXlogSwitch;
impl Crafter for LastWalRecordXlogSwitch {
const NAME: &'static str = "last_wal_record_xlog_switch";
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
// Do not use craft_internal because here we end up with flush_lsn exactly on
// the segment boundary and insert_lsn after the initial page header, which is unusual.
ensure_server_config(client)?;
@@ -307,7 +292,7 @@ impl Crafter for LastWalRecordXlogSwitch {
pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
// Do not use craft_internal because here we end up with flush_lsn exactly on
// the segment boundary and insert_lsn after the initial page header, which is unusual.
ensure_server_config(client)?;
@@ -374,7 +359,7 @@ impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
fn craft_single_logical_message(
client: &mut impl postgres::GenericClient,
transactional: bool,
) -> Result<(Vec<PgLsn>, PgLsn)> {
) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
craft_internal(client, |client, initial_lsn| {
ensure!(
initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
@@ -416,7 +401,7 @@ fn craft_single_logical_message(
pub struct WalRecordCrossingSegmentFollowedBySmallOne;
impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
craft_single_logical_message(client, true)
}
}
@@ -424,7 +409,7 @@ impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
pub struct LastWalRecordCrossingSegment;
impl Crafter for LastWalRecordCrossingSegment {
const NAME: &'static str = "last_wal_record_crossing_segment";
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
craft_single_logical_message(client, false)
}
}

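The pieces above compose into a small harness: init a data directory, start Postgres on a throwaway Unix socket, connect, and craft WAL. A minimal sketch of that flow, assuming the `Conf` fields (`pg_version`, `pg_distrib_dir`, `datadir`) have the shapes implied by this diff:

fn craft_simple_wal() -> anyhow::Result<()> {
    let conf = Conf {
        pg_version: 15,
        pg_distrib_dir: "/usr/local/pgsql".into(),
        datadir: "test_output/wal_craft".into(),
    };
    conf.initdb()?;
    let server = conf.start_server()?;
    let mut client = server.connect_with_timeout()?;
    // Crafters return intermediate LSNs one may start reading from, plus the end-of-WAL LSN.
    let (intermediate_lsns, end_of_wal) = Simple::craft(&mut client)?;
    println!("end of WAL: {end_of_wal}, intermediate LSNs: {intermediate_lsns:?}");
    Ok(())
}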
View File

@@ -10,7 +10,6 @@ byteorder.workspace = true
pin-project-lite.workspace = true
postgres-protocol.workspace = true
rand.workspace = true
serde.workspace = true
tokio.workspace = true
tracing.workspace = true
thiserror.workspace = true

View File

@@ -6,15 +6,10 @@ pub mod framed;
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_protocol::PG_EPOCH;
use serde::{Deserialize, Serialize};
use std::{
borrow::Cow,
collections::HashMap,
fmt, io, str,
time::{Duration, SystemTime},
};
use tracing::{trace, warn};
use std::{borrow::Cow, collections::HashMap, fmt, io, str};
// re-export for use in utils pageserver_feedback.rs
pub use postgres_protocol::PG_EPOCH;
pub type Oid = u32;
pub type SystemId = u64;
@@ -618,7 +613,7 @@ pub struct XLogDataBody<'a> {
#[derive(Debug)]
pub struct WalSndKeepAlive {
pub sent_ptr: u64,
pub wal_end: u64, // current end of WAL on the server
pub timestamp: i64,
pub request_reply: bool,
}
@@ -664,7 +659,7 @@ fn write_cstr(s: impl AsRef<[u8]>, buf: &mut BytesMut) -> Result<(), ProtocolErr
}
/// Read cstring from buf, advancing it.
fn read_cstr(buf: &mut Bytes) -> Result<Bytes, ProtocolError> {
pub fn read_cstr(buf: &mut Bytes) -> Result<Bytes, ProtocolError> {
let pos = buf
.iter()
.position(|x| *x == 0)
@@ -929,7 +924,7 @@ impl<'a> BeMessage<'a> {
buf.put_u8(b'd');
write_body(buf, |buf| {
buf.put_u8(b'k');
buf.put_u64(req.sent_ptr);
buf.put_u64(req.wal_end);
buf.put_i64(req.timestamp);
buf.put_u8(u8::from(req.request_reply));
});
@@ -939,175 +934,10 @@ impl<'a> BeMessage<'a> {
}
}
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
/// Serialized in custom flexible key/value format. In replication protocol, it
/// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
/// Standby status update / Hot standby feedback messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PageserverFeedback {
/// Last known size of the timeline. Used to enforce timeline size limit.
pub current_timeline_size: u64,
/// LSN last received and ingested by the pageserver.
pub last_received_lsn: u64,
/// LSN up to which data is persisted by the pageserver to its local disc.
pub disk_consistent_lsn: u64,
/// LSN up to which data is persisted by the pageserver on S3; safekeepers
/// consider WAL up to this LSN safe to remove.
pub remote_consistent_lsn: u64,
pub replytime: SystemTime,
}
// NOTE: Do not forget to increment this number when adding new fields to PageserverFeedback.
// Do not remove previously available fields because this might be backwards incompatible.
pub const PAGESERVER_FEEDBACK_FIELDS_NUMBER: u8 = 5;
impl PageserverFeedback {
pub fn empty() -> PageserverFeedback {
PageserverFeedback {
current_timeline_size: 0,
last_received_lsn: 0,
remote_consistent_lsn: 0,
disk_consistent_lsn: 0,
replytime: SystemTime::now(),
}
}
// Serialize PageserverFeedback using custom format
// to support protocol extensibility.
//
// Following layout is used:
// u8 - number of key-value pairs that follow.
//
// key-value pairs:
// null-terminated string - key,
// uint32 - value length in bytes
// value itself
//
// TODO: change the serialized field names once all computes have migrated to the renamed fields.
pub fn serialize(&self, buf: &mut BytesMut) {
buf.put_u8(PAGESERVER_FEEDBACK_FIELDS_NUMBER); // # of keys
buf.put_slice(b"current_timeline_size\0");
buf.put_i32(8);
buf.put_u64(self.current_timeline_size);
buf.put_slice(b"ps_writelsn\0");
buf.put_i32(8);
buf.put_u64(self.last_received_lsn);
buf.put_slice(b"ps_flushlsn\0");
buf.put_i32(8);
buf.put_u64(self.disk_consistent_lsn);
buf.put_slice(b"ps_applylsn\0");
buf.put_i32(8);
buf.put_u64(self.remote_consistent_lsn);
let timestamp = self
.replytime
.duration_since(*PG_EPOCH)
.expect("failed to serialize pg_replytime earlier than PG_EPOCH")
.as_micros() as i64;
buf.put_slice(b"ps_replytime\0");
buf.put_i32(8);
buf.put_i64(timestamp);
}
// Deserialize PageserverFeedback message
// TODO: change serialized fields names once all computes migrate to rename.
pub fn parse(mut buf: Bytes) -> PageserverFeedback {
let mut rf = PageserverFeedback::empty();
let nfields = buf.get_u8();
for _ in 0..nfields {
let key = read_cstr(&mut buf).unwrap();
match key.as_ref() {
b"current_timeline_size" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.current_timeline_size = buf.get_u64();
}
b"ps_writelsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.last_received_lsn = buf.get_u64();
}
b"ps_flushlsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.disk_consistent_lsn = buf.get_u64();
}
b"ps_applylsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.remote_consistent_lsn = buf.get_u64();
}
b"ps_replytime" => {
let len = buf.get_i32();
assert_eq!(len, 8);
let raw_time = buf.get_i64();
if raw_time > 0 {
rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
} else {
rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
}
}
_ => {
let len = buf.get_i32();
warn!(
"PageserverFeedback parse. unknown key {} of len {len}. Skip it.",
String::from_utf8_lossy(key.as_ref())
);
buf.advance(len as usize);
}
}
}
trace!("PageserverFeedback parsed is {:?}", rf);
rf
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_replication_feedback_serialization() {
let mut rf = PageserverFeedback::empty();
// Fill rf with some values
rf.current_timeline_size = 12345678;
// Use a time rounded to whole seconds so it compares equal with the
// deserialized value: serialization truncates the timestamp to microseconds.
rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
let mut data = BytesMut::new();
rf.serialize(&mut data);
let rf_parsed = PageserverFeedback::parse(data.freeze());
assert_eq!(rf, rf_parsed);
}
#[test]
fn test_replication_feedback_unknown_key() {
let mut rf = PageserverFeedback::empty();
// Fill rf with some values
rf.current_timeline_size = 12345678;
// Use a time rounded to whole seconds so it compares equal with the
// deserialized value: serialization truncates the timestamp to microseconds.
rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
let mut data = BytesMut::new();
rf.serialize(&mut data);
// Add an extra field to the buffer and adjust number of keys
if let Some(first) = data.first_mut() {
*first = PAGESERVER_FEEDBACK_FIELDS_NUMBER + 1;
}
data.put_slice(b"new_field_one\0");
data.put_i32(8);
data.put_u64(42);
// Parse serialized data and check that new field is not parsed
let rf_parsed = PageserverFeedback::parse(data.freeze());
assert_eq!(rf, rf_parsed);
}
#[test]
fn test_startup_message_params_options_escaped() {
fn split_options(params: &StartupMessageParams) -> Vec<Cow<'_, str>> {

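Since `read_cstr` is now `pub`, code outside the crate (like the relocated `pageserver_feedback.rs` below) can parse NUL-terminated keys itself. A small illustration of its contract:

use bytes::Bytes;
use pq_proto::read_cstr;

fn demo() {
    let mut buf = Bytes::from_static(b"ps_writelsn\0rest");
    let key = read_cstr(&mut buf).expect("buffer contains a NUL terminator");
    assert_eq!(&key[..], b"ps_writelsn"); // `buf` has been advanced past the key
}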
View File

@@ -99,7 +99,11 @@ struct S3WithTestBlobs {
#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledS3 {
async fn setup() -> Self {
utils::logging::init(utils::logging::LogFormat::Test).expect("logging init failed");
utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
)
.expect("logging init failed");
if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
info!(
"`{}` env variable is not set, skipping the test",

View File

@@ -11,6 +11,7 @@ async-trait.workspace = true
anyhow.workspace = true
bincode.workspace = true
bytes.workspace = true
chrono.workspace = true
heapless.workspace = true
hex = { workspace = true, features = ["serde"] }
hyper = { workspace = true, features = ["full"] }
@@ -27,7 +28,8 @@ signal-hook.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["json"] }
tracing-error.workspace = true
tracing-subscriber = { workspace = true, features = ["json", "registry"] }
rand.workspace = true
serde_with.workspace = true
strum.workspace = true
@@ -35,6 +37,7 @@ strum_macros.workspace = true
url.workspace = true
uuid.workspace = true
pq_proto.workspace = true
metrics.workspace = true
workspace_hack.workspace = true

View File

@@ -131,7 +131,9 @@ impl RequestCancelled {
impl Drop for RequestCancelled {
fn drop(&mut self) {
if let Some(span) = self.warn.take() {
if std::thread::panicking() {
// we are unwinding due to a panic; assume we were not dropped for cancellation
} else if let Some(span) = self.warn.take() {
// the span has all of the info already, but the outer `.instrument(span)` has already
// been dropped, so we need to manually re-enter it for this message.
//

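The `std::thread::panicking()` check above is the usual guard against doing noisy work in `Drop` while a panic is already unwinding. A self-contained sketch of the pattern (names are illustrative, not from the proxy code):

struct CancellationWarner {
    armed: bool,
}

impl Drop for CancellationWarner {
    fn drop(&mut self) {
        if std::thread::panicking() {
            // Already unwinding: this drop was caused by the panic,
            // not by a client cancellation, so stay quiet.
        } else if self.armed {
            eprintln!("request was cancelled before completing");
        }
    }
}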
View File

@@ -1,9 +1,7 @@
use std::fmt::Display;
use anyhow::Context;
use bytes::Buf;
use hyper::{header, Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize, Serializer};
use serde::{Deserialize, Serialize};
use super::error::ApiError;
@@ -33,12 +31,3 @@ pub fn json_response<T: Serialize>(
.map_err(|e| ApiError::InternalServerError(e.into()))?;
Ok(response)
}
/// Serialize through Display trait.
pub fn display_serialize<S, F>(z: &F, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
F: Display,
{
s.serialize_str(&format!("{}", z))
}

View File

@@ -265,6 +265,26 @@ impl fmt::Display for TenantTimelineId {
}
}
impl FromStr for TenantTimelineId {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut parts = s.split('/');
let tenant_id = parts
.next()
.ok_or_else(|| anyhow::anyhow!("TenantTimelineId must contain tenant_id"))?
.parse()?;
let timeline_id = parts
.next()
.ok_or_else(|| anyhow::anyhow!("TenantTimelineId must contain timeline_id"))?
.parse()?;
if parts.next().is_some() {
anyhow::bail!("TenantTimelineId must contain only tenant_id and timeline_id");
}
Ok(TenantTimelineId::new(tenant_id, timeline_id))
}
}
// Unique ID of a storage node (safekeeper or pageserver). Supposed to be issued
// by the console.
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Debug, Serialize, Deserialize)]

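With the new `FromStr` impl, a `tenant_id/timeline_id` pair round-trips through its string form; a quick sketch (the hex ids below are made up, but have the expected 32-hex-digit shape):

use std::str::FromStr;

fn parse_ttid() -> anyhow::Result<()> {
    let s = "11223344556677889900aabbccddeeff/ffeeddccbbaa00998877665544332211";
    let ttid = TenantTimelineId::from_str(s)?;
    assert_eq!(ttid.to_string(), s); // Display writes the same `tenant/timeline` form back

    // A third path component is rejected:
    assert!(TenantTimelineId::from_str(&format!("{s}/extra")).is_err());
    Ok(())
}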
View File

@@ -54,6 +54,12 @@ pub mod measured_stream;
pub mod serde_percent;
pub mod serde_regex;
pub mod pageserver_feedback;
pub mod tracing_span_assert;
pub mod rate_limit;
/// use with fail::cfg("$name", "return(2000)")
#[macro_export]
macro_rules! failpoint_sleep_millis_async {

View File

@@ -56,7 +56,20 @@ where
}
}
pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
/// Whether to add the `tracing_error` crate's `ErrorLayer`
/// to the global tracing subscriber.
pub enum TracingErrorLayerEnablement {
/// Do not add the `ErrorLayer`.
Disabled,
/// Add the `ErrorLayer` with the filter specified by `RUST_LOG`, defaulting to `info` if `RUST_LOG` is unset.
EnableWithRustLogFilter,
}
pub fn init(
log_format: LogFormat,
tracing_error_layer_enablement: TracingErrorLayerEnablement,
) -> anyhow::Result<()> {
// We fall back to printing all spans at info-level or above if
// the RUST_LOG environment variable is not set.
let rust_log_env_filter = || {
@@ -67,21 +80,26 @@ pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
// NB: the order of the with() calls does not matter.
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
use tracing_subscriber::prelude::*;
tracing_subscriber::registry()
.with({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout);
let log_layer = match log_format {
LogFormat::Json => log_layer.json().boxed(),
LogFormat::Plain => log_layer.boxed(),
LogFormat::Test => log_layer.with_test_writer().boxed(),
};
log_layer.with_filter(rust_log_env_filter())
})
.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()))
.init();
let r = tracing_subscriber::registry();
let r = r.with({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout);
let log_layer = match log_format {
LogFormat::Json => log_layer.json().boxed(),
LogFormat::Plain => log_layer.boxed(),
LogFormat::Test => log_layer.with_test_writer().boxed(),
};
log_layer.with_filter(rust_log_env_filter())
});
let r = r.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()));
match tracing_error_layer_enablement {
TracingErrorLayerEnablement::EnableWithRustLogFilter => r
.with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
.init(),
TracingErrorLayerEnablement::Disabled => r.init(),
}
Ok(())
}

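Callers now choose the error-layer behavior explicitly; for example, mirroring how the pageserver wires this up later in this diff:

use utils::logging::{self, LogFormat, TracingErrorLayerEnablement};

fn init_logging() -> anyhow::Result<()> {
    // Enable the ErrorLayer only in debug builds, where the extra
    // span-capture overhead is acceptable.
    let enablement = if cfg!(debug_assertions) {
        TracingErrorLayerEnablement::EnableWithRustLogFilter
    } else {
        TracingErrorLayerEnablement::Disabled
    };
    logging::init(LogFormat::Plain, enablement)
}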
View File

@@ -62,29 +62,48 @@ impl Lsn {
}
/// Compute the offset into a segment
#[inline]
pub fn segment_offset(self, seg_sz: usize) -> usize {
(self.0 % seg_sz as u64) as usize
}
/// Compute LSN of the segment start.
#[inline]
pub fn segment_lsn(self, seg_sz: usize) -> Lsn {
Lsn(self.0 - (self.0 % seg_sz as u64))
}
/// Compute the segment number
#[inline]
pub fn segment_number(self, seg_sz: usize) -> u64 {
self.0 / seg_sz as u64
}
/// Compute the offset into a block
#[inline]
pub fn block_offset(self) -> u64 {
const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
self.0 % BLCKSZ
}
/// Compute the LSN of the start of the page containing this LSN
#[inline]
pub fn page_lsn(self) -> Lsn {
Lsn(self.0 - self.block_offset())
}
/// Compute the offset of the first byte of this Lsn's page within its
/// segment
#[inline]
pub fn page_offset_in_segment(self, seg_sz: usize) -> u64 {
(self.0 - self.block_offset()) - self.segment_lsn(seg_sz).0
}
/// Compute the bytes remaining in this block
///
/// If the LSN is already at the block boundary, it will return `XLOG_BLCKSZ`.
#[inline]
pub fn remaining_in_block(self) -> u64 {
const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
BLCKSZ - (self.0 % BLCKSZ)

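A worked example of the segment/block arithmetic above, assuming the usual 16 MiB WAL segments and 8 KiB (`XLOG_BLCKSZ`) pages:

fn lsn_arithmetic_example() {
    let seg_sz = 16 * 1024 * 1024; // WAL_SEGMENT_SIZE
    let lsn = Lsn(0x0250_1234);
    assert_eq!(lsn.segment_number(seg_sz), 2);
    assert_eq!(lsn.segment_offset(seg_sz), 0x50_1234);
    assert_eq!(lsn.segment_lsn(seg_sz), Lsn(0x0200_0000));
    assert_eq!(lsn.block_offset(), 0x1234); // offset within an 0x2000-byte page
    assert_eq!(lsn.remaining_in_block(), 0x2000 - 0x1234);
}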
View File

@@ -0,0 +1,214 @@
use std::time::{Duration, SystemTime};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use pq_proto::{read_cstr, PG_EPOCH};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use tracing::{trace, warn};
use crate::lsn::Lsn;
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
/// Serialized in custom flexible key/value format. In replication protocol, it
/// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
/// Standby status update / Hot standby feedback messages.
///
/// serde Serialize is used only for a human-readable dump to JSON (e.g. in
/// the safekeepers' debug_dump).
#[serde_as]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PageserverFeedback {
/// Last known size of the timeline. Used to enforce timeline size limit.
pub current_timeline_size: u64,
/// LSN last received and ingested by the pageserver. Controls backpressure.
#[serde_as(as = "DisplayFromStr")]
pub last_received_lsn: Lsn,
/// LSN up to which data is persisted by the pageserver to its local disc.
/// Controls backpressure.
#[serde_as(as = "DisplayFromStr")]
pub disk_consistent_lsn: Lsn,
/// LSN up to which data is persisted by the pageserver on S3; safekeepers
/// consider WAL up to this LSN safe to remove.
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
// Serialize with RFC3339 format.
#[serde(with = "serde_systemtime")]
pub replytime: SystemTime,
}
// NOTE: Do not forget to increment this number when adding new fields to PageserverFeedback.
// Do not remove previously available fields because this might be backwards incompatible.
pub const PAGESERVER_FEEDBACK_FIELDS_NUMBER: u8 = 5;
impl PageserverFeedback {
pub fn empty() -> PageserverFeedback {
PageserverFeedback {
current_timeline_size: 0,
last_received_lsn: Lsn::INVALID,
remote_consistent_lsn: Lsn::INVALID,
disk_consistent_lsn: Lsn::INVALID,
replytime: *PG_EPOCH,
}
}
// Serialize PageserverFeedback using custom format
// to support protocol extensibility.
//
// Following layout is used:
// u8 - number of key-value pairs that follow.
//
// key-value pairs:
// null-terminated string - key,
// uint32 - value length in bytes
// value itself
//
// TODO: change the serialized field names once all computes have migrated to the renamed fields.
pub fn serialize(&self, buf: &mut BytesMut) {
buf.put_u8(PAGESERVER_FEEDBACK_FIELDS_NUMBER); // # of keys
buf.put_slice(b"current_timeline_size\0");
buf.put_i32(8);
buf.put_u64(self.current_timeline_size);
buf.put_slice(b"ps_writelsn\0");
buf.put_i32(8);
buf.put_u64(self.last_received_lsn.0);
buf.put_slice(b"ps_flushlsn\0");
buf.put_i32(8);
buf.put_u64(self.disk_consistent_lsn.0);
buf.put_slice(b"ps_applylsn\0");
buf.put_i32(8);
buf.put_u64(self.remote_consistent_lsn.0);
let timestamp = self
.replytime
.duration_since(*PG_EPOCH)
.expect("failed to serialize pg_replytime earlier than PG_EPOCH")
.as_micros() as i64;
buf.put_slice(b"ps_replytime\0");
buf.put_i32(8);
buf.put_i64(timestamp);
}
// Deserialize PageserverFeedback message
// TODO: change serialized fields names once all computes migrate to rename.
pub fn parse(mut buf: Bytes) -> PageserverFeedback {
let mut rf = PageserverFeedback::empty();
let nfields = buf.get_u8();
for _ in 0..nfields {
let key = read_cstr(&mut buf).unwrap();
match key.as_ref() {
b"current_timeline_size" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.current_timeline_size = buf.get_u64();
}
b"ps_writelsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.last_received_lsn = Lsn(buf.get_u64());
}
b"ps_flushlsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.disk_consistent_lsn = Lsn(buf.get_u64());
}
b"ps_applylsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.remote_consistent_lsn = Lsn(buf.get_u64());
}
b"ps_replytime" => {
let len = buf.get_i32();
assert_eq!(len, 8);
let raw_time = buf.get_i64();
if raw_time > 0 {
rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
} else {
rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
}
}
_ => {
let len = buf.get_i32();
warn!(
"PageserverFeedback parse. unknown key {} of len {len}. Skip it.",
String::from_utf8_lossy(key.as_ref())
);
buf.advance(len as usize);
}
}
}
trace!("PageserverFeedback parsed is {:?}", rf);
rf
}
}
mod serde_systemtime {
use std::time::SystemTime;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(ts: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let chrono_dt: DateTime<Utc> = (*ts).into();
serializer.serialize_str(&chrono_dt.to_rfc3339())
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
where
D: Deserializer<'de>,
{
let time: String = Deserialize::deserialize(deserializer)?;
Ok(DateTime::parse_from_rfc3339(&time)
.map_err(serde::de::Error::custom)?
.into())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_replication_feedback_serialization() {
let mut rf = PageserverFeedback::empty();
// Fill rf with some values
rf.current_timeline_size = 12345678;
// Use a time rounded to whole seconds so it compares equal with the
// deserialized value: serialization truncates the timestamp to microseconds.
rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
let mut data = BytesMut::new();
rf.serialize(&mut data);
let rf_parsed = PageserverFeedback::parse(data.freeze());
assert_eq!(rf, rf_parsed);
}
#[test]
fn test_replication_feedback_unknown_key() {
let mut rf = PageserverFeedback::empty();
// Fill rf with some values
rf.current_timeline_size = 12345678;
// Use a time rounded to whole seconds so it compares equal with the
// deserialized value: serialization truncates the timestamp to microseconds.
rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
let mut data = BytesMut::new();
rf.serialize(&mut data);
// Add an extra field to the buffer and adjust number of keys
if let Some(first) = data.first_mut() {
*first = PAGESERVER_FEEDBACK_FIELDS_NUMBER + 1;
}
data.put_slice(b"new_field_one\0");
data.put_i32(8);
data.put_u64(42);
// Parse serialized data and check that new field is not parsed
let rf_parsed = PageserverFeedback::parse(data.freeze());
assert_eq!(rf, rf_parsed);
}
}

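Because the LSN fields use `DisplayFromStr` and `replytime` uses the RFC3339 helper above, the serde output is a human-readable JSON dump; a sketch, assuming `serde_json` is on hand:

fn dump_feedback_json() {
    let mut rf = PageserverFeedback::empty();
    rf.current_timeline_size = 12_345_678;
    rf.last_received_lsn = Lsn(0x0250_1234);
    // LSNs serialize via their Display form; replytime as an RFC3339 timestamp.
    let json = serde_json::to_string_pretty(&rf).unwrap();
    println!("{json}");
}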
View File

@@ -0,0 +1,66 @@
//! A helper to rate limit operations.
use std::time::{Duration, Instant};
pub struct RateLimit {
last: Option<Instant>,
interval: Duration,
}
impl RateLimit {
pub fn new(interval: Duration) -> Self {
Self {
last: None,
interval,
}
}
/// Call `f` if the rate limit allows.
/// Don't call it otherwise.
pub fn call<F: FnOnce()>(&mut self, f: F) {
let now = Instant::now();
match self.last {
Some(last) if now - last <= self.interval => {
// ratelimit
}
_ => {
self.last = Some(now);
f();
}
}
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicUsize;
#[test]
fn basics() {
use super::RateLimit;
use std::sync::atomic::Ordering::Relaxed;
use std::time::Duration;
let called = AtomicUsize::new(0);
let mut f = RateLimit::new(Duration::from_millis(100));
let cl = || {
called.fetch_add(1, Relaxed);
};
f.call(cl);
assert_eq!(called.load(Relaxed), 1);
f.call(cl);
assert_eq!(called.load(Relaxed), 1);
f.call(cl);
assert_eq!(called.load(Relaxed), 1);
std::thread::sleep(Duration::from_millis(100));
f.call(cl);
assert_eq!(called.load(Relaxed), 2);
f.call(cl);
assert_eq!(called.load(Relaxed), 2);
std::thread::sleep(Duration::from_millis(100));
f.call(cl);
assert_eq!(called.load(Relaxed), 3);
}
}

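Typical use is to keep a hot loop from spamming the log; for example:

use std::time::Duration;
use utils::rate_limit::RateLimit;

fn poll_remote() {
    let mut nag = RateLimit::new(Duration::from_secs(10));
    loop {
        // ... poll, sleep, check for shutdown, etc. ...
        // Emitted at most once every 10 seconds, however hot the loop is.
        nag.call(|| tracing::warn!("still waiting for the remote end"));
    }
}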
View File

@@ -0,0 +1,287 @@
//! Assert that the current [`tracing::Span`] has a given set of fields.
//!
//! # Usage
//!
//! ```
//! use tracing_subscriber::prelude::*;
//! let registry = tracing_subscriber::registry()
//! .with(tracing_error::ErrorLayer::default());
//!
//! // Register the subscriber. In this example, we only register it as a
//! // thread-local subscriber rather than the global one.
//! let _guard = tracing::subscriber::set_default(registry);
//!
//! // Then, in the main code:
//!
//! let span = tracing::info_span!("TestSpan", test_id = 1);
//! let _guard = span.enter();
//!
//! // ... down the call stack
//!
//! use utils::tracing_span_assert::{check_fields_present, MultiNameExtractor};
//! let extractor = MultiNameExtractor::new("TestExtractor", ["test", "test_id"]);
//! match check_fields_present([&extractor]) {
//! Ok(()) => {},
//! Err(missing) => {
//! panic!("Missing fields: {:?}", missing.into_iter().map(|f| f.name() ).collect::<Vec<_>>());
//! }
//! }
//! ```
//!
//! Recommended reading: https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
//!
use std::{
collections::HashSet,
fmt::{self},
hash::{Hash, Hasher},
};
pub enum ExtractionResult {
Present,
Absent,
}
pub trait Extractor: Send + Sync + std::fmt::Debug {
fn name(&self) -> &str;
fn extract(&self, fields: &tracing::field::FieldSet) -> ExtractionResult;
}
#[derive(Debug)]
pub struct MultiNameExtractor<const L: usize> {
name: &'static str,
field_names: [&'static str; L],
}
impl<const L: usize> MultiNameExtractor<L> {
pub fn new(name: &'static str, field_names: [&'static str; L]) -> MultiNameExtractor<L> {
MultiNameExtractor { name, field_names }
}
}
impl<const L: usize> Extractor for MultiNameExtractor<L> {
fn name(&self) -> &str {
self.name
}
fn extract(&self, fields: &tracing::field::FieldSet) -> ExtractionResult {
if fields.iter().any(|f| self.field_names.contains(&f.name())) {
ExtractionResult::Present
} else {
ExtractionResult::Absent
}
}
}
struct MemoryIdentity<'a>(&'a dyn Extractor);
impl<'a> MemoryIdentity<'a> {
fn as_ptr(&self) -> *const () {
self.0 as *const _ as *const ()
}
}
impl<'a> PartialEq for MemoryIdentity<'a> {
fn eq(&self, other: &Self) -> bool {
self.as_ptr() == other.as_ptr()
}
}
impl<'a> Eq for MemoryIdentity<'a> {}
impl<'a> Hash for MemoryIdentity<'a> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.as_ptr().hash(state);
}
}
impl<'a> fmt::Debug for MemoryIdentity<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:p}: {}", self.as_ptr(), self.0.name())
}
}
/// Check that the fields of every extractor in `must_be_present` appear
/// somewhere in the current span hierarchy; on failure, return the
/// extractors whose fields were not found.
pub fn check_fields_present<const L: usize>(
must_be_present: [&dyn Extractor; L],
) -> Result<(), Vec<&dyn Extractor>> {
let mut missing: HashSet<MemoryIdentity> =
HashSet::from_iter(must_be_present.into_iter().map(|r| MemoryIdentity(r)));
let trace = tracing_error::SpanTrace::capture();
trace.with_spans(|md, _formatted_fields| {
missing.retain(|extractor| match extractor.0.extract(md.fields()) {
ExtractionResult::Present => false,
ExtractionResult::Absent => true,
});
!missing.is_empty() // continue walking up until we've found all missing
});
if missing.is_empty() {
Ok(())
} else {
Err(missing.into_iter().map(|mi| mi.0).collect())
}
}
#[cfg(test)]
mod tests {
use tracing_subscriber::prelude::*;
use super::*;
struct Setup {
_current_thread_subscriber_guard: tracing::subscriber::DefaultGuard,
tenant_extractor: MultiNameExtractor<2>,
timeline_extractor: MultiNameExtractor<2>,
}
fn setup_current_thread() -> Setup {
let tenant_extractor = MultiNameExtractor::new("TenantId", ["tenant_id", "tenant"]);
let timeline_extractor = MultiNameExtractor::new("TimelineId", ["timeline_id", "timeline"]);
let registry = tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(tracing_error::ErrorLayer::default());
let guard = tracing::subscriber::set_default(registry);
Setup {
_current_thread_subscriber_guard: guard,
tenant_extractor,
timeline_extractor,
}
}
fn assert_missing(missing: Vec<&dyn Extractor>, expected: Vec<&dyn Extractor>) {
let missing: HashSet<MemoryIdentity> =
HashSet::from_iter(missing.into_iter().map(MemoryIdentity));
let expected: HashSet<MemoryIdentity> =
HashSet::from_iter(expected.into_iter().map(MemoryIdentity));
assert_eq!(missing, expected);
}
#[test]
fn positive_one_level() {
let setup = setup_current_thread();
let span = tracing::info_span!("root", tenant_id = "tenant-1", timeline_id = "timeline-1");
let _guard = span.enter();
check_fields_present([&setup.tenant_extractor, &setup.timeline_extractor]).unwrap();
}
#[test]
fn negative_one_level() {
let setup = setup_current_thread();
let span = tracing::info_span!("root", timeline_id = "timeline-1");
let _guard = span.enter();
let missing =
check_fields_present([&setup.tenant_extractor, &setup.timeline_extractor]).unwrap_err();
assert_missing(missing, vec![&setup.tenant_extractor]);
}
#[test]
fn positive_multiple_levels() {
let setup = setup_current_thread();
let span = tracing::info_span!("root");
let _guard = span.enter();
let span = tracing::info_span!("child", tenant_id = "tenant-1");
let _guard = span.enter();
let span = tracing::info_span!("grandchild", timeline_id = "timeline-1");
let _guard = span.enter();
check_fields_present([&setup.tenant_extractor, &setup.timeline_extractor]).unwrap();
}
#[test]
fn negative_multiple_levels() {
let setup = setup_current_thread();
let span = tracing::info_span!("root");
let _guard = span.enter();
let span = tracing::info_span!("child", timeline_id = "timeline-1");
let _guard = span.enter();
let missing = check_fields_present([&setup.tenant_extractor]).unwrap_err();
assert_missing(missing, vec![&setup.tenant_extractor]);
}
#[test]
fn positive_subset_one_level() {
let setup = setup_current_thread();
let span = tracing::info_span!("root", tenant_id = "tenant-1", timeline_id = "timeline-1");
let _guard = span.enter();
check_fields_present([&setup.tenant_extractor]).unwrap();
}
#[test]
fn positive_subset_multiple_levels() {
let setup = setup_current_thread();
let span = tracing::info_span!("root");
let _guard = span.enter();
let span = tracing::info_span!("child", tenant_id = "tenant-1");
let _guard = span.enter();
let span = tracing::info_span!("grandchild", timeline_id = "timeline-1");
let _guard = span.enter();
check_fields_present([&setup.tenant_extractor]).unwrap();
}
#[test]
fn negative_subset_one_level() {
let setup = setup_current_thread();
let span = tracing::info_span!("root", timeline_id = "timeline-1");
let _guard = span.enter();
let missing = check_fields_present([&setup.tenant_extractor]).unwrap_err();
assert_missing(missing, vec![&setup.tenant_extractor]);
}
#[test]
fn negative_subset_multiple_levels() {
let setup = setup_current_thread();
let span = tracing::info_span!("root");
let _guard = span.enter();
let span = tracing::info_span!("child", timeline_id = "timeline-1");
let _guard = span.enter();
let missing = check_fields_present([&setup.tenant_extractor]).unwrap_err();
assert_missing(missing, vec![&setup.tenant_extractor]);
}
#[test]
fn tracing_error_subscriber_not_set_up() {
// no setup
let span = tracing::info_span!("foo", e = "some value");
let _guard = span.enter();
let extractor = MultiNameExtractor::new("E", ["e"]);
let missing = check_fields_present([&extractor]).unwrap_err();
assert_missing(missing, vec![&extractor]);
}
#[test]
#[should_panic]
fn panics_if_tracing_error_subscriber_has_wrong_filter() {
let r = tracing_subscriber::registry().with({
tracing_error::ErrorLayer::default().with_filter(
tracing_subscriber::filter::dynamic_filter_fn(|md, _| {
if md.is_span() && *md.level() == tracing::Level::INFO {
return false;
}
true
}),
)
});
let _guard = tracing::subscriber::set_default(r);
let span = tracing::info_span!("foo", e = "some value");
let _guard = span.enter();
let extractor = MultiNameExtractor::new("E", ["e"]);
let missing = check_fields_present([&extractor]).unwrap_err();
assert_missing(missing, vec![&extractor]);
}
}

View File

@@ -52,6 +52,7 @@ sync_wrapper.workspace = true
tokio-tar.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
tokio-io-timeout.workspace = true
tokio-postgres.workspace = true
tokio-util.workspace = true
toml_edit = { workspace = true, features = [ "serde" ] }

View File

@@ -33,7 +33,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
min_lsn = min(min_lsn, lsn_range.start);
max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
updates.insert_historic(Arc::new(layer)).unwrap();
updates.insert_historic(Arc::new(layer));
}
println!("min: {min_lsn}, max: {max_lsn}");
@@ -215,7 +215,7 @@ fn bench_sequential(c: &mut Criterion) {
is_incremental: false,
short_id: format!("Layer {}", i),
};
updates.insert_historic(Arc::new(layer)).unwrap();
updates.insert_historic(Arc::new(layer));
}
updates.flush();
println!("Finished layer map init in {:?}", now.elapsed());

View File

@@ -463,9 +463,13 @@ where
let wal_file_path = format!("pg_wal/{}", wal_file_name);
let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;
let wal_seg =
postgres_ffi::generate_wal_segment(segno, system_identifier, self.timeline.pg_version)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
let wal_seg = postgres_ffi::generate_wal_segment(
segno,
system_identifier,
self.timeline.pg_version,
self.lsn,
)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
ensure!(wal_seg.len() == WAL_SEGMENT_SIZE);
self.ar.append(&header, &wal_seg[..]).await?;
Ok(())

View File

@@ -25,6 +25,7 @@ use pageserver::{
virtual_file,
};
use postgres_backend::AuthType;
use utils::logging::TracingErrorLayerEnablement;
use utils::signals::ShutdownSignals;
use utils::{
auth::JwtAuth, logging, project_git_version, sentry_init::init_sentry, signals::Signal,
@@ -86,8 +87,19 @@ fn main() -> anyhow::Result<()> {
}
};
// Initialize logging, which must be initialized before the custom panic hook is installed.
logging::init(conf.log_format)?;
// Initialize logging.
//
// It must be initialized before the custom panic hook is installed below.
//
// Regarding tracing_error enablement: at this time, we only use the
// tracing_error crate to debug_assert that log spans contain tenant and timeline ids.
// See `debug_assert_current_span_has_tenant_and_timeline_id` in the timeline module
let tracing_error_layer_enablement = if cfg!(debug_assertions) {
TracingErrorLayerEnablement::EnableWithRustLogFilter
} else {
TracingErrorLayerEnablement::Disabled
};
logging::init(conf.log_format, tracing_error_layer_enablement)?;
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
// disarming this hook on pageserver, because we never tear down tracing.
@@ -226,6 +238,7 @@ fn start_pageserver(
);
set_build_info_metric(GIT_VERSION);
set_launch_timestamp_metric(launch_ts);
pageserver::preinitialize_metrics();
// If any failpoints were set from FAILPOINTS environment variable,
// print them to the log for debugging purposes

View File

@@ -340,7 +340,29 @@ paths:
format: hex
post:
description: Schedules attach operation to happen in the background for given tenant
description: |
Schedules an attach operation to happen in the background for the given tenant.
As soon as the caller sends this request, it must assume the pageserver
starts writing to the tenant's S3 state, unless it receives one of the
distinguished error responses below that states otherwise.
The method to identify whether a request has arrived at the pageserver, and
whether it has succeeded, is to poll for the tenant status to reach "Active"
or "Broken" state. These values are currently not explicitly documented in
the API spec.
Polling for `has_in_progress_downloads == false` is INCORRECT because that
value can turn `false` during shutdown while the Attach operation is still
unfinished.
There is no way to cancel an in-flight request.
If a client receives a non-distinguished response, e.g., a network timeout,
it MUST retry the /attach request and poll for tenant status again.
In any case, it must
* NOT ASSUME that the /attach request has been lost in the network,
* NOT ASSUME that the request has been lost based on a subsequent
tenant status request returning 404. (The request may still be in flight!)
responses:
"202":
description: Tenant attaching scheduled
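A compliant client loop, following the description above, looks roughly like this. This is a sketch only: `post_attach` and `get_tenant_status` are hypothetical helpers over the HTTP API (returning `None` on 404), not part of this spec:

async fn attach_until_active(tenant: TenantId) -> anyhow::Result<()> {
    loop {
        post_attach(tenant).await?; // re-sent after any non-distinguished failure
        loop {
            match get_tenant_status(tenant).await? {
                Some(state) if state == "Active" => return Ok(()),
                Some(state) if state == "Broken" => anyhow::bail!("tenant is broken"),
                Some(_) => { /* still attaching; keep polling */ }
                // 404: the attach may have been lost (e.g. the pageserver crashed
                // before writing the marker file) -- re-send /attach. Never fall
                // back to a different pageserver here: that risks split-brain on
                // the tenant's S3 state.
                None => break,
            }
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    }
}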

View File

@@ -691,16 +691,6 @@ pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>,
Ok(response)
}
// Helper function to standardize the error messages we produce on bad durations
//
// Intended to be used with anyhow's `with_context`, e.g.:
//
// let value = result.with_context(bad_duration("name", &value))?;
//
fn bad_duration<'a>(field_name: &'static str, value: &'a str) -> impl 'a + Fn() -> String {
move || format!("Cannot parse `{field_name}` duration {value:?}")
}
async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
@@ -708,91 +698,7 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let mut tenant_conf = TenantConfOpt::default();
if let Some(gc_period) = request_data.gc_period {
tenant_conf.gc_period = Some(
humantime::parse_duration(&gc_period)
.with_context(bad_duration("gc_period", &gc_period))
.map_err(ApiError::BadRequest)?,
);
}
tenant_conf.gc_horizon = request_data.gc_horizon;
tenant_conf.image_creation_threshold = request_data.image_creation_threshold;
if let Some(pitr_interval) = request_data.pitr_interval {
tenant_conf.pitr_interval = Some(
humantime::parse_duration(&pitr_interval)
.with_context(bad_duration("pitr_interval", &pitr_interval))
.map_err(ApiError::BadRequest)?,
);
}
if let Some(walreceiver_connect_timeout) = request_data.walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(&walreceiver_connect_timeout)
.with_context(bad_duration(
"walreceiver_connect_timeout",
&walreceiver_connect_timeout,
))
.map_err(ApiError::BadRequest)?,
);
}
if let Some(lagging_wal_timeout) = request_data.lagging_wal_timeout {
tenant_conf.lagging_wal_timeout = Some(
humantime::parse_duration(&lagging_wal_timeout)
.with_context(bad_duration("lagging_wal_timeout", &lagging_wal_timeout))
.map_err(ApiError::BadRequest)?,
);
}
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(trace_read_requests) = request_data.trace_read_requests {
tenant_conf.trace_read_requests = Some(trace_read_requests);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
tenant_conf.checkpoint_timeout = Some(
humantime::parse_duration(&checkpoint_timeout)
.with_context(bad_duration("checkpoint_timeout", &checkpoint_timeout))
.map_err(ApiError::BadRequest)?,
);
}
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
if let Some(compaction_period) = request_data.compaction_period {
tenant_conf.compaction_period = Some(
humantime::parse_duration(&compaction_period)
.with_context(bad_duration("compaction_period", &compaction_period))
.map_err(ApiError::BadRequest)?,
);
}
if let Some(eviction_policy) = request_data.eviction_policy {
tenant_conf.eviction_policy = Some(
serde_json::from_value(eviction_policy)
.context("parse field `eviction_policy`")
.map_err(ApiError::BadRequest)?,
);
}
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(&evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
&evictions_low_residence_duration_metric_threshold,
))
.map_err(ApiError::BadRequest)?,
);
}
let tenant_conf = TenantConfOpt::try_from(&request_data).map_err(ApiError::BadRequest)?;
let target_tenant_id = request_data
.new_tenant_id
@@ -860,85 +766,7 @@ async fn update_tenant_config_handler(
let tenant_id = request_data.tenant_id;
check_permission(&request, Some(tenant_id))?;
let mut tenant_conf = TenantConfOpt::default();
if let Some(gc_period) = request_data.gc_period {
tenant_conf.gc_period = Some(
humantime::parse_duration(&gc_period)
.with_context(bad_duration("gc_period", &gc_period))
.map_err(ApiError::BadRequest)?,
);
}
tenant_conf.gc_horizon = request_data.gc_horizon;
tenant_conf.image_creation_threshold = request_data.image_creation_threshold;
if let Some(pitr_interval) = request_data.pitr_interval {
tenant_conf.pitr_interval = Some(
humantime::parse_duration(&pitr_interval)
.with_context(bad_duration("pitr_interval", &pitr_interval))
.map_err(ApiError::BadRequest)?,
);
}
if let Some(walreceiver_connect_timeout) = request_data.walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(&walreceiver_connect_timeout)
.with_context(bad_duration(
"walreceiver_connect_timeout",
&walreceiver_connect_timeout,
))
.map_err(ApiError::BadRequest)?,
);
}
if let Some(lagging_wal_timeout) = request_data.lagging_wal_timeout {
tenant_conf.lagging_wal_timeout = Some(
humantime::parse_duration(&lagging_wal_timeout)
.with_context(bad_duration("lagging_wal_timeout", &lagging_wal_timeout))
.map_err(ApiError::BadRequest)?,
);
}
tenant_conf.max_lsn_wal_lag = request_data.max_lsn_wal_lag;
tenant_conf.trace_read_requests = request_data.trace_read_requests;
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
tenant_conf.checkpoint_timeout = Some(
humantime::parse_duration(&checkpoint_timeout)
.with_context(bad_duration("checkpoint_timeout", &checkpoint_timeout))
.map_err(ApiError::BadRequest)?,
);
}
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
if let Some(compaction_period) = request_data.compaction_period {
tenant_conf.compaction_period = Some(
humantime::parse_duration(&compaction_period)
.with_context(bad_duration("compaction_period", &compaction_period))
.map_err(ApiError::BadRequest)?,
);
}
if let Some(eviction_policy) = request_data.eviction_policy {
tenant_conf.eviction_policy = Some(
serde_json::from_value(eviction_policy)
.context("parse field `eviction_policy`")
.map_err(ApiError::BadRequest)?,
);
}
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(&evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
&evictions_low_residence_duration_metric_threshold,
))
.map_err(ApiError::BadRequest)?,
);
}
let tenant_conf = TenantConfOpt::try_from(&request_data).map_err(ApiError::BadRequest)?;
let state = get_state(&request);
mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
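The repeated `humantime` blocks deleted above now sit behind a single conversion. A sketch of the shape such an impl takes (field subset illustrative only, not the full tenant config):

use anyhow::Context;

impl TryFrom<&TenantCreateRequest> for TenantConfOpt {
    type Error = anyhow::Error;

    fn try_from(request_data: &TenantCreateRequest) -> Result<Self, Self::Error> {
        let mut tenant_conf = TenantConfOpt::default();
        if let Some(gc_period) = &request_data.gc_period {
            tenant_conf.gc_period = Some(
                humantime::parse_duration(gc_period)
                    .with_context(|| format!("Cannot parse `gc_period` duration {gc_period:?}"))?,
            );
        }
        tenant_conf.gc_horizon = request_data.gc_horizon;
        // ... remaining fields follow the same pattern ...
        Ok(tenant_conf)
    }
}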
@@ -1201,6 +1029,7 @@ async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
)
}
#[cfg(feature = "testing")]
async fn post_tracing_event_handler(mut r: Request<Body>) -> Result<Response<Body>, ApiError> {
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "lowercase")]

View File

@@ -114,7 +114,7 @@ async fn import_rel(
path: &Path,
spcoid: Oid,
dboid: Oid,
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
reader: &mut (impl AsyncRead + Unpin),
len: usize,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -200,7 +200,7 @@ async fn import_slru(
modification: &mut DatadirModification<'_>,
slru: SlruKind,
path: &Path,
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
reader: &mut (impl AsyncRead + Unpin),
len: usize,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -612,8 +612,8 @@ async fn import_file(
Ok(None)
}
async fn read_all_bytes(reader: &mut (impl AsyncRead + Send + Sync + Unpin)) -> Result<Bytes> {
async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> {
let mut buf: Vec<u8> = vec![];
reader.read_to_end(&mut buf).await?;
Ok(Bytes::copy_from_slice(&buf[..]))
Ok(Bytes::from(buf))
}

View File

@@ -44,6 +44,8 @@ pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
pub use crate::metrics::preinitialize_metrics;
pub async fn shutdown_pageserver(exit_code: i32) {
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.

View File

@@ -205,6 +205,15 @@ static EVICTIONS_WITH_LOW_RESIDENCE_DURATION: Lazy<IntCounterVec> = Lazy::new(||
.expect("failed to define a metric")
});
pub static UNEXPECTED_ONDEMAND_DOWNLOADS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_unexpected_ondemand_downloads_count",
"Number of unexpected on-demand downloads. \
We log more context for each increment, so, forgo any labels in this metric.",
)
.expect("failed to define a metric")
});
/// Each [`Timeline`]'s [`EVICTIONS_WITH_LOW_RESIDENCE_DURATION`] metric.
#[derive(Debug)]
pub struct EvictionsWithLowResidenceDuration {
@@ -278,14 +287,33 @@ impl EvictionsWithLowResidenceDuration {
let Some(_counter) = self.counter.take() else {
return;
};
EVICTIONS_WITH_LOW_RESIDENCE_DURATION
.remove_label_values(&[
tenant_id,
timeline_id,
self.data_source,
&Self::threshold_label_value(self.threshold),
])
.expect("we own the metric, no-one else should remove it");
let threshold = Self::threshold_label_value(self.threshold);
let removed = EVICTIONS_WITH_LOW_RESIDENCE_DURATION.remove_label_values(&[
tenant_id,
timeline_id,
self.data_source,
&threshold,
]);
match removed {
Err(e) => {
// This has been hit in staging as
// <https://neondatabase.sentry.io/issues/4142396994/>, but we don't know how.
// Because we may already be in the drop path, don't risk:
// - "double panic => illegal instruction", or
// - a future "panic in drop => abort".
//
// So just nag (the error already carries the labels):
tracing::warn!("failed to remove EvictionsWithLowResidenceDuration, it was already removed? {e:#?}");
}
Ok(()) => {
// To help identify cases where we double-remove the same values, log all
// deletions.
tracing::info!("removed EvictionsWithLowResidenceDuration with {tenant_id}, {timeline_id}, {}, {threshold}", self.data_source);
}
}
}
}
@@ -350,11 +378,6 @@ pub static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub static NUM_ONDISK_LAYERS: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!("pageserver_ondisk_layers", "Number of layers on-disk")
.expect("failed to define a metric")
});
// remote storage metrics
/// NB: increment _after_ recording the current value into [`REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST`].
@@ -455,6 +478,56 @@ pub static TENANT_TASK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
.expect("Failed to register tenant_task_events metric")
});
// walreceiver metrics
pub static WALRECEIVER_STARTED_CONNECTIONS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_walreceiver_started_connections_total",
"Number of started walreceiver connections"
)
.expect("failed to define a metric")
});
pub static WALRECEIVER_ACTIVE_MANAGERS: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"pageserver_walreceiver_active_managers",
"Number of active walreceiver managers"
)
.expect("failed to define a metric")
});
pub static WALRECEIVER_SWITCHES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_walreceiver_switches_total",
"Number of walreceiver manager change_connection calls",
&["reason"]
)
.expect("failed to define a metric")
});
pub static WALRECEIVER_BROKER_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_walreceiver_broker_updates_total",
"Number of received broker updates in walreceiver"
)
.expect("failed to define a metric")
});
pub static WALRECEIVER_CANDIDATES_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_walreceiver_candidates_events_total",
"Number of walreceiver candidate events",
&["event"]
)
.expect("failed to define a metric")
});
pub static WALRECEIVER_CANDIDATES_ADDED: Lazy<IntCounter> =
Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["add"]));
pub static WALRECEIVER_CANDIDATES_REMOVED: Lazy<IntCounter> =
Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["remove"]));
// Metrics collected on WAL redo operations
//
// We collect the time spent in actual WAL redo ('redo'), and time waiting
@@ -1137,3 +1210,10 @@ impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
poll_result
}
}
pub fn preinitialize_metrics() {
// We want to alert on this metric increasing.
// Initialize it eagerly, so that our alert rule can distinguish absence of the metric from metric value 0.
assert_eq!(UNEXPECTED_ONDEMAND_DOWNLOADS.get(), 0);
UNEXPECTED_ONDEMAND_DOWNLOADS.reset();
}

View File

@@ -20,7 +20,6 @@ use pageserver_api::models::{
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamNblocksRequest, PagestreamNblocksResponse,
};
use postgres_backend::PostgresBackendTCP;
use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
use pq_proto::framed::ConnectionError;
use pq_proto::FeStartupPacket;
@@ -32,6 +31,7 @@ use std::str;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader;
use tracing::*;
use utils::id::ConnectionId;
@@ -57,7 +57,10 @@ use crate::trace::Tracer;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<Bytes>> + '_ {
fn copyin_stream<IO>(pgb: &mut PostgresBackend<IO>) -> impl Stream<Item = io::Result<Bytes>> + '_
where
IO: AsyncRead + AsyncWrite + Unpin,
{
async_stream::try_stream! {
loop {
let msg = tokio::select! {
@@ -65,8 +68,8 @@ fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<
_ = task_mgr::shutdown_watcher() => {
// We were requested to shut down.
let msg = "pageserver is shutting down".to_string();
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None));
let msg = "pageserver is shutting down";
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
Err(QueryError::Other(anyhow::anyhow!(msg)))
}
@@ -125,7 +128,7 @@ fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<
///
/// XXX: Currently, any trailing data after the EOF marker prints a warning.
/// Perhaps it should be a hard error?
async fn read_tar_eof(mut reader: (impl tokio::io::AsyncRead + Unpin)) -> anyhow::Result<()> {
async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
use tokio::io::AsyncReadExt;
let mut buf = [0u8; 512];
@@ -245,12 +248,23 @@ async fn page_service_conn_main(
.set_nodelay(true)
.context("could not set TCP_NODELAY")?;
let peer_addr = socket.peer_addr().context("get peer address")?;
// Set up a read timeout of 10 minutes. The value is somewhat arbitrary; the requirements are:
// - long enough for most valid compute connections
// - finite, so we don't "leak" connections to long-gone computes
//
// No write timeout is used, because the kernel is assumed to fail writes after some time.
let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
socket.set_timeout(Some(std::time::Duration::from_secs(60 * 10)));
let socket = std::pin::pin!(socket);
// XXX: pgbackend.run() should take the connection_ctx,
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(conf, auth, connection_ctx);
let pgbackend = PostgresBackend::new(socket, auth_type, None)?;
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
match pgbackend
.run(&mut conn_handler, task_mgr::shutdown_watcher)
@@ -332,13 +346,16 @@ impl PageServerHandler {
}
#[instrument(skip(self, pgb, ctx))]
async fn handle_pagerequests(
async fn handle_pagerequests<IO>(
&self,
pgb: &mut PostgresBackendTCP,
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
ctx: RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
// NOTE: pagerequests handler exits when connection is closed,
// so there is no need to reset the association
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
@@ -381,7 +398,9 @@ impl PageServerHandler {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => break,
Some(m) => {
anyhow::bail!("unexpected message: {m:?} during COPY");
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
)));
}
None => break, // client disconnected
};
@@ -436,16 +455,19 @@ impl PageServerHandler {
#[allow(clippy::too_many_arguments)]
#[instrument(skip(self, pgb, ctx))]
async fn handle_import_basebackup(
async fn handle_import_basebackup<IO>(
&self,
pgb: &mut PostgresBackendTCP,
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
base_lsn: Lsn,
_end_lsn: Lsn,
pg_version: u32,
ctx: RequestContext,
) -> Result<(), QueryError> {
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Create empty timeline
info!("creating new timeline");
@@ -486,15 +508,18 @@ impl PageServerHandler {
}
#[instrument(skip(self, pgb, ctx))]
async fn handle_import_wal(
async fn handle_import_wal<IO>(
&self,
pgb: &mut PostgresBackendTCP,
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
start_lsn: Lsn,
end_lsn: Lsn,
ctx: RequestContext,
) -> Result<(), QueryError> {
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
@@ -690,16 +715,19 @@ impl PageServerHandler {
#[allow(clippy::too_many_arguments)]
#[instrument(skip(self, pgb, ctx))]
async fn handle_basebackup_request(
async fn handle_basebackup_request<IO>(
&mut self,
pgb: &mut PostgresBackendTCP,
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Option<Lsn>,
prev_lsn: Option<Lsn>,
full_backup: bool,
ctx: RequestContext,
) -> anyhow::Result<()> {
) -> anyhow::Result<()>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
let started = std::time::Instant::now();
// check that the timeline exists
@@ -770,10 +798,13 @@ impl PageServerHandler {
}
#[async_trait::async_trait]
impl postgres_backend::Handler<tokio::net::TcpStream> for PageServerHandler {
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
fn check_auth_jwt(
&mut self,
_pgb: &mut PostgresBackendTCP,
_pgb: &mut PostgresBackend<IO>,
jwt_response: &[u8],
) -> Result<(), QueryError> {
// this unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT
@@ -801,7 +832,7 @@ impl postgres_backend::Handler<tokio::net::TcpStream> for PageServerHandler {
fn startup(
&mut self,
_pgb: &mut PostgresBackendTCP,
_pgb: &mut PostgresBackend<IO>,
_sm: &FeStartupPacket,
) -> Result<(), QueryError> {
Ok(())
@@ -809,7 +840,7 @@ impl postgres_backend::Handler<tokio::net::TcpStream> for PageServerHandler {
async fn process_query(
&mut self,
pgb: &mut PostgresBackendTCP,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> Result<(), QueryError> {
let ctx = self.connection_ctx.attached_child();
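The thread running through this file's hunks is that PostgresBackendTCP (hard-wired to tokio::net::TcpStream) becomes PostgresBackend<IO> with IO: AsyncRead + AsyncWrite + Send + Sync + Unpin. One payoff is testability: anything satisfying those bounds can stand in for a socket. A sketch of that idea, assuming nothing beyond tokio itself, driving a hypothetical handler over an in-memory duplex pipe instead of TCP:

    use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

    // Hypothetical stand-in for a PostgresBackend<IO> handler: it only needs
    // the same bounds the hunks above introduce, so an in-memory pipe works
    // just as well as a TCP socket.
    async fn echo_backend<IO>(mut io: IO) -> std::io::Result<()>
    where
        IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
    {
        let mut buf = [0u8; 64];
        let n = io.read(&mut buf).await?;
        io.write_all(&buf[..n]).await
    }

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        // tokio::io::duplex yields two connected in-memory endpoints.
        let (mut client, server) = tokio::io::duplex(1024);
        let task = tokio::spawn(echo_backend(server));
        client.write_all(b"ping").await?;
        let mut out = [0u8; 4];
        client.read_exact(&mut out).await?;
        assert_eq!(&out, b"ping");
        task.await.expect("join")?;
        Ok(())
    }
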

View File

@@ -118,6 +118,10 @@ pub struct Tenant {
// Global pageserver config parameters
pub conf: &'static PageServerConf,
/// The creation timestamp of this value, used to measure activation delay, see:
/// <https://github.com/neondatabase/neon/issues/4025>
loading_started_at: Instant,
state: watch::Sender<TenantState>,
// Overridden tenant-specific config parameters.
@@ -1476,7 +1480,7 @@ impl Tenant {
TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Active;
info!("Activating tenant {}", self.tenant_id);
debug!(tenant_id = %self.tenant_id, "Activating tenant");
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
@@ -1487,12 +1491,17 @@ impl Tenant {
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self.tenant_id);
let mut activated_timelines = 0;
let mut timelines_broken_during_activation = 0;
for timeline in not_broken_timelines {
match timeline
.activate(ctx)
.context("timeline activation for activating tenant")
{
Ok(()) => {}
Ok(()) => {
activated_timelines += 1;
}
Err(e) => {
error!(
"Failed to activate timeline {}: {:#}",
@@ -1503,9 +1512,26 @@ impl Tenant {
"failed to activate timeline {}: {}",
timeline.timeline_id, e
));
timelines_broken_during_activation += 1;
}
}
}
let elapsed = self.loading_started_at.elapsed();
let total_timelines = timelines_accessor.len();
// Log verbosely, because some tenants sometimes suffer from user-visible delays
// during activation. See https://github.com/neondatabase/neon/issues/4025
info!(
since_creation_millis = elapsed.as_millis(),
tenant_id = %self.tenant_id,
activated_timelines,
timelines_broken_during_activation,
total_timelines,
post_state = <&'static str>::from(&*current_state),
"activation attempt finished"
);
}
}
});
@@ -1812,6 +1838,9 @@ impl Tenant {
Tenant {
tenant_id,
conf,
// Using `now` here is a good enough approximation to catch tenants with really long
// activation times.
loading_started_at: Instant::now(),
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
timelines: Mutex::new(HashMap::new()),
gc_cs: tokio::sync::Mutex::new(()),
@@ -2857,7 +2886,13 @@ pub mod harness {
};
LOG_HANDLE.get_or_init(|| {
logging::init(logging::LogFormat::Test).expect("Failed to init test logging")
logging::init(
logging::LogFormat::Test,
// enable it in case the tests exercise code paths that use
// debug_assert_current_span_has_tenant_and_timeline_id
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
)
.expect("Failed to init test logging")
});
let repo_dir = PageServerConf::test_repo_dir(test_name);
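The activation-delay changes above boil down to: stamp Instant::now() at construction, then emit one structured log line when the activation attempt finishes. A compact sketch of the same shape (hypothetical stripped-down Tenant; counts are passed in rather than computed, and the tracing-subscriber crate is assumed for the demo):

    use std::time::Instant;

    struct Tenant {
        loading_started_at: Instant,
    }

    impl Tenant {
        fn new() -> Self {
            // `Instant::now()` at construction approximates when loading started.
            Tenant { loading_started_at: Instant::now() }
        }

        fn log_activation_finished(&self, activated: usize, broken: usize, total: usize) {
            tracing::info!(
                since_creation_millis = self.loading_started_at.elapsed().as_millis() as u64,
                activated_timelines = activated as u64,
                timelines_broken_during_activation = broken as u64,
                total_timelines = total as u64,
                "activation attempt finished"
            );
        }
    }

    fn main() {
        tracing_subscriber::fmt().init();
        let tenant = Tenant::new();
        tenant.log_activation_finished(3, 0, 3);
    }
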

View File

@@ -8,6 +8,8 @@
//! We cannot use global or default config instead, because wrong settings
//! may lead to data loss.
//!
use anyhow::Context;
use pageserver_api::models::{TenantConfigRequest, TenantCreateRequest};
use serde::{Deserialize, Serialize};
use std::num::NonZeroU64;
use std::time::Duration;
@@ -280,6 +282,179 @@ impl Default for TenantConf {
}
}
// Helper function to standardize the error messages we produce on bad durations
//
// Intended to be used with anyhow's `with_context`, e.g.:
//
// let value = result.with_context(bad_duration("name", &value))?;
//
fn bad_duration<'a>(field_name: &'static str, value: &'a str) -> impl 'a + Fn() -> String {
move || format!("Cannot parse `{field_name}` duration {value:?}")
}
impl TryFrom<&'_ TenantCreateRequest> for TenantConfOpt {
type Error = anyhow::Error;
fn try_from(request_data: &TenantCreateRequest) -> Result<Self, Self::Error> {
let mut tenant_conf = TenantConfOpt::default();
if let Some(gc_period) = &request_data.gc_period {
tenant_conf.gc_period = Some(
humantime::parse_duration(gc_period)
.with_context(bad_duration("gc_period", gc_period))?,
);
}
tenant_conf.gc_horizon = request_data.gc_horizon;
tenant_conf.image_creation_threshold = request_data.image_creation_threshold;
if let Some(pitr_interval) = &request_data.pitr_interval {
tenant_conf.pitr_interval = Some(
humantime::parse_duration(pitr_interval)
.with_context(bad_duration("pitr_interval", pitr_interval))?,
);
}
if let Some(walreceiver_connect_timeout) = &request_data.walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(walreceiver_connect_timeout).with_context(
bad_duration("walreceiver_connect_timeout", walreceiver_connect_timeout),
)?,
);
}
if let Some(lagging_wal_timeout) = &request_data.lagging_wal_timeout {
tenant_conf.lagging_wal_timeout = Some(
humantime::parse_duration(lagging_wal_timeout)
.with_context(bad_duration("lagging_wal_timeout", lagging_wal_timeout))?,
);
}
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(trace_read_requests) = request_data.trace_read_requests {
tenant_conf.trace_read_requests = Some(trace_read_requests);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = &request_data.checkpoint_timeout {
tenant_conf.checkpoint_timeout = Some(
humantime::parse_duration(checkpoint_timeout)
.with_context(bad_duration("checkpoint_timeout", checkpoint_timeout))?,
);
}
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
if let Some(compaction_period) = &request_data.compaction_period {
tenant_conf.compaction_period = Some(
humantime::parse_duration(compaction_period)
.with_context(bad_duration("compaction_period", compaction_period))?,
);
}
if let Some(eviction_policy) = &request_data.eviction_policy {
tenant_conf.eviction_policy = Some(
serde::Deserialize::deserialize(eviction_policy)
.context("parse field `eviction_policy`")?,
);
}
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
&request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
evictions_low_residence_duration_metric_threshold,
))?,
);
}
Ok(tenant_conf)
}
}
impl TryFrom<&'_ TenantConfigRequest> for TenantConfOpt {
type Error = anyhow::Error;
fn try_from(request_data: &TenantConfigRequest) -> Result<Self, Self::Error> {
let mut tenant_conf = TenantConfOpt::default();
if let Some(gc_period) = &request_data.gc_period {
tenant_conf.gc_period = Some(
humantime::parse_duration(gc_period)
.with_context(bad_duration("gc_period", gc_period))?,
);
}
tenant_conf.gc_horizon = request_data.gc_horizon;
tenant_conf.image_creation_threshold = request_data.image_creation_threshold;
if let Some(pitr_interval) = &request_data.pitr_interval {
tenant_conf.pitr_interval = Some(
humantime::parse_duration(pitr_interval)
.with_context(bad_duration("pitr_interval", pitr_interval))?,
);
}
if let Some(walreceiver_connect_timeout) = &request_data.walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(walreceiver_connect_timeout).with_context(
bad_duration("walreceiver_connect_timeout", walreceiver_connect_timeout),
)?,
);
}
if let Some(lagging_wal_timeout) = &request_data.lagging_wal_timeout {
tenant_conf.lagging_wal_timeout = Some(
humantime::parse_duration(lagging_wal_timeout)
.with_context(bad_duration("lagging_wal_timeout", lagging_wal_timeout))?,
);
}
tenant_conf.max_lsn_wal_lag = request_data.max_lsn_wal_lag;
tenant_conf.trace_read_requests = request_data.trace_read_requests;
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = &request_data.checkpoint_timeout {
tenant_conf.checkpoint_timeout = Some(
humantime::parse_duration(checkpoint_timeout)
.with_context(bad_duration("checkpoint_timeout", checkpoint_timeout))?,
);
}
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
if let Some(compaction_period) = &request_data.compaction_period {
tenant_conf.compaction_period = Some(
humantime::parse_duration(compaction_period)
.with_context(bad_duration("compaction_period", compaction_period))?,
);
}
if let Some(eviction_policy) = &request_data.eviction_policy {
tenant_conf.eviction_policy = Some(
serde::Deserialize::deserialize(eviction_policy)
.context("parse field `eviction_policy`")?,
);
}
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
&request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
evictions_low_residence_duration_metric_threshold,
))?,
);
}
Ok(tenant_conf)
}
}
#[cfg(test)]
mod tests {
use super::*;
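Both TryFrom impls above lean on the same bad_duration helper: a closure factory that gives every humantime parse a uniform, field-specific error message via anyhow's with_context. A standalone sketch of the pattern (parse_gc_period is a hypothetical wrapper for illustration):

    use anyhow::Context;
    use std::time::Duration;

    // Closure factory: gives each humantime parse a uniform, field-specific
    // error message without repeating format! at every call site.
    fn bad_duration<'a>(field_name: &'static str, value: &'a str) -> impl 'a + Fn() -> String {
        move || format!("Cannot parse `{field_name}` duration {value:?}")
    }

    fn parse_gc_period(raw: &str) -> anyhow::Result<Duration> {
        humantime::parse_duration(raw).with_context(bad_duration("gc_period", raw))
    }

    fn main() -> anyhow::Result<()> {
        assert_eq!(parse_gc_period("1h 30m")?, Duration::from_secs(5400));
        // Error context reads: Cannot parse `gc_period` duration "soon"
        assert!(parse_gc_period("soon").is_err());
        Ok(())
    }
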

View File

@@ -48,11 +48,10 @@ mod layer_coverage;
use crate::context::RequestContext;
use crate::keyspace::KeyPartitioning;
use crate::metrics::NUM_ONDISK_LAYERS;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use anyhow::{bail, Result};
use anyhow::Result;
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
@@ -126,7 +125,7 @@ where
///
/// Insert an on-disk layer.
///
pub fn insert_historic(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
pub fn insert_historic(&mut self, layer: Arc<L>) {
self.layer_map.insert_historic_noflush(layer)
}
@@ -274,22 +273,16 @@ where
///
/// Helper function for BatchedUpdates::insert_historic
///
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
let key = historic_layer_coverage::LayerKey::from(&*layer);
if self.historic.contains(&key) {
bail!(
"Attempt to insert duplicate layer {} in layer map",
layer.short_id()
);
}
self.historic.insert(key, Arc::clone(&layer));
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) {
// TODO: see #3869, the resulting #4088, and the attempted fix and repro in #4094
self.historic.insert(
historic_layer_coverage::LayerKey::from(&*layer),
Arc::clone(&layer),
);
if Self::is_l0(&layer) {
self.l0_delta_layers.push(layer);
}
NUM_ONDISK_LAYERS.inc();
Ok(())
}
///
@@ -314,8 +307,6 @@ where
"failed to locate removed historic layer from l0_delta_layers"
);
}
NUM_ONDISK_LAYERS.dec();
}
pub(self) fn replace_historic_noflush(
@@ -843,7 +834,7 @@ mod tests {
let expected_in_counts = (1, usize::from(expected_l0));
map.batch_update().insert_historic(remote.clone()).unwrap();
map.batch_update().insert_historic(remote.clone());
assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
let replaced = map

View File

@@ -417,14 +417,6 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
}
}
pub fn contains(&self, layer_key: &LayerKey) -> bool {
match self.buffer.get(layer_key) {
Some(None) => false, // layer remove was buffered
Some(_) => true, // layer insert was buffered
None => self.layers.contains_key(layer_key), // no buffered ops for this layer
}
}
pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
self.buffer.insert(layer_key, Some(value));
}

View File

@@ -16,6 +16,7 @@ use tracing::{info, warn};
use crate::config::PageServerConf;
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::timeline::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
use remote_storage::{DownloadError, GenericRemoteStorage};
use utils::crashsafe::path_with_suffix_extension;
@@ -43,6 +44,8 @@ pub async fn download_layer_file<'a>(
layer_file_name: &'a LayerFileName,
layer_metadata: &'a LayerFileMetadata,
) -> Result<u64, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let timeline_path = conf.timeline_path(&timeline_id, &tenant_id);
let local_path = timeline_path.join(layer_file_name.file_name());
@@ -154,7 +157,7 @@ pub async fn download_layer_file<'a>(
.with_context(|| format!("Could not fsync layer file {}", local_path.display(),))
.map_err(DownloadError::Other)?;
tracing::info!("download complete: {}", local_path.display());
tracing::debug!("download complete: {}", local_path.display());
Ok(bytes_amount)
}

View File

@@ -13,9 +13,9 @@ use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use anyhow::Result;
use bytes::Bytes;
use either::Either;
use enum_map::EnumMap;
use enumset::EnumSet;
use once_cell::sync::Lazy;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::models::{
HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
@@ -23,8 +23,10 @@ use pageserver_api::models::{
use std::ops::Range;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::warn;
use utils::history_buffer::HistoryBufferWithDropCounter;
use utils::rate_limit::RateLimit;
use utils::{
id::{TenantId, TimelineId},
@@ -37,6 +39,8 @@ pub use image_layer::{ImageLayer, ImageLayerWriter};
pub use inmemory_layer::InMemoryLayer;
pub use remote_layer::RemoteLayer;
use super::layer_map::BatchedUpdates;
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
T: PartialOrd<T>,
@@ -158,41 +162,82 @@ impl LayerAccessStatFullDetails {
}
impl LayerAccessStats {
pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
new
/// Create an empty stats object.
///
/// The caller is responsible for recording a residence event
/// using [`record_residence_event`] before calling `latest_activity`.
/// If they don't, [`latest_activity`] will return `None`.
pub(crate) fn empty_will_record_residence_event_later() -> Self {
LayerAccessStats(Mutex::default())
}
pub(crate) fn for_new_layer_file() -> Self {
/// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn for_loading_layer<L>(
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
status: LayerResidenceStatus,
) -> Self
where
L: ?Sized + Layer,
{
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
new.record_residence_event(
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
layer_map_lock_held_witness,
status,
LayerResidenceEventReason::LayerLoad,
);
new
}
/// Creates a clone of `self` and records `new_status` in the clone.
/// The `new_status` is not recorded in `self`
pub(crate) fn clone_for_residence_change(
///
/// The `new_status` is not recorded in `self`.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn clone_for_residence_change<L>(
&self,
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
new_status: LayerResidenceStatus,
) -> LayerAccessStats {
) -> LayerAccessStats
where
L: ?Sized + Layer,
{
let clone = {
let inner = self.0.lock().unwrap();
inner.clone()
};
let new = LayerAccessStats(Mutex::new(clone));
new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange);
new.record_residence_event(
layer_map_lock_held_witness,
new_status,
LayerResidenceEventReason::ResidenceChange,
);
new
}
fn record_residence_event(
/// Record a change in layer residency.
///
/// Recording the event must happen while holding the layer map lock to
/// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs)
/// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`.
///
/// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
/// the following race could happen:
///
/// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
/// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
/// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
/// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
///
pub(crate) fn record_residence_event<L>(
&self,
_layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
status: LayerResidenceStatus,
reason: LayerResidenceEventReason,
) {
) where
L: ?Sized + Layer,
{
let mut locked = self.0.lock().unwrap();
locked.iter_mut().for_each(|inner| {
inner
@@ -255,24 +300,37 @@ impl LayerAccessStats {
ret
}
fn most_recent_access_or_residence_event(
&self,
) -> Either<LayerAccessStatFullDetails, LayerResidenceEvent> {
/// Get the latest access timestamp, falling back to latest residence event.
///
/// This function can only return `None` if there has not yet been a call to the
/// [`record_residence_event`] method. That would generally be considered an
/// implementation error. This function logs a rate-limited warning in that case.
///
/// TODO: use type system to avoid the need for `fallback`.
/// The approach in https://github.com/neondatabase/neon/pull/3775
/// could be used to enforce that a residence event is recorded
/// before a layer is added to the layer map. We could also have
/// a layer wrapper type that holds the LayerAccessStats, and ensure
/// that that type can only be produced by inserting into the layer map.
pub(crate) fn latest_activity(&self) -> Option<SystemTime> {
let locked = self.0.lock().unwrap();
let inner = &locked.for_eviction_policy;
match inner.last_accesses.recent() {
Some(a) => Either::Left(*a),
Some(a) => Some(a.when),
None => match inner.last_residence_changes.recent() {
Some(e) => Either::Right(e.clone()),
None => unreachable!("constructors for LayerAccessStats ensure that there's always a residence change event"),
}
}
}
pub(crate) fn latest_activity(&self) -> SystemTime {
match self.most_recent_access_or_residence_event() {
Either::Left(mra) => mra.when,
Either::Right(re) => re.timestamp,
Some(e) => Some(e.timestamp),
None => {
static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimit)>> =
Lazy::new(|| Mutex::new((0, RateLimit::new(Duration::from_secs(10)))));
let mut guard = WARN_RATE_LIMIT.lock().unwrap();
guard.0 += 1;
let occurrences = guard.0;
guard.1.call(move || {
warn!(parent: None, occurrences, "latest_activity not available, this is an implementation bug, using fallback value");
});
None
}
},
}
}
}
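The None branch above counts every occurrence but only logs every so often. The RateLimit type comes from the pageserver's utils crate; below is a self-contained sketch of the same once-per-period behavior (the RateLimit here is a hypothetical stand-in, not the real implementation):

    use once_cell::sync::Lazy;
    use std::sync::Mutex;
    use std::time::{Duration, Instant};

    // Hypothetical stand-in for utils::rate_limit::RateLimit: runs the
    // closure at most once per period.
    struct RateLimit {
        period: Duration,
        last: Option<Instant>,
    }

    impl RateLimit {
        fn new(period: Duration) -> Self {
            RateLimit { period, last: None }
        }
        fn call(&mut self, f: impl FnOnce()) {
            let now = Instant::now();
            if self.last.map_or(true, |t| now.duration_since(t) >= self.period) {
                self.last = Some(now);
                f();
            }
        }
    }

    static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimit)>> =
        Lazy::new(|| Mutex::new((0, RateLimit::new(Duration::from_secs(10)))));

    fn warn_missing_activity() {
        let mut guard = WARN_RATE_LIMIT.lock().unwrap();
        guard.0 += 1; // count every occurrence, even the suppressed ones
        let occurrences = guard.0;
        guard.1.call(move || {
            eprintln!("latest_activity not available ({occurrences} occurrences so far)");
        });
    }

    fn main() {
        for _ in 0..1000 {
            warn_missing_activity(); // prints once per 10-second window
        }
    }
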

View File

@@ -57,7 +57,7 @@ use utils::{
use super::{
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
LayerKeyIter, LayerResidenceStatus, PathOrConf,
LayerKeyIter, PathOrConf,
};
///
@@ -637,7 +637,7 @@ impl DeltaLayer {
key_range: summary.key_range,
lsn_range: summary.lsn_range,
file_size: metadata.len(),
access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
@@ -808,7 +808,7 @@ impl DeltaLayerWriterInner {
key_range: self.key_start..key_end,
lsn_range: self.lsn_range.clone(),
file_size: metadata.len(),
access_stats: LayerAccessStats::for_new_layer_file(),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,

View File

@@ -53,7 +53,7 @@ use utils::{
};
use super::filename::{ImageFileName, LayerFileName};
use super::{Layer, LayerAccessStatsReset, LayerIter, LayerResidenceStatus, PathOrConf};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
///
/// Header stored in the beginning of the file
@@ -438,7 +438,7 @@ impl ImageLayer {
key_range: summary.key_range,
lsn: summary.lsn,
file_size: metadata.len(),
access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(ImageLayerInner {
file: None,
loaded: false,
@@ -598,7 +598,7 @@ impl ImageLayerWriterInner {
key_range: self.key_range.clone(),
lsn: self.lsn,
file_size: metadata.len(),
access_stats: LayerAccessStats::for_new_layer_file(),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(ImageLayerInner {
loaded: false,
file: None,

View File

@@ -4,6 +4,7 @@
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::layer_map::BatchedUpdates;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use anyhow::{bail, Result};
@@ -246,11 +247,15 @@ impl RemoteLayer {
}
/// Create a Layer struct representing this layer, after it has been downloaded.
pub fn create_downloaded_layer(
pub fn create_downloaded_layer<L>(
&self,
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
conf: &'static PageServerConf,
file_size: u64,
) -> Arc<dyn PersistentLayer> {
) -> Arc<dyn PersistentLayer>
where
L: ?Sized + Layer,
{
if self.is_delta {
let fname = DeltaFileName {
key_range: self.key_range.clone(),
@@ -262,8 +267,10 @@ impl RemoteLayer {
self.tenantid,
&fname,
file_size,
self.access_stats
.clone_for_residence_change(LayerResidenceStatus::Resident),
self.access_stats.clone_for_residence_change(
layer_map_lock_held_witness,
LayerResidenceStatus::Resident,
),
))
} else {
let fname = ImageFileName {
@@ -276,8 +283,10 @@ impl RemoteLayer {
self.tenantid,
&fname,
file_size,
self.access_stats
.clone_for_residence_change(LayerResidenceStatus::Resident),
self.access_stats.clone_for_residence_change(
layer_map_lock_held_witness,
LayerResidenceStatus::Resident,
),
))
}
}
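create_downloaded_layer now takes a &BatchedUpdates<'_, L> purely as evidence that the caller holds the layer map lock, closing the race described in the record_residence_event docs. The general "lock-held witness" trick, sketched with hypothetical types:

    use std::sync::{Mutex, MutexGuard};

    struct LayerMap {
        layers: Vec<String>,
    }

    // The witness: owning a MutexGuard means it can only exist while locked.
    struct BatchedUpdates<'a> {
        map: MutexGuard<'a, LayerMap>,
    }

    fn batch_update(lock: &Mutex<LayerMap>) -> BatchedUpdates<'_> {
        BatchedUpdates { map: lock.lock().unwrap() }
    }

    // Taking &BatchedUpdates proves at compile time that the caller holds the
    // layer map lock; there is no way to construct the witness without it.
    fn record_residence_event(witness: &BatchedUpdates<'_>, layer: &str) {
        println!("recorded event for {layer}; {} layers in map", witness.map.layers.len());
    }

    fn main() {
        let map = Mutex::new(LayerMap { layers: vec![] });
        let updates = batch_update(&map);
        record_residence_event(&updates, "000000000000-image");
    }
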

View File

@@ -11,7 +11,8 @@ use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::models::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceStatus, TimelineState,
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
TimelineState,
};
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
@@ -48,7 +49,7 @@ use crate::tenant::{
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::TimelineMetrics;
use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
@@ -588,15 +589,25 @@ impl Timeline {
let _timer = self.metrics.wait_lsn_time_histo.start_timer();
self.last_record_lsn.wait_for_timeout(lsn, self.conf.wait_lsn_timeout).await
.with_context(||
format!(
"Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}",
lsn, self.get_last_record_lsn(), self.get_disk_consistent_lsn()
)
)?;
Ok(())
match self
.last_record_lsn
.wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
.await
{
Ok(()) => Ok(()),
seqwait_error => {
drop(_timer);
let walreceiver_status = self.walreceiver.status().await;
seqwait_error.with_context(|| format!(
"Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, {}",
lsn,
self.get_last_record_lsn(),
self.get_disk_consistent_lsn(),
walreceiver_status.map(|status| status.to_human_readable_string())
.unwrap_or_else(|| "WalReceiver status: Not active".to_string()),
))
}
}
}
/// Check that it is valid to request operations with that lsn.
@@ -936,6 +947,7 @@ impl Timeline {
}
}
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
@@ -1100,7 +1112,7 @@ impl Timeline {
&layer_metadata,
local_layer
.access_stats()
.clone_for_residence_change(LayerResidenceStatus::Evicted),
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
),
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
self.tenant_id,
@@ -1109,7 +1121,7 @@ impl Timeline {
&layer_metadata,
local_layer
.access_stats()
.clone_for_residence_change(LayerResidenceStatus::Evicted),
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
),
});
@@ -1478,12 +1490,12 @@ impl Timeline {
self.tenant_id,
&imgfilename,
file_size,
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
);
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
updates.insert_historic(Arc::new(layer))?;
updates.insert_historic(Arc::new(layer));
num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file.
@@ -1510,12 +1522,12 @@ impl Timeline {
self.tenant_id,
&deltafilename,
file_size,
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
);
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
updates.insert_historic(Arc::new(layer))?;
updates.insert_historic(Arc::new(layer));
num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
@@ -1589,7 +1601,7 @@ impl Timeline {
// remote index file?
// If so, rename_to_backup those files & replace their local layer with
// a RemoteLayer in the layer map so that we re-download them on-demand.
if let Some(local_layer) = &local_layer {
if let Some(local_layer) = local_layer {
let local_layer_path = local_layer
.local_path()
.expect("caller must ensure that local_layers only contains local layers");
@@ -1614,6 +1626,7 @@ impl Timeline {
anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
} else {
self.metrics.resident_physical_size_gauge.sub(local_size);
updates.remove_historic(local_layer);
// fall-through to adding the remote layer
}
} else {
@@ -1645,15 +1658,14 @@ impl Timeline {
self.timeline_id,
imgfilename,
&remote_layer_metadata,
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
LayerAccessStats::for_loading_layer(
&updates,
LayerResidenceStatus::Evicted,
),
);
let remote_layer = Arc::new(remote_layer);
if let Some(local_layer) = &local_layer {
updates.replace_historic(local_layer, remote_layer)?;
} else {
updates.insert_historic(remote_layer)?;
}
updates.insert_historic(remote_layer);
}
LayerFileName::Delta(deltafilename) => {
// Create a RemoteLayer for the delta file.
@@ -1674,14 +1686,13 @@ impl Timeline {
self.timeline_id,
deltafilename,
&remote_layer_metadata,
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
LayerAccessStats::for_loading_layer(
&updates,
LayerResidenceStatus::Evicted,
),
);
let remote_layer = Arc::new(remote_layer);
if let Some(local_layer) = &local_layer {
updates.replace_historic(local_layer, remote_layer)?;
} else {
updates.insert_historic(remote_layer)?;
}
updates.insert_historic(remote_layer);
}
}
}
@@ -2355,6 +2366,7 @@ impl Timeline {
id,
ctx.task_kind()
);
UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
timeline.download_remote_layer(remote_layer).await?;
continue 'layer_map_search;
}
@@ -2724,11 +2736,16 @@ impl Timeline {
])?;
// Add it to the layer map
self.layers
.write()
.unwrap()
.batch_update()
.insert_historic(Arc::new(new_delta))?;
let l = Arc::new(new_delta);
let mut layers = self.layers.write().unwrap();
let mut batch_updates = layers.batch_update();
l.access_stats().record_residence_event(
&batch_updates,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
batch_updates.insert_historic(l);
batch_updates.flush();
// update the timeline's physical size
let sz = new_delta_path.metadata()?.len();
@@ -2933,7 +2950,13 @@ impl Timeline {
self.metrics
.resident_physical_size_gauge
.add(metadata.len());
updates.insert_historic(Arc::new(l))?;
let l = Arc::new(l);
l.access_stats().record_residence_event(
&updates,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(l);
}
updates.flush();
drop(layers);
@@ -3366,7 +3389,12 @@ impl Timeline {
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
updates.insert_historic(x)?;
x.access_stats().record_residence_event(
&updates,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(x);
}
// Now that we have reshuffled the data to set of new delta layers, we can
@@ -3818,11 +3846,13 @@ impl Timeline {
/// If the caller has a deadline or needs a timeout, they can simply stop polling:
/// we're **cancellation-safe** because the download happens in a separate task_mgr task.
/// So, the current download attempt will run to completion even if we stop polling.
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%remote_layer.short_id()))]
#[instrument(skip_all, fields(layer=%remote_layer.short_id()))]
pub async fn download_remote_layer(
&self,
remote_layer: Arc<RemoteLayer>,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id();
use std::sync::atomic::Ordering::Relaxed;
let permit = match Arc::clone(&remote_layer.ongoing_download)
@@ -3866,15 +3896,17 @@ impl Timeline {
.await;
if let Ok(size) = &result {
info!("layer file download finished");
// XXX: the temp file is still around in the Err() case
// and consumes space until we clean it up upon pageserver restart.
self_clone.metrics.resident_physical_size_gauge.add(*size);
// Download complete. Replace the RemoteLayer with the corresponding
// Delta- or ImageLayer in the layer map.
let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size);
let mut layers = self_clone.layers.write().unwrap();
let mut updates = layers.batch_update();
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{
use crate::tenant::layer_map::Replacement;
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
@@ -3937,6 +3969,8 @@ impl Timeline {
updates.flush();
drop(layers);
info!("on-demand download successful");
// Now that we've inserted the download into the layer map,
// close the semaphore. This will make other waiters for
// this download return Ok(()).
@@ -3944,7 +3978,7 @@ impl Timeline {
remote_layer.ongoing_download.close();
} else {
// Keep semaphore open. We'll drop the permit at the end of the function.
error!("on-demand download failed: {:?}", result.as_ref().unwrap_err());
error!("layer file download failed: {:?}", result.as_ref().unwrap_err());
}
// Don't treat it as an error if the task that triggered the download
@@ -4144,7 +4178,15 @@ impl Timeline {
continue;
}
let last_activity_ts = l.access_stats().latest_activity();
let last_activity_ts = l
.access_stats()
.latest_activity()
.unwrap_or_else(|| {
// We only use this fallback if there's an implementation error.
// `latest_activity` already emits a rate-limited warn!() log.
debug!(layer=%l.filename().file_name(), "last_activity returns None, using SystemTime::now");
SystemTime::now()
});
resident_layers.push(LocalLayerInfoForDiskUsageEviction {
layer: l,
@@ -4255,3 +4297,36 @@ fn rename_to_backup(path: &Path) -> anyhow::Result<()> {
bail!("couldn't find an unused backup number for {:?}", path)
}
#[cfg(not(debug_assertions))]
#[inline]
pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {}
#[cfg(debug_assertions)]
#[inline]
pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {
use utils::tracing_span_assert;
pub static TENANT_ID_EXTRACTOR: once_cell::sync::Lazy<
tracing_span_assert::MultiNameExtractor<2>,
> = once_cell::sync::Lazy::new(|| {
tracing_span_assert::MultiNameExtractor::new("TenantId", ["tenant_id", "tenant"])
});
pub static TIMELINE_ID_EXTRACTOR: once_cell::sync::Lazy<
tracing_span_assert::MultiNameExtractor<2>,
> = once_cell::sync::Lazy::new(|| {
tracing_span_assert::MultiNameExtractor::new("TimelineId", ["timeline_id", "timeline"])
});
match tracing_span_assert::check_fields_present([
&*TENANT_ID_EXTRACTOR,
&*TIMELINE_ID_EXTRACTOR,
]) {
Ok(()) => (),
Err(missing) => panic!(
"missing extractors: {:?}",
missing.into_iter().map(|e| e.name()).collect::<Vec<_>>()
),
}
}
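The #[cfg] pair above compiles the span check down to an empty inline function in release builds, so the debug_assert-style call sites cost nothing outside debug builds. The skeleton of that pattern, with a trivial thread-local standing in for the real tracing-span inspection:

    #[cfg(debug_assertions)]
    use std::cell::Cell;

    #[cfg(debug_assertions)]
    thread_local! {
        // Hypothetical stand-in for "the current tracing span carries
        // tenant_id/timeline_id fields".
        static SPAN_HAS_IDS: Cell<bool> = Cell::new(false);
    }

    #[cfg(not(debug_assertions))]
    #[inline]
    fn debug_assert_current_span_has_ids() {}

    #[cfg(debug_assertions)]
    #[inline]
    fn debug_assert_current_span_has_ids() {
        SPAN_HAS_IDS.with(|f| {
            if !f.get() {
                panic!("current span is missing tenant_id/timeline_id fields");
            }
        });
    }

    fn main() {
        #[cfg(debug_assertions)]
        SPAN_HAS_IDS.with(|f| f.set(true)); // pretend we entered an instrumented span
        debug_assert_current_span_has_ids(); // compiles to a no-op in release builds
    }
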

View File

@@ -184,7 +184,14 @@ impl Timeline {
if hist_layer.is_remote_layer() {
continue;
}
let last_activity_ts = hist_layer.access_stats().latest_activity();
let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
// We only use this fallback if there's an implementation error.
// `latest_activity` already emits a rate-limited warn!() log.
debug!(layer=%hist_layer.filename().file_name(), "last_activity returns None, using SystemTime::now");
SystemTime::now()
});
let no_activity_for = match now.duration_since(last_activity_ts) {
Ok(d) => d,
Err(_e) => {

View File

@@ -38,12 +38,14 @@ use std::sync::{Arc, Weak};
use std::time::Duration;
use storage_broker::BrokerClientChannel;
use tokio::select;
use tokio::sync::watch;
use tokio::sync::{watch, RwLock};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::TenantTimelineId;
use self::connection_manager::ConnectionManagerStatus;
use super::Timeline;
#[derive(Clone)]
@@ -63,6 +65,7 @@ pub struct WalReceiver {
timeline_ref: Weak<Timeline>,
conf: WalReceiverConf,
started: AtomicBool,
manager_status: Arc<RwLock<Option<ConnectionManagerStatus>>>,
}
impl WalReceiver {
@@ -76,6 +79,7 @@ impl WalReceiver {
timeline_ref,
conf,
started: AtomicBool::new(false),
manager_status: Arc::new(RwLock::new(None)),
}
}
@@ -96,8 +100,8 @@ impl WalReceiver {
let timeline_id = timeline.timeline_id;
let walreceiver_ctx =
ctx.detached_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
let wal_receiver_conf = self.conf.clone();
let loop_status = Arc::clone(&self.manager_status);
task_mgr::spawn(
WALRECEIVER_RUNTIME.handle(),
TaskKind::WalReceiverManager,
@@ -115,24 +119,28 @@ impl WalReceiver {
select! {
_ = task_mgr::shutdown_watcher() => {
info!("WAL receiver shutdown requested, shutting down");
connection_manager_state.shutdown().await;
return Ok(());
break;
},
loop_step_result = connection_manager_loop_step(
&mut broker_client,
&mut connection_manager_state,
&walreceiver_ctx,
&loop_status,
) => match loop_step_result {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(()) => {
info!("Connection manager loop ended, shutting down");
connection_manager_state.shutdown().await;
return Ok(());
break;
}
},
}
}
}.instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id))
connection_manager_state.shutdown().await;
*loop_status.write().await = None;
Ok(())
}
.instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id))
);
self.started.store(true, atomic::Ordering::Release);
@@ -149,6 +157,10 @@ impl WalReceiver {
.await;
self.started.store(false, atomic::Ordering::Release);
}
pub(super) async fn status(&self) -> Option<ConnectionManagerStatus> {
self.manager_status.read().await.clone()
}
}
/// A handle of an asynchronous task.
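The new manager_status field is the whole mechanism behind WalReceiver::status(): the connection manager loop owns the write side of an Arc<tokio::sync::RwLock<Option<...>>> and publishes a fresh snapshot each iteration (clearing it on shutdown), while readers clone the latest value. A minimal sketch with a hypothetical one-field status type:

    use std::sync::Arc;
    use tokio::sync::RwLock;

    #[derive(Clone, Debug)]
    struct ConnectionManagerStatus {
        connected: bool,
    }

    struct WalReceiver {
        manager_status: Arc<RwLock<Option<ConnectionManagerStatus>>>,
    }

    impl WalReceiver {
        async fn status(&self) -> Option<ConnectionManagerStatus> {
            self.manager_status.read().await.clone()
        }
    }

    #[tokio::main]
    async fn main() {
        let wr = WalReceiver { manager_status: Arc::new(RwLock::new(None)) };
        let writer = Arc::clone(&wr.manager_status);
        // The manager loop publishes a fresh snapshot each iteration...
        *writer.write().await = Some(ConnectionManagerStatus { connected: true });
        assert!(wr.status().await.is_some());
        // ...and clears it on shutdown, so None means "manager not running".
        *writer.write().await = None;
        assert!(wr.status().await.is_none());
    }
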

View File

@@ -13,6 +13,10 @@ use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, ti
use super::{TaskStateUpdate, WalReceiverConf};
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::{
WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
};
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
use anyhow::Context;
@@ -24,6 +28,7 @@ use storage_broker::proto::SubscribeSafekeeperInfoRequest;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use storage_broker::BrokerClientChannel;
use storage_broker::Streaming;
use tokio::sync::RwLock;
use tokio::{select, sync::watch};
use tracing::*;
@@ -43,6 +48,7 @@ pub(super) async fn connection_manager_loop_step(
broker_client: &mut BrokerClientChannel,
connection_manager_state: &mut ConnectionManagerState,
ctx: &RequestContext,
manager_status: &RwLock<Option<ConnectionManagerStatus>>,
) -> ControlFlow<(), ()> {
let mut timeline_state_updates = connection_manager_state
.timeline
@@ -56,6 +62,11 @@ pub(super) async fn connection_manager_loop_step(
}
}
WALRECEIVER_ACTIVE_MANAGERS.inc();
scopeguard::defer! {
WALRECEIVER_ACTIVE_MANAGERS.dec();
}
let id = TenantTimelineId {
tenant_id: connection_manager_state.timeline.tenant_id,
timeline_id: connection_manager_state.timeline.timeline_id,
@@ -180,6 +191,7 @@ pub(super) async fn connection_manager_loop_step(
.change_connection(new_candidate, ctx)
.await
}
*manager_status.write().await = Some(connection_manager_state.manager_status());
}
}
@@ -267,6 +279,78 @@ pub(super) struct ConnectionManagerState {
wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
}
/// Information about the connection manager's current connection and connection candidates.
#[derive(Debug, Clone)]
pub struct ConnectionManagerStatus {
existing_connection: Option<WalConnectionStatus>,
wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
}
impl ConnectionManagerStatus {
/// Generates a string describing the current connection status, in a form suitable for logging.
pub fn to_human_readable_string(&self) -> String {
let mut resulting_string = "WalReceiver status".to_string();
match &self.existing_connection {
Some(connection) => {
if connection.has_processed_wal {
resulting_string.push_str(&format!(
" (update {}): streaming WAL from node {}, ",
connection.latest_wal_update.format("%Y-%m-%d %H:%M:%S"),
connection.node,
));
match (connection.streaming_lsn, connection.commit_lsn) {
(None, None) => resulting_string.push_str("no streaming data"),
(None, Some(commit_lsn)) => {
resulting_string.push_str(&format!("commit Lsn: {commit_lsn}"))
}
(Some(streaming_lsn), None) => {
resulting_string.push_str(&format!("streaming Lsn: {streaming_lsn}"))
}
(Some(streaming_lsn), Some(commit_lsn)) => resulting_string.push_str(
&format!("commit|streaming Lsn: {commit_lsn}|{streaming_lsn}"),
),
}
} else if connection.is_connected {
resulting_string.push_str(&format!(
" (update {}): connecting to node {}",
connection
.latest_connection_update
.format("%Y-%m-%d %H:%M:%S"),
connection.node,
));
} else {
resulting_string.push_str(&format!(
" (update {}): initializing node {} connection",
connection
.latest_connection_update
.format("%Y-%m-%d %H:%M:%S"),
connection.node,
));
}
}
None => resulting_string.push_str(": disconnected"),
}
resulting_string.push_str(", safekeeper candidates (id|update_time|commit_lsn): [");
let mut candidates = self.wal_stream_candidates.iter().peekable();
while let Some((node_id, candidate_info)) = candidates.next() {
resulting_string.push_str(&format!(
"({}|{}|{})",
node_id,
candidate_info.latest_update.format("%H:%M:%S"),
Lsn(candidate_info.timeline.commit_lsn)
));
if candidates.peek().is_some() {
resulting_string.push_str(", ");
}
}
resulting_string.push(']');
resulting_string
}
}
/// Current connection data.
#[derive(Debug)]
struct WalConnection {
@@ -293,14 +377,14 @@ struct NewCommittedWAL {
discovered_at: NaiveDateTime,
}
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
struct RetryInfo {
next_retry_at: Option<NaiveDateTime>,
retry_duration_seconds: f64,
}
/// Data about the timeline to connect to, received from the broker.
#[derive(Debug)]
#[derive(Debug, Clone)]
struct BrokerSkTimeline {
timeline: SafekeeperTimelineInfo,
/// Time at which the data was fetched from the broker last time, to track the stale data.
@@ -325,9 +409,14 @@ impl ConnectionManagerState {
/// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
WALRECEIVER_SWITCHES
.with_label_values(&[new_sk.reason.name()])
.inc();
self.drop_old_connection(true).await;
let id = self.id;
let node_id = new_sk.safekeeper_id;
let connect_timeout = self.conf.wal_connect_timeout;
let timeline = Arc::clone(&self.timeline);
let ctx = ctx.detached_child(
@@ -343,12 +432,13 @@ impl ConnectionManagerState {
cancellation,
connect_timeout,
ctx,
node_id,
)
.await
.context("walreceiver connection handling failure")
}
.instrument(
info_span!("walreceiver_connection", id = %id, node_id = %new_sk.safekeeper_id),
info_span!("walreceiver_connection", tenant_id = %id.tenant_id, timeline_id = %id.timeline_id, %node_id),
)
});
@@ -364,6 +454,7 @@ impl ConnectionManagerState {
latest_wal_update: now,
streaming_lsn: None,
commit_lsn: None,
node: node_id,
},
connection_task: connection_handle,
discovered_new_wal: None,
@@ -437,6 +528,8 @@ impl ConnectionManagerState {
/// Adds another broker timeline into the state, if it's more recent than the one already added there for the same key.
fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
WALRECEIVER_BROKER_UPDATES.inc();
let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
let old_entry = self.wal_stream_candidates.insert(
new_safekeeper_id,
@@ -448,6 +541,7 @@ impl ConnectionManagerState {
if old_entry.is_none() {
info!("New SK node was added: {new_safekeeper_id}");
WALRECEIVER_CANDIDATES_ADDED.inc();
}
}
@@ -716,6 +810,7 @@ impl ConnectionManagerState {
for node_id in node_ids_to_remove {
info!("Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections");
self.wal_connection_retries.remove(&node_id);
WALRECEIVER_CANDIDATES_REMOVED.inc();
}
}
}
@@ -725,6 +820,13 @@ impl ConnectionManagerState {
wal_connection.connection_task.shutdown().await;
}
}
fn manager_status(&self) -> ConnectionManagerStatus {
ConnectionManagerStatus {
existing_connection: self.wal_connection.as_ref().map(|conn| conn.status),
wal_stream_candidates: self.wal_stream_candidates.clone(),
}
}
}
#[derive(Debug)]
@@ -732,8 +834,6 @@ struct NewWalConnectionCandidate {
safekeeper_id: NodeId,
wal_source_connconf: PgConnectionConfig,
availability_zone: Option<String>,
// This field is used in `derive(Debug)` only.
#[allow(dead_code)]
reason: ReconnectReason,
}
@@ -762,6 +862,18 @@ enum ReconnectReason {
},
}
impl ReconnectReason {
fn name(&self) -> &str {
match self {
ReconnectReason::NoExistingConnection => "NoExistingConnection",
ReconnectReason::LaggingWal { .. } => "LaggingWal",
ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
}
}
}
fn wal_stream_connection_config(
TenantTimelineId {
tenant_id,
@@ -867,6 +979,7 @@ mod tests {
latest_wal_update: now,
commit_lsn: Some(Lsn(current_lsn)),
streaming_lsn: Some(Lsn(current_lsn)),
node: NodeId(1),
};
state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
@@ -1035,6 +1148,7 @@ mod tests {
latest_wal_update: now,
commit_lsn: Some(current_lsn),
streaming_lsn: Some(current_lsn),
node: connected_sk_id,
};
state.wal_connection = Some(WalConnection {
@@ -1101,6 +1215,7 @@ mod tests {
latest_wal_update: time_over_threshold,
commit_lsn: Some(current_lsn),
streaming_lsn: Some(current_lsn),
node: NodeId(1),
};
state.wal_connection = Some(WalConnection {
@@ -1164,6 +1279,7 @@ mod tests {
latest_wal_update: time_over_threshold,
commit_lsn: Some(current_lsn),
streaming_lsn: Some(current_lsn),
node: NodeId(1),
};
state.wal_connection = Some(WalConnection {
@@ -1261,6 +1377,7 @@ mod tests {
latest_wal_update: now,
commit_lsn: Some(current_lsn),
streaming_lsn: Some(current_lsn),
node: connected_sk_id,
};
state.wal_connection = Some(WalConnection {
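to_human_readable_string above uses a peekable iterator so the candidate list gets separators between entries but never after the last one, which is the usual way to build "a, b, c" strings without a trailing comma. The trick in isolation (render_candidates is a hypothetical simplification of the real method):

    fn render_candidates(cands: &[(u64, &str)]) -> String {
        let mut out = String::from("safekeeper candidates (id|commit_lsn): [");
        let mut it = cands.iter().peekable();
        while let Some((id, lsn)) = it.next() {
            out.push_str(&format!("({id}|{lsn})"));
            if it.peek().is_some() {
                out.push_str(", "); // separator only between items
            }
        }
        out.push(']');
        out
    }

    fn main() {
        assert_eq!(
            render_candidates(&[(1, "0/16B3748"), (2, "0/16B3750")]),
            "safekeeper candidates (id|commit_lsn): [(1|0/16B3748), (2|0/16B3750)]"
        );
    }
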

View File

@@ -24,8 +24,8 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
use super::TaskStateUpdate;
use crate::context::RequestContext;
use crate::metrics::LIVE_CONNECTIONS_COUNT;
use crate::{context::RequestContext, metrics::WALRECEIVER_STARTED_CONNECTIONS};
use crate::{
task_mgr,
task_mgr::TaskKind,
@@ -37,8 +37,8 @@ use crate::{
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::waldecoder::WalStreamDecoder;
use pq_proto::PageserverFeedback;
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
use utils::{id::NodeId, lsn::Lsn};
/// Status of the connection.
#[derive(Debug, Clone, Copy)]
@@ -56,6 +56,8 @@ pub(super) struct WalConnectionStatus {
pub streaming_lsn: Option<Lsn>,
/// Latest commit_lsn received from the safekeeper. Can be zero if no message has been received yet.
pub commit_lsn: Option<Lsn>,
/// The node it is connected to
pub node: NodeId,
}
/// Open a connection to the given safekeeper and receive WAL, sending back progress
@@ -67,7 +69,10 @@ pub(super) async fn handle_walreceiver_connection(
cancellation: CancellationToken,
connect_timeout: Duration,
ctx: RequestContext,
node: NodeId,
) -> anyhow::Result<()> {
WALRECEIVER_STARTED_CONNECTIONS.inc();
// Connect to the database in replication mode.
info!("connecting to {wal_source_connconf:?}");
@@ -100,6 +105,7 @@ pub(super) async fn handle_walreceiver_connection(
latest_wal_update: Utc::now().naive_utc(),
streaming_lsn: None,
commit_lsn: None,
node,
};
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}");
@@ -122,7 +128,7 @@ pub(super) async fn handle_walreceiver_connection(
false,
async move {
select! {
connection_result = connection => match connection_result{
connection_result = connection => match connection_result {
Ok(()) => info!("Walreceiver db connection closed"),
Err(connection_error) => {
if let Err(e) = ignore_expected_errors(connection_error) {
@@ -319,12 +325,12 @@ pub(super) async fn handle_walreceiver_connection(
timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
// The last LSN we processed. It is not guaranteed to survive pageserver crash.
let last_received_lsn = u64::from(last_lsn);
let last_received_lsn = last_lsn;
// `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data
let disk_consistent_lsn = u64::from(timeline.get_disk_consistent_lsn());
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
// The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash
// Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
let remote_consistent_lsn = u64::from(timeline_remote_consistent_lsn);
let remote_consistent_lsn = timeline_remote_consistent_lsn;
let ts = SystemTime::now();
// Update the status about what we just received. This is shown in the mgmt API.

View File

@@ -96,6 +96,8 @@ static shmem_request_hook_type prev_shmem_request_hook;
#endif
static int lfc_shrinking_factor; /* power of two by which local cache size will be shrunk when lfc_free_space_watermark is reached */
void FileCacheMonitorMain(Datum main_arg);
static void
lfc_shmem_startup(void)
{
@@ -370,6 +372,73 @@ lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
return found;
}
/*
* Evict a page (if present) from the local file cache
*/
void
lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
{
BufferTag tag;
FileCacheEntry* entry;
bool found;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
uint32 hash;
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return;
INIT_BUFFERTAG(tag, rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found);
if (!found)
{
/* nothing to do */
LWLockRelease(lfc_lock);
return;
}
/* remove the page from the cache */
entry->bitmap[chunk_offs >> 5] &= ~(1 << (chunk_offs & (32 - 1)));
/*
* If the chunk has no live entries, we can position the chunk to be
* recycled first.
*/
if (entry->bitmap[chunk_offs >> 5] == 0)
{
bool has_remaining_pages = false; /* must be initialized: the loop only sets it when a page is found */
for (int i = 0; i < (BLOCKS_PER_CHUNK / 32); i++) {
if (entry->bitmap[i] != 0)
{
has_remaining_pages = true;
break;
}
}
/*
* Put the entry at the position that is first to be reclaimed when
* we have no cached pages remaining in the chunk
*/
if (!has_remaining_pages)
{
dlist_delete(&entry->lru_node);
dlist_push_head(&lfc_ctl->lru, &entry->lru_node);
}
}
/*
* Done. Note that we only reposition fully-emptied chunks in the LRU;
* evicting a single page doesn't count as usage, so partially-filled
* chunks keep their place.
*/
LWLockRelease(lfc_lock);
}
/*
* Try to read page from local cache.
* Returns true if page is found in local cache.
@@ -528,7 +597,6 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
LWLockRelease(lfc_lock);
}
/*
* Record structure holding the to be exposed cache data.
*/
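lfc_evict's core is bitmap bookkeeping: clear one page's bit in a 128-page chunk, and only if the touched 32-bit word became zero, scan the rest to decide whether the whole chunk is empty and should be first in line for recycling. The same logic in a small Rust sketch (hypothetical ChunkEntry; the real FileCacheEntry lives in Postgres shared memory and is found via a hash table under lfc_lock):

    const BLOCKS_PER_CHUNK: usize = 128;

    struct ChunkEntry {
        bitmap: [u32; BLOCKS_PER_CHUNK / 32],
    }

    impl ChunkEntry {
        /// Clear the bit for `blkno`; return true when the whole chunk is now
        /// empty and should be moved to the head of the LRU for recycling.
        fn evict_page(&mut self, blkno: usize) -> bool {
            let chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
            self.bitmap[chunk_offs >> 5] &= !(1u32 << (chunk_offs & 31));
            // Only scan the full bitmap if the word we touched became zero.
            self.bitmap[chunk_offs >> 5] == 0 && self.bitmap.iter().all(|w| *w == 0)
        }
    }

    fn main() {
        let mut entry = ChunkEntry { bitmap: [0; BLOCKS_PER_CHUNK / 32] };
        entry.bitmap[0] = 0b1; // one cached page: block 0 of the chunk
        assert!(entry.evict_page(0)); // chunk is now empty; recycle it first
    }
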

View File

@@ -17,6 +17,8 @@
#include "pagestore_client.h"
#include "fmgr.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/buf_internals.h"
#include "libpq-fe.h"
#include "libpq/pqformat.h"
@@ -57,6 +59,8 @@ int n_unflushed_requests = 0;
int flush_every_n_requests = 8;
int readahead_buffer_size = 128;
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
static void pageserver_flush(void);
static bool
@@ -467,6 +471,8 @@ pg_init_libpagestore(void)
smgr_hook = smgr_neon;
smgr_init_hook = smgr_init_neon;
dbsize_hook = neon_dbsize;
old_redo_read_buffer_filter = redo_read_buffer_filter;
redo_read_buffer_filter = neon_redo_read_buffer_filter;
}
lfc_init();
}

View File

@@ -24,6 +24,7 @@
#include "neon.h"
#include "walproposer.h"
#include "pagestore_client.h"
PG_MODULE_MAGIC;
void _PG_init(void);

View File

@@ -11,6 +11,7 @@
#ifndef NEON_H
#define NEON_H
#include "access/xlogreader.h"
/* GUCs */
extern char *neon_auth_token;
@@ -20,4 +21,11 @@ extern char *neon_tenant;
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);
/*
* Returns true if we shouldn't do REDO on that block in record indicated by
* block_id; false otherwise.
*/
extern bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);
#endif /* NEON_H */

View File

@@ -207,6 +207,7 @@ extern void forget_cached_relsize(RelFileNode rnode, ForkNumber forknum);
extern void lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer);
extern bool lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer);
extern bool lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);

View File

@@ -189,6 +189,7 @@ typedef struct PrfHashEntry {
#define SH_DEFINE
#define SH_DECLARE
#include "lib/simplehash.h"
#include "neon.h"
/*
* PrefetchState maintains the state of (prefetch) getPage@LSN requests.
@@ -1209,6 +1210,9 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
if (ShutdownRequestPending)
return;
/* Don't log any pages if we're not allowed to do so. */
if (!XLogInsertAllowed())
return;
/*
* Whenever a VM or FSM page is evicted, WAL-log it. FSM and (some) VM
@@ -1375,8 +1379,18 @@ neon_get_request_lsn(bool *latest, RelFileNode rnode, ForkNumber forknum, BlockN
if (RecoveryInProgress())
{
/*
* We don't know if WAL has been generated but not yet replayed, so
* we're conservative in our estimates about latest pages.
*/
*latest = false;
lsn = GetXLogReplayRecPtr(NULL);
/*
* Get the last written LSN of this page.
*/
lsn = GetLastWrittenLSN(rnode, forknum, blkno);
lsn = nm_adjust_lsn(lsn);
elog(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
(uint32) ((lsn) >> 32), (uint32) (lsn));
}
@@ -1559,6 +1573,15 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
/*
* Newly created relation is empty, remember that in the relsize cache.
*
* Note that in REDO, this is called to make sure the relation fork exists,
* but it does not truncate the relation. So, we can only update the
* relsize if it didn't exist before.
*
* Also, in redo, we must make sure to update the cached size of the
* relation, as that is the primary source of truth for REDO's
* file length considerations, and as file extension isn't (perfectly)
* logged, we need to take care of that before we hit file size checks.
*
* FIXME: This is currently not just an optimization, but required for
* correctness. Postgres can call smgrnblocks() on the newly-created
* relation. Currently, we don't call SetLastWrittenLSN() when a new
@@ -1566,7 +1589,14 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
* cache, we might call smgrnblocks() on the newly-created relation before
* the creation WAL record has been received by the page server.
*/
set_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
if (isRedo)
{
update_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
get_cached_relsize(reln->smgr_rnode.node, forkNum,
&reln->smgr_cached_nblocks[forkNum]);
}
else
set_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -1831,6 +1861,26 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
.blockNum = blkno,
};
/*
* The redo process does not lock pages that it needs to replay but are
* not in the shared buffers, so a concurrent process may request the
* page after redo has decided it won't redo that page and updated the
* LwLSN for that page.
* If we're in hot standby we need to take care that we don't return
* until after REDO has finished replaying up to that LwLSN, as the page
* should have been locked up to that point.
*
* See also the description on neon_redo_read_buffer_filter below.
*
* NOTE: It is possible that the WAL redo process will still do IO due to
* concurrent failed read IOs. Those IOs would never have a request_lsn
* as large as the WAL record we're currently replaying, were it not for
* the LwLsn cache falling back to its highest known value when an entry
* is not found.
*/
if (RecoveryInProgress() && !(MyBackendType == B_STARTUP))
XLogWaitForReplayOf(request_lsn);
/*
* Try to find prefetched page in the list of received pages.
*/
@@ -2584,3 +2634,143 @@ smgr_init_neon(void)
smgr_init_standard();
neon_init();
}
/*
* Return whether we can skip the redo for this block.
*
* The conditions for skipping the IO are:
*
* - The block is not in the shared buffers, and
* - The block is not in the local file cache (LFC)
*
* ... because any subsequent read of the page then has to fetch the new
* version of the page from the PageServer. We don't actually check the
* local file cache; we instead evict the page from the LFC: that is
* cheaper than going through the FS calls to read the page, and it
* limits the number of lock operations used in the REDO process.
*
* We have one exception to the rules for skipping IO: We always apply
* changes to shared catalogs' pages. Although this is mostly out of caution,
* catalog updates usually result in backends rebuilding their catalog snapshot,
* which means it's quite likely the modified page is going to be used soon.
*
* It is important to note that skipping WAL redo for a page also means
* the page isn't locked by the redo process, as there is no Buffer
* being returned, nor is there a buffer descriptor to lock.
* This means that any IO that wants to read this block needs to wait
* for the WAL REDO process to finish processing the WAL record before
* it allows the system to start reading the block, as releasing the
* block early could lead to phantom reads.
*
* For example, REDO for a WAL record that modifies 3 blocks could skip
* the first block, wait for a lock on the second, and then modify the
* third block. Without skipping, all blocks would be locked and phantom
* reads could not occur; with skipping, a concurrent process could read
* block 1 with post-REDO contents but block 3 with pre-REDO contents,
* whereas with REDO locking it would wait on block 1 and then see
* block 3 with post-REDO contents only.
*/
bool
neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
{
XLogRecPtr end_recptr = record->EndRecPtr;
XLogRecPtr prev_end_recptr = record->ReadRecPtr - 1;
RelFileNode rnode;
ForkNumber forknum;
BlockNumber blkno;
BufferTag tag;
uint32 hash;
LWLock *partitionLock;
Buffer buffer;
bool no_redo_needed;
BlockNumber relsize;
if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id))
return true;
#if PG_VERSION_NUM < 150000
if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
elog(PANIC, "failed to locate backup block with ID %d", block_id);
#else
XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno);
#endif
/*
* Out of an abundance of caution, we always run redo on shared catalogs,
* regardless of whether the block is stored in shared buffers.
* See also this function's top comment.
*/
if (!OidIsValid(rnode.dbNode))
return false;
INIT_BUFFERTAG(tag, rnode, forknum, blkno);
hash = BufTableHashCode(&tag);
partitionLock = BufMappingPartitionLock(hash);
/*
* Lock the partition of shared_buffers so that it can't be updated
* concurrently.
*/
LWLockAcquire(partitionLock, LW_SHARED);
/* Try to find the relevant buffer */
buffer = BufTableLookup(&tag, hash);
no_redo_needed = buffer < 0;
/* we don't have the buffer in memory, update lwLsn past this record */
if (no_redo_needed)
{
SetLastWrittenLSNForBlock(end_recptr, rnode, forknum, blkno);
lfc_evict(rnode, forknum, blkno);
}
else
{
SetLastWrittenLSNForBlock(prev_end_recptr, rnode, forknum, blkno);
}
LWLockRelease(partitionLock);
/* Extend the relation if we know its size */
if (get_cached_relsize(rnode, forknum, &relsize))
{
if (relsize < blkno + 1)
update_cached_relsize(rnode, forknum, blkno + 1);
}
else
{
/*
* Size was not cached. We populate the cache now, with the size of the
* relation measured after this WAL record is applied.
*
* This length is later reused when we open the smgr to read the block,
* which is fine and expected.
*/
NeonResponse *response;
NeonNblocksResponse *nbresponse;
NeonNblocksRequest request = {
.req = (NeonRequest) {
.lsn = end_recptr,
.latest = false,
.tag = T_NeonNblocksRequest,
},
.rnode = rnode,
.forknum = forknum,
};
response = page_server_request(&request);
Assert(response->tag == T_NeonNblocksResponse);
nbresponse = (NeonNblocksResponse *) response;
Assert(nbresponse->n_blocks > blkno);
set_cached_relsize(rnode, forknum, nbresponse->n_blocks);
elog(SmgrTrace, "Set length to %d", nbresponse->n_blocks);
}
return no_redo_needed;
}
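To make the skip decision above concrete, here is a minimal Rust sketch (not part of this diff): the two callbacks are hypothetical stand-ins for SetLastWrittenLSNForBlock() and lfc_evict(), and the buffer-table lookup is reduced to a boolean.

type Lsn = u64;

// Sketch of the decision in neon_redo_read_buffer_filter: skip redo only
// when the block is not in shared buffers.
fn redo_can_skip_block(
    in_shared_buffers: bool,
    record_end_lsn: Lsn,
    prev_record_lsn: Lsn,
    set_lwlsn_for_block: impl FnOnce(Lsn),
    evict_from_lfc: impl FnOnce(),
) -> bool {
    if in_shared_buffers {
        // The buffer will be redone in place; keep the LwLSN just below
        // this record so concurrent readers wait for its replay.
        set_lwlsn_for_block(prev_record_lsn);
        false
    } else {
        // No buffer to lock: advance the LwLSN past this record so any
        // later read fetches the post-REDO page from the page server,
        // and drop the stale copy from the local file cache.
        set_lwlsn_for_block(record_end_lsn);
        evict_from_lfc();
        true
    }
}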


@@ -1964,18 +1964,26 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback * hs)
{
if (safekeeper[i].appendResponse.hs.ts != 0)
{
if (FullTransactionIdPrecedes(safekeeper[i].appendResponse.hs.xmin, hs->xmin))
HotStandbyFeedback *skhs = &safekeeper[i].appendResponse.hs;
if (FullTransactionIdIsNormal(skhs->xmin)
&& FullTransactionIdPrecedes(skhs->xmin, hs->xmin))
{
hs->xmin = safekeeper[i].appendResponse.hs.xmin;
hs->ts = safekeeper[i].appendResponse.hs.ts;
hs->xmin = skhs->xmin;
hs->ts = skhs->ts;
}
if (FullTransactionIdPrecedes(safekeeper[i].appendResponse.hs.catalog_xmin, hs->catalog_xmin))
if (FullTransactionIdIsNormal(skhs->catalog_xmin)
&& FullTransactionIdPrecedes(skhs->catalog_xmin, hs->xmin))
{
hs->catalog_xmin = safekeeper[i].appendResponse.hs.catalog_xmin;
hs->ts = safekeeper[i].appendResponse.hs.ts;
hs->catalog_xmin = skhs->catalog_xmin;
hs->ts = skhs->ts;
}
}
}
if (hs->xmin.value == ~0)
hs->xmin = InvalidFullTransactionId;
if (hs->catalog_xmin.value == ~0)
hs->catalog_xmin = InvalidFullTransactionId;
}
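The combine step above takes the minimum over the normal (valid) horizons, using an all-ones sentinel for "no value yet" and converting an untouched sentinel back to InvalidFullTransactionId at the end. A minimal Rust sketch of the same idea, assuming the horizons are modeled as Option<u64>, with None standing in for both the sentinel and InvalidFullTransactionId:

fn combine_xmin(feedbacks: &[Option<u64>]) -> Option<u64> {
    // Only "normal" (set) values participate, matching the
    // FullTransactionIdIsNormal() guard in the C code.
    feedbacks.iter().flatten().copied().min()
}

fn main() {
    // Safekeepers 1 and 3 report horizons; safekeeper 2 reports none.
    assert_eq!(combine_xmin(&[Some(1042), None, Some(997)]), Some(997));
    // No feedback at all combines to None (InvalidFullTransactionId).
    assert_eq!(combine_xmin(&[None, None]), None);
}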
/*

poetry.lock

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.4.1 and should not be changed by hand.
# This file is automatically @generated by Poetry and should not be changed by hand.
[[package]]
name = "aiohttp"
@@ -968,14 +968,14 @@ testing = ["pre-commit"]
[[package]]
name = "flask"
version = "2.1.3"
version = "2.2.5"
description = "A simple framework for building complex web applications."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "Flask-2.1.3-py3-none-any.whl", hash = "sha256:9013281a7402ad527f8fd56375164f3aa021ecfaff89bfe3825346c24f87e04c"},
{file = "Flask-2.1.3.tar.gz", hash = "sha256:15972e5017df0575c3d6c090ba168b6db90259e620ac8d7ea813a396bad5b6cb"},
{file = "Flask-2.2.5-py3-none-any.whl", hash = "sha256:58107ed83443e86067e41eff4631b058178191a355886f8e479e347fa1285fdf"},
{file = "Flask-2.2.5.tar.gz", hash = "sha256:edee9b0a7ff26621bd5a8c10ff484ae28737a2410d99b0bb9a6850c7fb977aa0"},
]
[package.dependencies]
@@ -983,7 +983,7 @@ click = ">=8.0"
importlib-metadata = {version = ">=3.6.0", markers = "python_version < \"3.10\""}
itsdangerous = ">=2.0"
Jinja2 = ">=3.0"
Werkzeug = ">=2.0"
Werkzeug = ">=2.2.2"
[package.extras]
async = ["asgiref (>=3.2)"]


@@ -62,6 +62,8 @@ utils.workspace = true
uuid.workspace = true
webpki-roots.workspace = true
x509-parser.workspace = true
native-tls.workspace = true
postgres-native-tls.workspace = true
workspace_hack.workspace = true
tokio-util.workspace = true


@@ -9,6 +9,7 @@ use crate::{
use pq_proto::BeMessage as Be;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_postgres::config::SslMode;
use tracing::{info, info_span};
#[derive(Debug, Error)]
@@ -87,6 +88,16 @@ pub(super) async fn authenticate(
.dbname(&db_info.dbname)
.user(&db_info.user);
// Backwards compatibility. pg_sni_proxy uses "--" in domain names
// while direct connections do not. Once we migrate to pg_sni_proxy
// everywhere, we can remove this.
if db_info.host.contains("--") {
// we need TLS connection with SNI info to properly route it
config.ssl_mode(SslMode::Require);
} else {
config.ssl_mode(SslMode::Disable);
}
if let Some(password) = db_info.password {
config.password(password.as_ref());
}
@@ -96,6 +107,7 @@ pub(super) async fn authenticate(
value: NodeInfo {
config,
aux: db_info.aux.into(),
allow_self_signed_compute: false, // caller may override
},
})
}
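The ssl_mode selection above reduces to a small predicate on the host name; a sketch, with a hypothetical helper name and SslMode taken from tokio_postgres::config:

use tokio_postgres::config::SslMode;

// Hosts that embed "--" came through pg_sni_proxy and need TLS with SNI
// for routing; direct connections have no TLS on the compute side yet.
fn ssl_mode_for_host(host: &str) -> SslMode {
    if host.contains("--") {
        SslMode::Require
    } else {
        SslMode::Disable
    }
}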


@@ -0,0 +1,250 @@
/// A stand-alone program that routes connections, e.g. from
/// `aaa--bbb--1234.external.domain` to `aaa.bbb.internal.domain:1234`.
///
/// This allows connecting to pods/services running in the same Kubernetes cluster from
/// the outside. Similar to an ingress controller for HTTPS.
use std::{net::SocketAddr, sync::Arc};
use tokio::net::TcpListener;
use anyhow::{anyhow, bail, ensure, Context};
use clap::{self, Arg};
use futures::TryFutureExt;
use proxy::console::messages::MetricsAuxInfo;
use proxy::stream::{PqStream, Stream};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::sync::CancellationToken;
use utils::{project_git_version, sentry_init::init_sentry};
use tracing::{error, info, warn};
project_git_version!(GIT_VERSION);
fn cli() -> clap::Command {
clap::Command::new("Neon proxy/router")
.version(GIT_VERSION)
.arg(
Arg::new("listen")
.short('l')
.long("listen")
.help("listen for incoming client connections on ip:port")
.default_value("127.0.0.1:4432"),
)
.arg(
Arg::new("tls-key")
.short('k')
.long("tls-key")
.help("path to TLS key for client postgres connections")
.required(true),
)
.arg(
Arg::new("tls-cert")
.short('c')
.long("tls-cert")
.help("path to TLS cert for client postgres connections")
.required(true),
)
.arg(
Arg::new("dest")
.short('d')
.long("destination")
.help("append this domain zone to the SNI hostname to get the destination address")
.required(true),
)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _logging_guard = proxy::logging::init().await?;
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
let args = cli().get_matches();
let destination: String = args.get_one::<String>("dest").unwrap().parse()?;
// Configure TLS
let tls_config: Arc<rustls::ServerConfig> = match (
args.get_one::<String>("tls-key"),
args.get_one::<String>("tls-cert"),
) {
(Some(key_path), Some(cert_path)) => {
let key = {
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
.context(format!("Failed to read TLS keys at '{key_path}'"))?;
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
keys.pop().map(rustls::PrivateKey).unwrap()
};
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.context(format!(
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
))?
.into_iter()
.map(rustls::Certificate)
.collect()
};
rustls::ServerConfig::builder()
.with_safe_default_cipher_suites()
.with_safe_default_kx_groups()
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
.with_no_client_auth()
.with_single_cert(cert_chain, key)?
.into()
}
_ => bail!("tls-key and tls-cert must be specified"),
};
// Start listening for incoming client connections
let proxy_address: SocketAddr = args.get_one::<String>("listen").unwrap().parse()?;
info!("Starting sni router on {proxy_address}");
let proxy_listener = TcpListener::bind(proxy_address).await?;
let cancellation_token = CancellationToken::new();
let main = proxy::flatten_err(tokio::spawn(task_main(
Arc::new(destination),
tls_config,
proxy_listener,
cancellation_token.clone(),
)));
let signals_task = proxy::flatten_err(tokio::spawn(proxy::handle_signals(cancellation_token)));
tokio::select! {
res = main => { res?; },
res = signals_task => { res?; },
}
Ok(())
}
async fn task_main(
dest_suffix: Arc<String>,
tls_config: Arc<rustls::ServerConfig>,
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
) -> anyhow::Result<()> {
// When set for the server socket, the keepalive setting
// will be inherited by all accepted client sockets.
socket2::SockRef::from(&listener).set_keepalive(true)?;
let mut connections = tokio::task::JoinSet::new();
loop {
tokio::select! {
accept_result = listener.accept() => {
let (socket, peer_addr) = accept_result?;
info!("accepted postgres client connection from {peer_addr}");
let session_id = uuid::Uuid::new_v4();
let tls_config = Arc::clone(&tls_config);
let dest_suffix = Arc::clone(&dest_suffix);
connections.spawn(
async move {
info!("spawned a task for {peer_addr}");
socket
.set_nodelay(true)
.context("failed to set socket option")?;
handle_client(dest_suffix, tls_config, session_id, socket).await
}
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
error!("per-client task finished with an error: {e:#}");
}),
);
}
_ = cancellation_token.cancelled() => {
drop(listener);
break;
}
}
}
// Drain connections
info!("waiting for all client connections to finish");
while let Some(res) = connections.join_next().await {
if let Err(e) = res {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
}
info!("all client connections have finished");
Ok(())
}
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
raw_stream: S,
tls_config: Arc<rustls::ServerConfig>,
) -> anyhow::Result<Stream<S>> {
let mut stream = PqStream::new(Stream::from_raw(raw_stream));
let msg = stream.read_startup_packet().await?;
info!("received {msg:?}");
use pq_proto::FeStartupPacket::*;
match msg {
SslRequest => {
stream
.write_message(&pq_proto::BeMessage::EncryptionResponse(true))
.await?;
// Upgrade the raw stream into a secure TLS-backed stream.
let (raw, read_buf) = stream.into_inner();
// TODO: Normally, the client doesn't send any data before
// the server confirms the TLS handshake, so read_buf is empty.
// However, one could imagine pipelining the postgres
// SSLRequest + TLS ClientHello in one chunk, similar to the
// pipelining in our Node.js driver. We should probably
// support that by chaining read_buf with the stream.
if !read_buf.is_empty() {
bail!("data is sent before server replied with EncryptionResponse");
}
Ok(raw.upgrade(tls_config).await?)
}
_ => stream.throw_error_str(ERR_INSECURE_CONNECTION).await?,
}
}
#[tracing::instrument(fields(session_id = ?session_id), skip_all)]
async fn handle_client(
dest_suffix: Arc<String>,
tls_config: Arc<rustls::ServerConfig>,
session_id: uuid::Uuid,
stream: impl AsyncRead + AsyncWrite + Unpin,
) -> anyhow::Result<()> {
let tls_stream = ssl_handshake(stream, tls_config).await?;
// Cut off first part of the SNI domain
// We receive required destination details in the format of
// `{k8s_service_name}--{k8s_namespace}--{port}.non-sni-domain`
let sni = tls_stream.sni_hostname().ok_or(anyhow!("SNI missing"))?;
let dest: Vec<&str> = sni
.split_once('.')
.context("invalid SNI")?
.0
.splitn(3, "--")
.collect();
ensure!(dest.len() == 3, "SNI must have the form service--namespace--port.<zone>");
let port = dest[2].parse::<u16>().context("invalid port")?;
let destination = format!("{}.{}.{}:{}", dest[0], dest[1], dest_suffix, port);
info!("destination: {}", destination);
let client = tokio::net::TcpStream::connect(destination).await?;
let metrics_aux: MetricsAuxInfo = Default::default();
proxy::proxy::proxy_pass(tls_stream, client, &metrics_aux).await
}


@@ -1,49 +1,23 @@
//! Postgres protocol proxy/router.
//!
//! This service listens on the psql port, can check auth via an external
//! service (the control plane API in our case), and can create new databases
//! and accounts in a somewhat transparent manner (again via the control plane API).
use proxy::auth;
use proxy::console;
use proxy::http;
use proxy::metrics;
mod auth;
mod cache;
mod cancellation;
mod compute;
mod config;
mod console;
mod error;
mod http;
mod logging;
mod metrics;
mod parse;
mod proxy;
mod sasl;
mod scram;
mod stream;
mod url;
mod waiters;
use anyhow::{bail, Context};
use anyhow::bail;
use clap::{self, Arg};
use config::ProxyConfig;
use futures::FutureExt;
use std::{borrow::Cow, future::Future, net::SocketAddr};
use tokio::{net::TcpListener, task::JoinError};
use proxy::config::{self, ProxyConfig};
use std::{borrow::Cow, net::SocketAddr};
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use tracing::info;
use tracing::warn;
use utils::{project_git_version, sentry_init::init_sentry};
project_git_version!(GIT_VERSION);
/// Flattens `Result<Result<T>>` into `Result<T>`.
async fn flatten_err(
f: impl Future<Output = Result<anyhow::Result<()>, JoinError>>,
) -> anyhow::Result<()> {
f.map(|r| r.context("join error").and_then(|x| x)).await
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _logging_guard = logging::init().await?;
let _logging_guard = proxy::logging::init().await?;
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
@@ -69,7 +43,7 @@ async fn main() -> anyhow::Result<()> {
let proxy_listener = TcpListener::bind(proxy_address).await?;
let cancellation_token = CancellationToken::new();
let mut client_tasks = vec![tokio::spawn(proxy::task_main(
let mut client_tasks = vec![tokio::spawn(proxy::proxy::task_main(
config,
proxy_listener,
cancellation_token.clone(),
@@ -88,7 +62,7 @@ async fn main() -> anyhow::Result<()> {
}
let mut tasks = vec![
tokio::spawn(handle_signals(cancellation_token)),
tokio::spawn(proxy::handle_signals(cancellation_token)),
tokio::spawn(http::server::task_main(http_listener)),
tokio::spawn(console::mgmt::task_main(mgmt_listener)),
];
@@ -97,8 +71,9 @@ async fn main() -> anyhow::Result<()> {
tasks.push(tokio::spawn(metrics::task_main(metrics_config)));
}
let tasks = futures::future::try_join_all(tasks.into_iter().map(flatten_err));
let client_tasks = futures::future::try_join_all(client_tasks.into_iter().map(flatten_err));
let tasks = futures::future::try_join_all(tasks.into_iter().map(proxy::flatten_err));
let client_tasks =
futures::future::try_join_all(client_tasks.into_iter().map(proxy::flatten_err));
tokio::select! {
// We are only expecting an error from these forever tasks
res = tasks => { res?; },
@@ -107,33 +82,6 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}
/// Handle unix signals appropriately.
async fn handle_signals(token: CancellationToken) -> anyhow::Result<()> {
use tokio::signal::unix::{signal, SignalKind};
let mut hangup = signal(SignalKind::hangup())?;
let mut interrupt = signal(SignalKind::interrupt())?;
let mut terminate = signal(SignalKind::terminate())?;
loop {
tokio::select! {
// Hangup is commonly used for config reload.
_ = hangup.recv() => {
warn!("received SIGHUP; config reload is not supported");
}
// Shut down the whole application.
_ = interrupt.recv() => {
warn!("received SIGINT, exiting immediately");
bail!("interrupted");
}
_ = terminate.recv() => {
warn!("received SIGTERM, shutting down once all existing connections have closed");
token.cancel();
}
}
}
}
/// ProxyConfig is created at proxy startup, and lives forever.
fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig> {
let tls_config = match (
@@ -149,6 +97,14 @@ fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig>
_ => bail!("either both or neither tls-key and tls-cert must be specified"),
};
let allow_self_signed_compute: bool = args
.get_one::<String>("allow-self-signed-compute")
.unwrap()
.parse()?;
if allow_self_signed_compute {
warn!("allowing self-signed compute certificates");
}
let metric_collection = match (
args.get_one::<String>("metric-collection-endpoint"),
args.get_one::<String>("metric-collection-interval"),
@@ -198,6 +154,7 @@ fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig>
tls_config,
auth_backend,
metric_collection,
allow_self_signed_compute,
}));
Ok(config)
@@ -288,6 +245,12 @@ fn cli() -> clap::Command {
.help("cache for `wake_compute` api method (use `size=0` to disable)")
.default_value(config::CacheOptions::DEFAULT_OPTIONS_NODE_INFO),
)
.arg(
Arg::new("allow-self-signed-compute")
.long("allow-self-signed-compute")
.help("Allow self-signed certificates for compute nodes (for testing)")
.default_value("false"),
)
}
#[cfg(test)]


@@ -1,11 +1,11 @@
use crate::{cancellation::CancelClosure, error::UserFacingError};
use futures::TryFutureExt;
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
use std::{io, net::SocketAddr};
use std::{io, net::SocketAddr, time::Duration};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::NoTls;
use tokio_postgres::tls::MakeTlsConnect;
use tracing::{error, info, warn};
const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
@@ -19,6 +19,9 @@ pub enum ConnectionError {
#[error("{COULD_NOT_CONNECT}: {0}")]
CouldNotConnect(#[from] io::Error),
#[error("{COULD_NOT_CONNECT}: {0}")]
TlsError(#[from] native_tls::Error),
}
impl UserFacingError for ConnectionError {
@@ -125,14 +128,34 @@ impl std::ops::DerefMut for ConnCfg {
}
}
impl Default for ConnCfg {
fn default() -> Self {
Self::new()
}
}
impl ConnCfg {
/// Establish a raw TCP connection to the compute node.
async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream)> {
async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream, &str)> {
use tokio_postgres::config::Host;
// wrap TcpStream::connect with timeout
let connect_with_timeout = |host, port| {
let connection_timeout = Duration::from_millis(10000);
tokio::time::timeout(connection_timeout, TcpStream::connect((host, port))).map(
move |res| match res {
Ok(tcpstream_connect_res) => tcpstream_connect_res,
Err(_) => Err(io::Error::new(
io::ErrorKind::TimedOut,
format!("exceeded connection timeout {connection_timeout:?}"),
)),
},
)
};
let connect_once = |host, port| {
info!("trying to connect to compute node at {host}:{port}");
TcpStream::connect((host, port)).and_then(|socket| async {
connect_with_timeout(host, port).and_then(|socket| async {
let socket_addr = socket.peer_addr()?;
// This prevents load balancer from severing the connection.
socket2::SockRef::from(&socket).set_keepalive(true)?;
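Written as a free-standing function, the timeout wrapper above looks roughly like this (a sketch; the real code keeps it as a closure inside connect_raw):

use std::{io, time::Duration};
use tokio::net::TcpStream;

// Map a tokio::time::timeout expiry onto io::ErrorKind::TimedOut so
// callers see a single io::Result, as in the closure above.
async fn connect_with_timeout(host: &str, port: u16) -> io::Result<TcpStream> {
    let limit = Duration::from_millis(10_000);
    match tokio::time::timeout(limit, TcpStream::connect((host, port))).await {
        Ok(connect_result) => connect_result,
        Err(_elapsed) => Err(io::Error::new(
            io::ErrorKind::TimedOut,
            format!("exceeded connection timeout {limit:?}"),
        )),
    }
}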
@@ -165,9 +188,8 @@ impl ConnCfg {
Host::Unix(_) => continue, // unix sockets are not welcome here
};
// TODO: maybe we should add a timeout.
match connect_once(host, *port).await {
Ok(socket) => return Ok(socket),
Ok((sockaddr, stream)) => return Ok((sockaddr, stream, host)),
Err(err) => {
// We can't throw an error here, as there might be more hosts to try.
warn!("couldn't connect to compute node at {host}:{port}: {err}");
@@ -187,7 +209,10 @@ impl ConnCfg {
pub struct PostgresConnection {
/// Socket connected to a compute node.
pub stream: TcpStream,
pub stream: tokio_postgres::maybe_tls_stream::MaybeTlsStream<
tokio::net::TcpStream,
postgres_native_tls::TlsStream<tokio::net::TcpStream>,
>,
/// PostgreSQL connection parameters.
pub params: std::collections::HashMap<String, String>,
/// Query cancellation token.
@@ -195,11 +220,27 @@ pub struct PostgresConnection {
}
impl ConnCfg {
async fn do_connect(&self) -> Result<PostgresConnection, ConnectionError> {
// TODO: establish a secure connection to the DB.
let (socket_addr, mut stream) = self.connect_raw().await?;
let (client, connection) = self.0.connect_raw(&mut stream, NoTls).await?;
info!("connected to compute node at {socket_addr}");
async fn do_connect(
&self,
allow_self_signed_compute: bool,
) -> Result<PostgresConnection, ConnectionError> {
let (socket_addr, stream, host) = self.connect_raw().await?;
let tls_connector = native_tls::TlsConnector::builder()
.danger_accept_invalid_certs(allow_self_signed_compute)
.build()
.unwrap();
let mut mk_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector);
let tls = MakeTlsConnect::<tokio::net::TcpStream>::make_tls_connect(&mut mk_tls, host)?;
// connect_raw() will not use TLS if sslmode is "disable"
let (client, connection) = self.0.connect_raw(stream, tls).await?;
let stream = connection.stream.into_inner();
info!(
"connected to compute node at {host} ({socket_addr}) sslmode={:?}",
self.0.get_ssl_mode()
);
// This is very ugly but as of now there's no better way to
// extract the connection parameters from tokio-postgres' connection.
@@ -220,8 +261,11 @@ impl ConnCfg {
}
/// Connect to a corresponding compute node.
pub async fn connect(&self) -> Result<PostgresConnection, ConnectionError> {
self.do_connect()
pub async fn connect(
&self,
allow_self_signed_compute: bool,
) -> Result<PostgresConnection, ConnectionError> {
self.do_connect(allow_self_signed_compute)
.inspect_err(|err| {
// Immediately log the error we have at our disposal.
error!("couldn't connect to compute node: {err}");


@@ -12,6 +12,7 @@ pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
pub auth_backend: auth::BackendType<'static, ()>,
pub metric_collection: Option<MetricCollectionConfig>,
pub allow_self_signed_compute: bool,
}
#[derive(Debug)]


@@ -170,6 +170,9 @@ pub struct NodeInfo {
/// Labels for proxy's metrics.
pub aux: Arc<MetricsAuxInfo>,
/// Whether we should accept self-signed certificates (for testing)
pub allow_self_signed_compute: bool,
}
pub type NodeInfoCache = TimedLru<Arc<str>, NodeInfo>;


@@ -8,6 +8,7 @@ use crate::{auth::ClientCredentials, compute, error::io_error, scram, url::ApiUr
use async_trait::async_trait;
use futures::TryFutureExt;
use thiserror::Error;
use tokio_postgres::config::SslMode;
use tracing::{error, info, info_span, warn, Instrument};
#[derive(Debug, Error)]
@@ -86,11 +87,13 @@ impl Api {
let mut config = compute::ConnCfg::new();
config
.host(self.endpoint.host_str().unwrap_or("localhost"))
.port(self.endpoint.port().unwrap_or(5432));
.port(self.endpoint.port().unwrap_or(5432))
.ssl_mode(SslMode::Disable);
let node = NodeInfo {
config,
aux: Default::default(),
allow_self_signed_compute: false,
};
Ok(node)


@@ -8,6 +8,7 @@ use super::{
use crate::{auth::ClientCredentials, compute, http, scram};
use async_trait::async_trait;
use futures::TryFutureExt;
use tokio_postgres::config::SslMode;
use tracing::{error, info, info_span, warn, Instrument};
#[derive(Clone)]
@@ -100,11 +101,12 @@ impl Api {
// We'll set username and such later using the startup message.
// TODO: add more type safety (in progress).
let mut config = compute::ConnCfg::new();
config.host(host).port(port);
config.host(host).port(port).ssl_mode(SslMode::Disable); // TLS is not configured on compute nodes.
let node = NodeInfo {
config,
aux: body.aux.into(),
allow_self_signed_compute: false,
};
Ok(node)

proxy/src/lib.rs

@@ -0,0 +1,57 @@
use anyhow::{bail, Context};
use futures::{Future, FutureExt};
use tokio::task::JoinError;
use tokio_util::sync::CancellationToken;
use tracing::warn;
pub mod auth;
pub mod cache;
pub mod cancellation;
pub mod compute;
pub mod config;
pub mod console;
pub mod error;
pub mod http;
pub mod logging;
pub mod metrics;
pub mod parse;
pub mod proxy;
pub mod sasl;
pub mod scram;
pub mod stream;
pub mod url;
pub mod waiters;
/// Handle unix signals appropriately.
pub async fn handle_signals(token: CancellationToken) -> anyhow::Result<()> {
use tokio::signal::unix::{signal, SignalKind};
let mut hangup = signal(SignalKind::hangup())?;
let mut interrupt = signal(SignalKind::interrupt())?;
let mut terminate = signal(SignalKind::terminate())?;
loop {
tokio::select! {
// Hangup is commonly used for config reload.
_ = hangup.recv() => {
warn!("received SIGHUP; config reload is not supported");
}
// Shut down the whole application.
_ = interrupt.recv() => {
warn!("received SIGINT, exiting immediately");
bail!("interrupted");
}
_ = terminate.recv() => {
warn!("received SIGTERM, shutting down once all existing connections have closed");
token.cancel();
}
}
}
}
/// Flattens `Result<Result<T>>` into `Result<T>`.
pub async fn flatten_err(
f: impl Future<Output = Result<anyhow::Result<()>, JoinError>>,
) -> anyhow::Result<()> {
f.map(|r| r.context("join error").and_then(|x| x)).await
}

Some files were not shown because too many files have changed in this diff.