Compare commits

..

55 Commits

Author SHA1 Message Date
Dmitry Ivanov
389dfb6809 Simplify low-hanging fruit 2023-05-16 18:00:11 +03:00
Dmitry Ivanov
b2bdd1b117 Changes 2023-05-16 17:09:33 +03:00
Joonas Koivunen
0b42ba9615 refactor: log errors with {:#}
even though it's a std::io::Error

Co-authored-by: Christian Schwarz <christian@neon.tech>
2023-05-11 20:01:24 +03:00
Joonas Koivunen
6d3be4cf7d doc: review suggestions
Co-authored-by: Christian Schwarz <christian@neon.tech>
2023-05-11 20:00:33 +03:00
Joonas Koivunen
7605aed5aa fixup: refactor: stop using todo! for a panic! 2023-05-10 18:21:19 +03:00
Joonas Koivunen
e13ae811bf fixup: move doc test as test case and hide _spawn variants 2023-05-10 17:55:45 +03:00
Joonas Koivunen
36e86e370b fixup: nicer ascii art 2023-05-10 17:55:05 +03:00
Joonas Koivunen
8e51d5d8f3 doc: remove FIXME simplify uploading
there's no need to simplify the uploading; the uploading is not on
within this one future which will be executed so it's not safe to just
remove it or revert it back.

also I am hesitant to add new changes at this point, the diff is large
enough.
2023-05-10 17:35:26 +03:00
Joonas Koivunen
72131e15ff test: less racy test_delete_timeline_client_hangup
with the SharedRetried the first request creates a task which will
continue to the pause point regardless of the first request, so we need
to accept a 404 as success.
2023-05-10 17:31:19 +03:00
Joonas Koivunen
0495043d30 test: fix test_concurrent_timeline_delete_if_first_stuck_at_index_upload
manually cherry-picked 245a2a0592
2023-05-10 17:30:51 +03:00
Joonas Koivunen
c87a742bbd fixup: one missed logging opportunity 2023-05-10 16:53:14 +03:00
Joonas Koivunen
c7c514aae4 doc: more cleanup 2023-05-10 16:53:03 +03:00
Joonas Koivunen
f7b0beb1cf refactor: simplify, racy removes no longer intended 2023-05-10 16:27:08 +03:00
Joonas Koivunen
b5b0a2e872 fixup: doc: align strong usage, fix broken references 2023-05-10 16:25:43 +03:00
Joonas Koivunen
cd787070f6 fixup: doc: explain convoluted return type 2023-05-10 16:25:07 +03:00
Joonas Koivunen
05d4a85c2f fixup: doc: many futures instead of tasks
as in, you could run many of these futures with tokio::select within one
task or `runtime.block_on`; it does not matter.
2023-05-10 16:24:18 +03:00
Joonas Koivunen
9a35ae40b5 fixup: clippy 2023-05-10 15:54:14 +03:00
Joonas Koivunen
3d33ea465d fix: coalesce requests to delete_timeline 2023-05-10 15:46:55 +03:00
Joonas Koivunen
fd50c95b9d feat: utils::shared_retryable::SharedRetryable
documented in the module.
2023-05-10 15:08:42 +03:00
Dmitry Rodionov
eb3a8be933 keep track of timeline deletion status in IndexPart to prevent timeline resurrection (#3919)
Before this patch, the following sequence would lead to the resurrection of a deleted timeline:

- create timeline
- wait for its index part to reach s3
- delete timeline
- wait an arbitrary amount of time, including 0 seconds
- detach tenant
- attach tenant
- the timeline is there and Active again

This happens because we only kept track of the deletion in the tenant dir (by deleting the timeline dir) but not in S3.

The solution is to turn the deleted timeline's IndexPart into a tombstone.
The deletion status of the timeline is expressed in the `deleted_at: Option<NativeDateTime>` field of IndexPart.
It's `None` while the timeline is alive and `Some(deletion time stamp)` if it is deleted.

We change the timeline deletion handler to upload this tombstoned IndexPart.
The handler does not return success if the upload fails.

Coincidentally, this fixes the long-stanging TODO about the `std::fs::remove_dir_all` being not atomic.
It need not be atomic anymore because we set the `deleted_at=Some()` before starting the `remove_dir_all`.

The tombstone is in the IndexPart only, not in the `metadata`.
So, we only have the tombstone and the `remove_dir_all` benefits mentioned above if remote storage is configured.
This was a conscious trade-off because there's no good format evolution story for the current metadata file format.

The introduction of this additional step into `delete_timeline` was painful because delete_timeline needs to be
1. cancel-safe
2. idempotent
3. safe to call concurrently
These are mostly self-inflicted limitations that can be avoided by using request-coalescing.
PR https://github.com/neondatabase/neon/pull/4159 will do that.

fixes https://github.com/neondatabase/neon/issues/3560

refs https://github.com/neondatabase/neon/issues/3889 (part of tenant relocation)


Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
2023-05-10 10:27:12 +02:00
Christian Schwarz
3ec52088dd eviction_task: tracing::instrument the imitate-access calls (#4180)
Currently, if we unexpectly download from the eviction task, the log
lines look like what we have in
https://github.com/neondatabase/neon/issues/4154

```
2023-05-04T14:42:57.586772Z  WARN eviction_task{tenant_id=$TENANT timeline_id=$TIMELINE}:eviction_iteration{policy_kind="LayerAccessThreshold"}: unexpectedly on-demand downloading remote layer remote $TIMELINE/000000067F000032AC0000400C00FFFFFFFF-000000067F000032AC000040140000000008__0000000001696070-0000000003DC76E9 for task kind Eviction
```

We know these are caused by the imitate accesses.
But we don't know which one (my bet is on update_gc_info).

I didn't want to pollute the other tasks' logs with the additional
spans, so, using `.instrument()` when we call non-eviction-task code.

refs https://github.com/neondatabase/neon/issues/4154
2023-05-09 18:16:22 +02:00
Heikki Linnakangas
66b06e416a Pass tracing context in env variables instead of the spec file. (#4174)
If compute_ctl is launched without a spec file, it fetches it from the
control plane with an HTTP request. We cannot get the startup tracing
context from the compute spec in that case, because we don't have it
available on start. We could still read the tracing context from the
compute spec after we have fetched it, but that would leave the fetch
itself out of the context. Pass the tracing context in environment
variables instead.
2023-05-09 17:08:02 +03:00
Arthur Petukhovsky
d62315327a Allow parallel backup in safekeepers (#4177)
Add `wal_backup_parallel_jobs` cmdline argument to specify the max count
of parallel segments upload. New default value is 5, meaning that
safekeepers will try to upload 5 segments concurrently if they are
available. Setting this value to 1 will be equivalent to the sequential
upload that we had before.

Part of the https://github.com/neondatabase/neon/issues/3957
2023-05-09 12:20:35 +03:00
Anastasia Lubennikova
4bd7b1daf2 Bump vendor/postgres:
Fix entering hot standby mode for Neon postgres v15
2023-05-08 21:25:47 +01:00
Sergey Melnikov
0d3d022eb1 Remove deploy workflows (#4157)
## Describe your changes
Removing deploy workflows (moving to aws repo)
2023-05-08 17:30:16 +02:00
Raouf Chebri
e85cbddd2e Update neondatabase banner in README.md (#4176)
## Describe your changes

## Issue ticket number and link

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist
2023-05-08 17:12:42 +02:00
Anton Chaporgin
51ff9f9359 pg-sni-router nlb is internal (#4164) 2023-05-08 18:03:50 +03:00
Vadim Kharitonov
0f8b2d8f0a Compile kq_imcx extension (#3568)
## Describe your changes
Compiles kq_imcx extension

At this moment, there are some issues with the extension:
1. I'm cloning it directly from the master branch. It's better to fetch
tag/archive
2. PG14:
```
postgres=# CREATE EXTENSION IF NOT EXISTS kq_imcx;
postgres=# select * from kq_calendar_cache_info();
2023-02-08 13:55:22.853 UTC [412] ERROR:  relation "ketteq.slice_type" does not exist at character 34
2023-02-08 13:55:22.853 UTC [412] QUERY:  select min(s.id), max(s.id) from ketteq.slice_type s
2023-02-08 13:55:22.853 UTC [412] STATEMENT:  select * from kq_calendar_cache_info();
ERROR:  relation "ketteq.slice_type" does not exist
LINE 1: select min(s.id), max(s.id) from ketteq.slice_type s
```
3. PG15:
`cannot request additional shared memory outside shmem_request_hook`

Note: I don't think we need to publish info about this extension in the
docs.

## Issue ticket number and link
neondatabase/cloud#3387
2023-05-08 15:56:08 +02:00
Gleb Novikov
9860d59aa2 Public docker image repository by default 2023-05-08 15:51:54 +04:00
Christian Schwarz
411c71b486 document current tenant attach API semantics (#4151)
We currently return 202 as soon as the tenant is allocated in memory
before we've written out the marker file. So, the /attach API currently
does not have a transactional character. For example, it can happen that
we respond with a 202 and then crash before writing out the marker file.
In such a case, it is important that the client

1. observes the lost attach (by polling tenant status and observing 404)
2. and consequently retries the attach.

It has to do it in this loop until it observes the tenant as "Active" in
the tenant status. If the client doesn't follow this protocol and
instead goes to another pageserver to attach the tenant, we risk a
split-brain situation where both the first and second pageserver write
to the tenant's S3 state.

The improved description highlights the consequences of this behavior
for clients that use the /attach endpoint.

The tenant relocation that is currently being implemented in cloud#4740
implements retries of Attach and it does poll afterwards, but, it polls
`has_in_progress_downloads`.
That is incorrect, as described in the patch body.

The motivation for this write-up is that, in a future PR, we'll extend
the /attach endpoint with an option to provide the tenant config. If we
decide to leave the non-transactional behavior of /attach unmodified, we
will be able to avoid persisting the tenant config. Conversely, if we
decide that the /attach API should become transactional, we'll need to
persist the tenant config in the attach-marker-file before acknowledging
receipt of the /attach operation.

refs https://github.com/neondatabase/cloud/pull/4740
refs https://github.com/neondatabase/neon/issues/2238
refs https://github.com/neondatabase/neon/issues/1555
2023-05-05 19:32:41 +03:00
Alexey Kondratov
dd4fd89dc6 [compute_ctl] Do not initialize last_active on start (#4137)
Our scale-to-zero logic was optimized for short auto-suspend intervals,
e.g. minutes or hours. In this case, if compute was restarted by k8s due
to some reason (OOM, k8s node went down, pod relocation, etc.),
`last_active` got bumped, we start counting auto-suspend timeout again.
It's not a big deal, i.e. we suspend completely idle compute not after 5
minutes, but after 10 minutes or so.

Yet, some clients may want days or even weeks. And chance that compute
could be restarted during this interval is pretty high, but in this case
we could be not able to suspend some computes for weeks.

After this commit, we won't initialize `last_active` on start, so
`/status` could return an unset attribute. This means that there was no
user activity since start. Control-plane should deal with it by taking
`max()` out of all available activity timestamps: `started_at`,
`last_active`, etc.

compute_ctl part of neondatabase/cloud#4853
2023-05-05 11:45:37 +02:00
Alexander Bayandin
653e633c59 test_runner: add --pg-version pytest argument (#4037)
- allows setting Postgres version for testing using --pg-version argument
- fixes tests for the non-default Postgres version.
2023-05-05 02:57:47 +03:00
Alexander Bayandin
291b4f0d41 Update client libs for test_runner/pg_clients to their latest versions (#4092)
Also, use Workaround D for `swift/PostgresClientKitExample`, 
which supports neither SNI nor connections options
2023-05-04 18:22:04 +01:00
Christian Schwarz
88f39c11d4 refactor: the code that builds TenantConfOpt from mgmt API requests (#4152)
- extract code that builds TenantConfOpt from requests into a From<>
impl
- move map_err(ApiError::BadRequest) into callers
2023-05-04 18:10:40 +03:00
Heikki Linnakangas
b627fa71e4 Make read-only replicas explicit in compute spec (#4136)
This builds on top of PR #4058, and supersedes #4018
2023-05-04 17:41:42 +03:00
Christian Schwarz
7dd9553bbb eviction: regression test + distinguish layer write from map insert (#4005)
This patch adds a regression test for the threshold-based layer
eviction.
The test asserts the basic invariant that, if left alone, the residence
statuses will stabilize, with some layers resident and some layers
evicted.
Thereby, we cover both the aspect of last-access-time-threshold-based
eviction, and the "imitate access" hacks that we put in recently.

The aggressive `period` and `threshold` values revealed a subtle bug
which is also fixed in this patch.
The symptom was that, without the Rust changes of this patch, there
would be occasional test failures due to `WARN... unexpectedly
downloading` log messages.
These log messages were caused by the "imitate access" calls of the
eviction task.
But, the whole point of the "imitate access" hack was to prevent
eviction of the layers that we access there.
After some digging, I found the root cause, which is the following race
condition:

1. Compact: Write out an L1 layer from several L0 layers. This records
residence event `LayerCreate` with the current timestamp.
2. Eviction: imitate access logical size calculation. This accesses the
L0 layers because the L1 layer is not yet in the layer map.
3. Compact: Grab layer map lock, add the new L1 to layer map and remove
the L0s, release layer map lock.
4. Eviction: observes the new L1 layer whose only activity timestamp is
the `LayerCreate` event.

The L1 layer had no chance of being accessed until after (3).
So, if enough time passes between (1) and (3), then (4) will observe a
layer with `now-last_activity > threshold` and evict it

The fix is to require the first `record_residence_event` to happen while
we already hold the layer map lock.
The API requires a ref to a `BatchedUpdates` as a witness that we are
inside a layer map lock.
That is not fool-proof, e.g., new call sites for `insert_historic` could
just completely forget to record the residence event.
It would be nice to prevent this at the type level.
In the meantime, we have a rate-limited log messages to warn us, if such
an implementation error sneaks in in the future.

fixes https://github.com/neondatabase/neon/issues/3593
fixes https://github.com/neondatabase/neon/issues/3942

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-05-04 16:16:48 +02:00
Heikki Linnakangas
b5d64a1e32 Rename field, to match field name in XLogData struct and in rust-postgres (#4149)
The field means the same thing as the `wal_end` field in the XLogData
struct. And in the postgres-protocol crate's corresponding
PrimaryKeepAlive struct, it's also called `wal_end`. Let's be
consistent.

As noted by Arthur at
https://github.com/neondatabase/neon/pull/4144#pullrequestreview-1411031881
2023-05-04 14:41:15 +03:00
Christian Schwarz
f9839a0dd9 import_basebackup_from_tar: don't load local layers twice (#4111)
PR #4104 removed these bits as part of a revert of a larger change.

follow-up to
https://github.com/neondatabase/neon/pull/4104#discussion_r1180444952

---

Let's not merge this before the release.
2023-05-04 09:23:49 +02:00
Arthur Petukhovsky
ce1bbc9fa7 Always send the latest commit_lsn in send_wal (#4150)
When a new connection is established to the safekeeper, the 'end_pos'
field is initially set to Lsn::INVALID (i.e 0/0). If there is no WAL to
send to the client, we send KeepAlive messages with Lsn::INVALID. That
confuses the pageserver: it thinks that safekeeper is lagging very much
behind the tip of the branch, and will reconnect to a different
safekeeper. Then the same thing happens with the new safekeeper, until
some WAL is streamed which sets 'end_pos' to a valid value.

This fix always sets `end_pos` to the most recent `commit_lsn` value.
This is useful to send the latest `commit_lsn` to the receiver, so it
will know how advanced this safekeeper compared to the others.

Fixes https://github.com/neondatabase/neon/issues/3972
Supersedes https://github.com/neondatabase/neon/pull/4144
2023-05-04 00:07:45 +03:00
Alexander Bayandin
b114ef26c2 GitHub Autocomment: add a note if no tests were run (#4109)
- Always (if not cancelled) add a comment to a PR
- Mention in the comment if no tests were run / reports were not
generated.
2023-05-03 15:38:49 +01:00
Arthur Petukhovsky
3ceef7b17a Add more safekeeper and walreceiver metrics (#4142)
Add essential safekeeper and pageserver::walreceiver metrics. Mostly
counters, such as the number of received queries, broker messages,
removed WAL segments, or connection switches events in walreceiver.
Also logs broker push loop duration.
2023-05-03 17:07:41 +03:00
Kirill Bulatov
586e6e55f8 Print WalReceiver context on WAL waiting timeout (#4090)
Closes https://github.com/neondatabase/neon/issues/2106

Before:
```
Extracting base backup to create postgres instance: path=/Users/someonetoignore/work/neon/neon_main/test_output/test_pageserver_lsn_wait_error_safekeeper_stop/repo/endpoints/ep-2/pgdata port=15017

              stderr: command failed: page server 'basebackup' command failed

Caused by:
    0: db error: ERROR: Timed out while waiting for WAL record at LSN 0/FFFFFFFF to arrive, last_record_lsn 0/A2C3F58 disk consistent LSN=0/16B5A50
    1: ERROR: Timed out while waiting for WAL record at LSN 0/FFFFFFFF to arrive, last_record_lsn 0/A2C3F58 disk consistent LSN=0/16B5A50

Stack backtrace:
```

After:
```
Extracting base backup to create postgres instance: path=/Users/someonetoignore/work/neon/neon/test_output/test_pageserver_lsn_wait_error_safekeeper_stop/repo/endpoints/ep-2/pgdata port=15011

              stderr: command failed: page server 'basebackup' command failed

Caused by:
    0: db error: ERROR: Timed out while waiting for WAL record at LSN 0/FFFFFFFF to arrive, last_record_lsn 0/A2C3F58 disk consistent LSN=0/16B5A50, WalReceiver status (update 2023-04-26 14:20:39): streaming WAL from node 12346, commit|streaming Lsn: 0/A2C3F58|0/A2C3F58, safekeeper candidates (id|update_time|commit_lsn): [(12348|14:20:40|0/A2C3F58), (12346|14:20:40|0/A2C3F58), (12347|14:20:40|0/A2C3F58)]
    1: ERROR: Timed out while waiting for WAL record at LSN 0/FFFFFFFF to arrive, last_record_lsn 0/A2C3F58 disk consistent LSN=0/16B5A50, WalReceiver status (update 2023-04-26 14:20:39): streaming WAL from node 12346, commit|streaming Lsn: 0/A2C3F58|0/A2C3F58, safekeeper candidates (id|update_time|commit_lsn): [(12348|14:20:40|0/A2C3F58), (12346|14:20:40|0/A2C3F58), (12347|14:20:40|0/A2C3F58)]

Stack backtrace:
```

As the issue requests, the PR adds the context in logs only, but I think
we should expose the context via HTTP management API similar way — it
should be simple with the new API, but better be done in a separate PR.

Co-authored-by: Kirill Bulatov <kirill@neon.tech>
2023-05-03 16:25:19 +03:00
Anton Chaporgin
db81242f4a add debug to pg-sni-router install (#4143) 2023-05-03 16:14:16 +03:00
dependabot[bot]
39ca7c7c09 Bump flask from 2.1.3 to 2.2.5 (#4138) 2023-05-03 10:40:35 +01:00
Heikki Linnakangas
ecc0cf8cd6 Treat EPIPE as an expected error. (#4141)
If the other end of a TCP connection closes its read end of the socket,
you get an EPIPE when you try to send. I saw that happen in the CI once:


https://neon-github-public-dev.s3.amazonaws.com/reports/pr-4136/release/4869464644/index.html#suites/c19bc2126511ef8cb145cca25c438215/7ec87b016c0b4b50/

```
2023-05-03T07:53:22.394152Z ERROR Task 'serving compute connection task' tenant_id: Some(c204447079e02e7ba8f593cb8bc57e76), timeline_id: Some(b666f26600e6deaa9f43e1aeee5bacb7) exited with error: Postgres connection error

Caused by:
    Broken pipe (os error 32)

Stack backtrace:
   0: pageserver::page_service::page_service_conn_main::{{closure}}
             at /__w/neon/neon/pageserver/src/page_service.rs:282:17
      <core::panic::unwind_safe::AssertUnwindSafe<F> as core::future::future::Future>::poll
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/panic/unwind_safe.rs:296:9
      <futures_util::future::future::catch_unwind::CatchUnwind<Fut> as core::future::future::Future>::poll::{{closure}}
             at /__w/neon/neon/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.28/src/future/future/catch_unwind.rs:36:42
      <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library/core/src/panic/unwind_safe.rs:271:9
   ...
```

In the passing, add a comment to explain what the "expected" in the
`is_expected_io_error` function means.
2023-05-03 12:03:13 +03:00
Joonas Koivunen
faebe3177b wal_craft: cleanup to keep editable (#4121)
wal_craft had accumulated some trouble by using `use anyhow::*;`. Fixes
that, removes redundant conversions (never need to convert a Path to
OsStr), especially at the `Process` args.

Originally in #4100 but we merged a later PR instead for the fixes. I
dropped the `postmaster.pid` polling in favor of just having a longer
connect timeout.
2023-05-03 11:11:55 +03:00
Joonas Koivunen
474f69c1c0 fix: omit cancellation logging when panicking (#4125)
noticed while describing `RequestSpan`, this fix will omit the otherwise
logged message about request being cancelled when panicking in the
request handler. this was missed on #4064.
2023-05-03 10:56:49 +03:00
Konstantin Knizhnik
47521693ed Fix file_cache build warnings (#4118)
## Describe your changes

## Issue ticket number and link

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist
2023-05-02 22:28:21 +03:00
Heikki Linnakangas
4d55d61807 Store basic endpoint info in endpoint.json file. (#4058)
It's more convenient than parsing the postgresql.conf file.

Extracted from PR #3886. I started working on another patch (to make it
safe to run two "neon_local endpoint create" commands concurrently), and
realized that this change will make that simpler too.
2023-05-02 20:36:11 +03:00
Sergey Melnikov
093fafd6bd Deploy pg-sni-router (#4132) 2023-05-01 17:18:45 +02:00
Shany Pozin
e3ae2661ee Add 2 new sets of safekeepers to us-west2 (#4130)
## Describe your changes
TF output:       
module.safekeeper-us-west-2.aws_instance.this["3"]: Creation complete
after 13s [id=i-089f6b9ef426dff76]
module.safekeeper-us-west-2.aws_instance.this["4"]: Creation complete
after 13s [id=i-0fe6bf912c4710c82]
module.safekeeper-us-west-2.aws_instance.this["5"]: Creation complete
after 13s [id=i-0a83c1c46d2b4e409]
module.safekeeper-us-west-2.aws_instance.this["6"]: Creation complete
after 13s [id=i-0fef5317b8fdc9f8d]
module.safekeeper-us-west-2.aws_instance.this["7"]: Creation complete
after 13s [id=i-0be739190d4289bf9]
module.safekeeper-us-west-2.aws_instance.this["8"]: Creation complete
after 13s [id=i-00e851803669e5cfe]
2023-05-01 14:22:59 +03:00
Anton Chaporgin
7e368f3edf build pg-sni-router binary (#4129)
## Describe your changes
This adds pg-sni-router binary to the build pipeline and neon image.

## Issue ticket number and link

https://github.com/neondatabase/cloud/issues/1461
2023-05-01 12:14:31 +02:00
Joonas Koivunen
138bc028ed fix: quick and dirty panic avoidance on drop path (#4128)
Sentry caught a panic on load testing server related to metric removals:
https://neondatabase.sentry.io/issues/4142396994

Turn the `expect` into logging, but also add logging for each removal,
so we could identify in which cases we do double-remove. The
double-removal (or never adding) cause is not obvious or expected.

Original added in #3837.
2023-05-01 11:54:09 +03:00
Stas Kelvich
d53f81b449 Add one more pageserver to staging 2023-04-30 22:39:34 +03:00
Joonas Koivunen
6f472df0d0 fix: restore not logging ignored io errors as errors (#4120)
the fix is rather indirect due to the accidental applying of too much
`anyhow`: if handle_pagerequests returns a `QueryError` it will now be
bubbled up as-is `QueryError`. `QueryError` allows the inner
`std::io::Error` to be inspected and thus we can filter certain error
kinds which are perfectly normal without a huge log message.

for a very long time (b2f5102) the errors were converted to `anyhow` by
mistake which made this difficult or impossible, even though from the
types it would *appear* that we propagate wrapped `std::io::Error`s and
can filter them.

Fixes #4113, most likely filters some other errors as well.
2023-04-30 14:34:55 +03:00
140 changed files with 3580 additions and 3518 deletions

View File

@@ -1,5 +0,0 @@
neon_install.tar.gz
.neon_current_version
collections/*
!collections/.keep

View File

@@ -1,12 +0,0 @@
[defaults]
localhost_warning = False
host_key_checking = False
timeout = 30
[ssh_connection]
ssh_args = -F ./ansible.ssh.cfg
# teleport doesn't support sftp yet https://github.com/gravitational/teleport/issues/7127
# and scp neither worked for me
transfer_method = piped
pipelining = True

View File

@@ -1,15 +0,0 @@
# Remove this once https://github.com/gravitational/teleport/issues/10918 is fixed
# (use pre 8.5 option name to cope with old ssh in CI)
PubkeyAcceptedKeyTypes +ssh-rsa-cert-v01@openssh.com
Host tele.zenith.tech
User admin
Port 3023
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
Host * !tele.zenith.tech
User admin
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
ProxyJump tele.zenith.tech

View File

View File

@@ -1,211 +0,0 @@
- name: Upload Neon binaries
hosts: storage
gather_facts: False
remote_user: "{{ remote_user }}"
tasks:
- name: get latest version of Neon binaries
register: current_version_file
set_fact:
current_version: "{{ lookup('file', '.neon_current_version') | trim }}"
tags:
- pageserver
- safekeeper
- name: inform about versions
debug:
msg: "Version to deploy - {{ current_version }}"
tags:
- pageserver
- safekeeper
- name: upload and extract Neon binaries to /usr/local
ansible.builtin.unarchive:
owner: root
group: root
src: neon_install.tar.gz
dest: /usr/local
become: true
tags:
- pageserver
- safekeeper
- binaries
- putbinaries
- name: Deploy pageserver
hosts: pageservers
gather_facts: False
remote_user: "{{ remote_user }}"
tasks:
- name: upload init script
when: console_mgmt_base_url is defined
ansible.builtin.template:
src: scripts/init_pageserver.sh
dest: /tmp/init_pageserver.sh
owner: root
group: root
mode: '0755'
become: true
tags:
- pageserver
- name: init pageserver
shell:
cmd: /tmp/init_pageserver.sh
args:
creates: "/storage/pageserver/data/tenants"
environment:
NEON_REPO_DIR: "/storage/pageserver/data"
LD_LIBRARY_PATH: "/usr/local/v14/lib"
become: true
tags:
- pageserver
- name: read the existing remote pageserver config
ansible.builtin.slurp:
src: /storage/pageserver/data/pageserver.toml
register: _remote_ps_config
tags:
- pageserver
- name: parse the existing pageserver configuration
ansible.builtin.set_fact:
_existing_ps_config: "{{ _remote_ps_config['content'] | b64decode | sivel.toiletwater.from_toml }}"
tags:
- pageserver
- name: construct the final pageserver configuration dict
ansible.builtin.set_fact:
pageserver_config: "{{ pageserver_config_stub | combine({'id': _existing_ps_config.id }) }}"
tags:
- pageserver
- name: template the pageserver config
template:
src: templates/pageserver.toml.j2
dest: /storage/pageserver/data/pageserver.toml
become: true
tags:
- pageserver
# used in `pageserver.service` template
- name: learn current availability_zone
shell:
cmd: "curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone"
register: ec2_availability_zone
- set_fact:
ec2_availability_zone={{ ec2_availability_zone.stdout }}
- name: upload systemd service definition
ansible.builtin.template:
src: systemd/pageserver.service
dest: /etc/systemd/system/pageserver.service
owner: root
group: root
mode: '0644'
become: true
tags:
- pageserver
- name: start systemd service
ansible.builtin.systemd:
daemon_reload: yes
name: pageserver
enabled: yes
state: restarted
become: true
tags:
- pageserver
- name: post version to console
when: console_mgmt_base_url is defined
shell:
cmd: |
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
curl -sfS -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" {{ console_mgmt_base_url }}/management/api/v2/pageservers/$INSTANCE_ID | jq '.version = {{ current_version }}' > /tmp/new_version
curl -sfS -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" -H "Content-Type: application/json" -X POST -d@/tmp/new_version {{ console_mgmt_base_url }}/management/api/v2/pageservers
tags:
- pageserver
- name: Deploy safekeeper
hosts: safekeepers
gather_facts: False
remote_user: "{{ remote_user }}"
tasks:
- name: upload init script
when: console_mgmt_base_url is defined
ansible.builtin.template:
src: scripts/init_safekeeper.sh
dest: /tmp/init_safekeeper.sh
owner: root
group: root
mode: '0755'
become: true
tags:
- safekeeper
- name: init safekeeper
shell:
cmd: /tmp/init_safekeeper.sh
args:
creates: "/storage/safekeeper/data/safekeeper.id"
environment:
NEON_REPO_DIR: "/storage/safekeeper/data"
LD_LIBRARY_PATH: "/usr/local/v14/lib"
become: true
tags:
- safekeeper
# used in `safekeeper.service` template
- name: learn current availability_zone
shell:
cmd: "curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone"
register: ec2_availability_zone
- set_fact:
ec2_availability_zone={{ ec2_availability_zone.stdout }}
# in the future safekeepers should discover pageservers byself
# but currently use first pageserver that was discovered
- name: set first pageserver var for safekeepers
set_fact:
first_pageserver: "{{ hostvars[groups['pageservers'][0]]['inventory_hostname'] }}"
tags:
- safekeeper
- name: upload systemd service definition
ansible.builtin.template:
src: systemd/safekeeper.service
dest: /etc/systemd/system/safekeeper.service
owner: root
group: root
mode: '0644'
become: true
tags:
- safekeeper
- name: start systemd service
ansible.builtin.systemd:
daemon_reload: yes
name: safekeeper
enabled: yes
state: restarted
become: true
tags:
- safekeeper
- name: post version to console
when: console_mgmt_base_url is defined
shell:
cmd: |
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
curl -sfS -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" {{ console_mgmt_base_url }}/management/api/v2/safekeepers/$INSTANCE_ID | jq '.version = {{ current_version }}' > /tmp/new_version
curl -sfS -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" -H "Content-Type: application/json" -X POST -d@/tmp/new_version {{ console_mgmt_base_url }}/management/api/v2/safekeepers
tags:
- safekeeper

View File

@@ -1,42 +0,0 @@
#!/bin/bash
set -e
if [ -n "${DOCKER_TAG}" ]; then
# Verson is DOCKER_TAG but without prefix
VERSION=$(echo $DOCKER_TAG | sed 's/^.*-//g')
else
echo "Please set DOCKER_TAG environment variable"
exit 1
fi
# do initial cleanup
rm -rf neon_install postgres_install.tar.gz neon_install.tar.gz .neon_current_version
mkdir neon_install
# retrieve binaries from docker image
echo "getting binaries from docker image"
docker pull --quiet neondatabase/neon:${DOCKER_TAG}
ID=$(docker create neondatabase/neon:${DOCKER_TAG})
docker cp ${ID}:/data/postgres_install.tar.gz .
tar -xzf postgres_install.tar.gz -C neon_install
mkdir neon_install/bin/
docker cp ${ID}:/usr/local/bin/pageserver neon_install/bin/
docker cp ${ID}:/usr/local/bin/pageserver_binutils neon_install/bin/
docker cp ${ID}:/usr/local/bin/safekeeper neon_install/bin/
docker cp ${ID}:/usr/local/bin/storage_broker neon_install/bin/
docker cp ${ID}:/usr/local/bin/proxy neon_install/bin/
docker cp ${ID}:/usr/local/v14/bin/ neon_install/v14/bin/
docker cp ${ID}:/usr/local/v15/bin/ neon_install/v15/bin/
docker cp ${ID}:/usr/local/v14/lib/ neon_install/v14/lib/
docker cp ${ID}:/usr/local/v15/lib/ neon_install/v15/lib/
docker rm -vf ${ID}
# store version to file (for ansible playbooks) and create binaries tarball
echo ${VERSION} > neon_install/.neon_current_version
echo ${VERSION} > .neon_current_version
tar -czf neon_install.tar.gz -C neon_install .
# do final cleaup
rm -rf neon_install postgres_install.tar.gz

View File

@@ -1,48 +0,0 @@
storage:
vars:
bucket_name: neon-prod-storage-ap-southeast-1
bucket_region: ap-southeast-1
console_mgmt_base_url: http://neon-internal-api.aws.neon.tech
broker_endpoint: http://storage-broker-lb.epsilon.ap-southeast-1.internal.aws.neon.tech:50051
pageserver_config_stub:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 85 # TODO: decrease to 80 after all pageservers are below 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "pageserver/v1"
safekeeper_s3_prefix: safekeeper/v1/wal
hostname_suffix: ""
remote_user: ssm-user
ansible_aws_ssm_region: ap-southeast-1
ansible_aws_ssm_bucket_name: neon-prod-storage-ap-southeast-1
console_region_id: aws-ap-southeast-1
sentry_environment: production
children:
pageservers:
hosts:
pageserver-0.ap-southeast-1.aws.neon.tech:
ansible_host: i-064de8ea28bdb495b
pageserver-1.ap-southeast-1.aws.neon.tech:
ansible_host: i-0b180defcaeeb6b93
safekeepers:
hosts:
safekeeper-0.ap-southeast-1.aws.neon.tech:
ansible_host: i-0d6f1dc5161eef894
safekeeper-2.ap-southeast-1.aws.neon.tech:
ansible_host: i-04fb63634e4679eb9
safekeeper-3.ap-southeast-1.aws.neon.tech:
ansible_host: i-05481f3bc88cfc2d4

View File

@@ -1,50 +0,0 @@
storage:
vars:
bucket_name: neon-prod-storage-eu-central-1
bucket_region: eu-central-1
console_mgmt_base_url: http://neon-internal-api.aws.neon.tech
broker_endpoint: http://storage-broker-lb.gamma.eu-central-1.internal.aws.neon.tech:50051
pageserver_config_stub:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 85 # TODO: decrease to 80 after all pageservers are below 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "pageserver/v1"
safekeeper_s3_prefix: safekeeper/v1/wal
hostname_suffix: ""
remote_user: ssm-user
ansible_aws_ssm_region: eu-central-1
ansible_aws_ssm_bucket_name: neon-prod-storage-eu-central-1
console_region_id: aws-eu-central-1
sentry_environment: production
children:
pageservers:
hosts:
pageserver-0.eu-central-1.aws.neon.tech:
ansible_host: i-0cd8d316ecbb715be
pageserver-1.eu-central-1.aws.neon.tech:
ansible_host: i-090044ed3d383fef0
pageserver-2.eu-central-1.aws.neon.tech:
ansible_host: i-033584edf3f4b6742
safekeepers:
hosts:
safekeeper-0.eu-central-1.aws.neon.tech:
ansible_host: i-0b238612d2318a050
safekeeper-1.eu-central-1.aws.neon.tech:
ansible_host: i-07b9c45e5c2637cd4
safekeeper-2.eu-central-1.aws.neon.tech:
ansible_host: i-020257302c3c93d88

View File

@@ -1,50 +0,0 @@
storage:
vars:
bucket_name: neon-prod-storage-us-east-1
bucket_region: us-east-1
console_mgmt_base_url: http://neon-internal-api.aws.neon.tech
broker_endpoint: http://storage-broker-lb.theta.us-east-1.internal.aws.neon.tech:50051
pageserver_config_stub:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 85 # TODO: decrease to 80 after all pageservers are below 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "pageserver/v1"
safekeeper_s3_prefix: safekeeper/v1/wal
hostname_suffix: ""
remote_user: ssm-user
ansible_aws_ssm_region: us-east-1
ansible_aws_ssm_bucket_name: neon-prod-storage-us-east-1
console_region_id: aws-us-east-1
sentry_environment: production
children:
pageservers:
hosts:
pageserver-0.us-east-1.aws.neon.tech:
ansible_host: i-085222088b0d2e0c7
pageserver-1.us-east-1.aws.neon.tech:
ansible_host: i-0969d4f684d23a21e
pageserver-2.us-east-1.aws.neon.tech:
ansible_host: i-05dee87895da58dad
safekeepers:
hosts:
safekeeper-0.us-east-1.aws.neon.tech:
ansible_host: i-04ce739e88793d864
safekeeper-1.us-east-1.aws.neon.tech:
ansible_host: i-0e9e6c9227fb81410
safekeeper-2.us-east-1.aws.neon.tech:
ansible_host: i-072f4dd86a327d52f

View File

@@ -1,51 +0,0 @@
storage:
vars:
bucket_name: neon-prod-storage-us-east-2
bucket_region: us-east-2
console_mgmt_base_url: http://neon-internal-api.aws.neon.tech
broker_endpoint: http://storage-broker-lb.delta.us-east-2.internal.aws.neon.tech:50051
pageserver_config_stub:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 85 # TODO: decrease to 80 after all pageservers are below 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "pageserver/v1"
safekeeper_s3_prefix: safekeeper/v1/wal
hostname_suffix: ""
remote_user: ssm-user
ansible_aws_ssm_region: us-east-2
ansible_aws_ssm_bucket_name: neon-prod-storage-us-east-2
console_region_id: aws-us-east-2
sentry_environment: production
children:
pageservers:
hosts:
pageserver-0.us-east-2.aws.neon.tech:
ansible_host: i-062227ba7f119eb8c
pageserver-1.us-east-2.aws.neon.tech:
ansible_host: i-0b3ec0afab5968938
pageserver-2.us-east-2.aws.neon.tech:
ansible_host: i-0d7a1c4325e71421d
safekeepers:
hosts:
safekeeper-0.us-east-2.aws.neon.tech:
ansible_host: i-0e94224750c57d346
safekeeper-1.us-east-2.aws.neon.tech:
ansible_host: i-06d113fb73bfddeb0
safekeeper-2.us-east-2.aws.neon.tech:
ansible_host: i-09f66c8e04afff2e8

View File

@@ -1,60 +0,0 @@
storage:
vars:
bucket_name: neon-prod-storage-us-west-2
bucket_region: us-west-2
console_mgmt_base_url: http://neon-internal-api.aws.neon.tech
broker_endpoint: http://storage-broker-lb.eta.us-west-2.internal.aws.neon.tech:50051
pageserver_config_stub:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 85 # TODO: decrease to 80 after all pageservers are below 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "pageserver/v1"
safekeeper_s3_prefix: safekeeper/v1/wal
hostname_suffix: ""
remote_user: ssm-user
ansible_aws_ssm_region: us-west-2
ansible_aws_ssm_bucket_name: neon-prod-storage-us-west-2
console_region_id: aws-us-west-2-new
sentry_environment: production
children:
pageservers:
hosts:
pageserver-0.us-west-2.aws.neon.tech:
ansible_host: i-0d9f6dfae0e1c780d
pageserver-1.us-west-2.aws.neon.tech:
ansible_host: i-0c834be1dddba8b3f
pageserver-2.us-west-2.aws.neon.tech:
ansible_host: i-051642d372c0a4f32
pageserver-3.us-west-2.aws.neon.tech:
ansible_host: i-00c3844beb9ad1c6b
pageserver-4.us-west-2.aws.neon.tech:
ansible_host: i-013263dd1c239adcc
pageserver-5.us-west-2.aws.neon.tech:
ansible_host: i-00ca6417c7bf96820
pageserver-6.us-west-2.aws.neon.tech:
ansible_host: i-01cdf7d2bc1433b6a
pageserver-7.us-west-2.aws.neon.tech:
ansible_host: i-02eec9b40617db5bc
safekeepers:
hosts:
safekeeper-0.us-west-2.aws.neon.tech:
ansible_host: i-00719d8a74986fda6
safekeeper-1.us-west-2.aws.neon.tech:
ansible_host: i-074682f9d3c712e7c
safekeeper-2.us-west-2.aws.neon.tech:
ansible_host: i-042b7efb1729d7966

View File

@@ -1,37 +0,0 @@
#!/bin/sh
# fetch params from meta-data service
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
AZ_ID=$(curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone)
INSTANCE_TYPE=$(curl -s http://169.254.169.254/latest/meta-data/instance-type)
DISK_SIZE=$(df -B1 /storage | tail -1 | awk '{print $2}')
# store fqdn hostname in var
HOST=$(hostname -f)
cat <<EOF | tee /tmp/payload
{
"version": 1,
"host": "${HOST}",
"port": 6400,
"region_id": "{{ console_region_id }}",
"instance_id": "${INSTANCE_ID}",
"http_host": "${HOST}",
"http_port": 9898,
"active": false,
"availability_zone_id": "${AZ_ID}",
"disk_size": ${DISK_SIZE},
"instance_type": "${INSTANCE_TYPE}"
}
EOF
# check if pageserver already registered or not
if ! curl -sf -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" {{ console_mgmt_base_url }}/management/api/v2/pageservers/${INSTANCE_ID} -o /dev/null; then
# not registered, so register it now
ID=$(curl -sf -X POST -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" -H "Content-Type: application/json" {{ console_mgmt_base_url }}/management/api/v2/pageservers -d@/tmp/payload | jq -r '.id')
# init pageserver
sudo -u pageserver /usr/local/bin/pageserver -c "id=${ID}" -c "pg_distrib_dir='/usr/local'" --init -D /storage/pageserver/data
fi

View File

@@ -1,31 +0,0 @@
#!/bin/sh
# fetch params from meta-data service
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
AZ_ID=$(curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone)
# store fqdn hostname in var
HOST=$(hostname -f)
cat <<EOF | tee /tmp/payload
{
"version": 1,
"host": "${HOST}",
"port": 6500,
"http_port": 7676,
"region_id": "{{ console_region_id }}",
"instance_id": "${INSTANCE_ID}",
"availability_zone_id": "${AZ_ID}",
"active": false
}
EOF
# check if safekeeper already registered or not
if ! curl -sf -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" {{ console_mgmt_base_url }}/management/api/v2/safekeepers/${INSTANCE_ID} -o /dev/null; then
# not registered, so register it now
ID=$(curl -sf -X POST -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" -H "Content-Type: application/json" {{ console_mgmt_base_url }}/management/api/v2/safekeepers -d@/tmp/payload | jq -r '.id')
# init safekeeper
sudo -u safekeeper /usr/local/bin/safekeeper --id ${ID} --init -D /storage/safekeeper/data
fi

View File

@@ -1,2 +0,0 @@
ansible_connection: aws_ssm
ansible_python_interpreter: /usr/bin/python3

View File

@@ -1,47 +0,0 @@
storage:
vars:
bucket_name: neon-dev-storage-eu-central-1
bucket_region: eu-central-1
# We only register/update storage in one preview console and manually copy to other instances
console_mgmt_base_url: http://neon-internal-api.helium.aws.neon.build
broker_endpoint: http://storage-broker-lb.alpha.eu-central-1.internal.aws.neon.build:50051
pageserver_config_stub:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.helium.aws.neon.build/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "20m"
threshold: &default_eviction_threshold "20m"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "pageserver/v1"
safekeeper_s3_prefix: safekeeper/v1/wal
hostname_suffix: ""
remote_user: ssm-user
ansible_aws_ssm_region: eu-central-1
ansible_aws_ssm_bucket_name: neon-dev-storage-eu-central-1
console_region_id: aws-eu-central-1
sentry_environment: staging
children:
pageservers:
hosts:
pageserver-0.eu-central-1.aws.neon.build:
ansible_host: i-011f93ec26cfba2d4
safekeepers:
hosts:
safekeeper-0.eu-central-1.aws.neon.build:
ansible_host: i-0ff026d27babf8ddd
safekeeper-1.eu-central-1.aws.neon.build:
ansible_host: i-03983a49ee54725d9
safekeeper-2.eu-central-1.aws.neon.build:
ansible_host: i-0bd025ecdb61b0db3

View File

@@ -1,58 +0,0 @@
storage:
vars:
bucket_name: neon-dev-storage-eu-west-1
bucket_region: eu-west-1
console_mgmt_base_url: http://neon-internal-api.aws.neon.build
broker_endpoint: http://storage-broker-lb.zeta.eu-west-1.internal.aws.neon.build:50051
pageserver_config_stub:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "20m"
threshold: &default_eviction_threshold "20m"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "pageserver/v1"
safekeeper_s3_prefix: safekeeper/v1/wal
hostname_suffix: ""
remote_user: ssm-user
ansible_aws_ssm_region: eu-west-1
ansible_aws_ssm_bucket_name: neon-dev-storage-eu-west-1
console_region_id: aws-eu-west-1
sentry_environment: staging
children:
pageservers:
hosts:
pageserver-0.eu-west-1.aws.neon.build:
ansible_host: i-01d496c5041c7f34c
safekeepers:
hosts:
safekeeper-0.eu-west-1.aws.neon.build:
ansible_host: i-05226ef85722831bf
safekeeper-1.eu-west-1.aws.neon.build:
ansible_host: i-06969ee1bf2958bfc
safekeeper-2.eu-west-1.aws.neon.build:
ansible_host: i-087892e9625984a0b
safekeeper-3.eu-west-1.aws.neon.build:
ansible_host: i-0a6f91660e99e8891
safekeeper-4.eu-west-1.aws.neon.build:
ansible_host: i-0012e309e28e7c249
safekeeper-5.eu-west-1.aws.neon.build:
ansible_host: i-085a2b1193287b32e
safekeeper-6.eu-west-1.aws.neon.build:
ansible_host: i-0c713248465ed0fbd
safekeeper-7.eu-west-1.aws.neon.build:
ansible_host: i-02ad231aed2a80b7a
safekeeper-8.eu-west-1.aws.neon.build:
ansible_host: i-0dbbd8ffef66efda8

View File

@@ -1,56 +0,0 @@
storage:
vars:
bucket_name: neon-staging-storage-us-east-2
bucket_region: us-east-2
console_mgmt_base_url: http://neon-internal-api.aws.neon.build
broker_endpoint: http://storage-broker-lb.beta.us-east-2.internal.aws.neon.build:50051
pageserver_config_stub:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "20m"
threshold: &default_eviction_threshold "20m"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "pageserver/v1"
safekeeper_s3_prefix: safekeeper/v1/wal
hostname_suffix: ""
remote_user: ssm-user
ansible_aws_ssm_region: us-east-2
ansible_aws_ssm_bucket_name: neon-staging-storage-us-east-2
console_region_id: aws-us-east-2
sentry_environment: staging
children:
pageservers:
hosts:
pageserver-0.us-east-2.aws.neon.build:
ansible_host: i-0c3e70929edb5d691
pageserver-1.us-east-2.aws.neon.build:
ansible_host: i-0565a8b4008aa3f40
pageserver-2.us-east-2.aws.neon.build:
ansible_host: i-01e31cdf7e970586a
pageserver-3.us-east-2.aws.neon.build:
ansible_host: i-0602a0291365ef7cc
pageserver-99.us-east-2.aws.neon.build:
ansible_host: i-0c39491109bb88824
safekeepers:
hosts:
safekeeper-0.us-east-2.aws.neon.build:
ansible_host: i-027662bd552bf5db0
safekeeper-2.us-east-2.aws.neon.build:
ansible_host: i-0de0b03a51676a6ce
safekeeper-3.us-east-2.aws.neon.build:
ansible_host: i-05f8ba2cda243bd18
safekeeper-99.us-east-2.aws.neon.build:
ansible_host: i-0d61b6a2ea32028d5

View File

@@ -1,18 +0,0 @@
[Unit]
Description=Neon pageserver
After=network.target auditd.service
[Service]
Type=simple
User=pageserver
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/v14/lib SENTRY_DSN={{ SENTRY_URL_PAGESERVER }} SENTRY_ENVIRONMENT={{ sentry_environment }}
ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoint='{{ broker_endpoint }}'" -c "availability_zone='{{ ec2_availability_zone }}'" -D /storage/pageserver/data
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT
Restart=on-failure
TimeoutSec=10
LimitNOFILE=30000000
[Install]
WantedBy=multi-user.target

View File

@@ -1,18 +0,0 @@
[Unit]
Description=Neon safekeeper
After=network.target auditd.service
[Service]
Type=simple
User=safekeeper
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/v14/lib SENTRY_DSN={{ SENTRY_URL_SAFEKEEPER }} SENTRY_ENVIRONMENT={{ sentry_environment }}
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoint={{ broker_endpoint }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ safekeeper_s3_prefix }}"}' --availability-zone={{ ec2_availability_zone }}
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT
Restart=on-failure
TimeoutSec=10
LimitNOFILE=30000000
[Install]
WantedBy=multi-user.target

View File

@@ -1 +0,0 @@
{{ pageserver_config | sivel.toiletwater.to_toml }}

View File

@@ -1,52 +0,0 @@
# Helm chart values for neon-storage-broker
podLabels:
neon_env: staging
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.alpha.eu-central-1.internal.aws.neon.build
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
metrics:
enabled: false
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-storage-broker.fullname\" . }}"
labels:
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
app.kubernetes.io/name: neon-storage-broker
app.kubernetes.io/instance: neon-storage-broker
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-storage-broker"
endpoints:
- port: broker
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "staging"

View File

@@ -1,76 +0,0 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
domain: "*.eu-west-1.aws.neon.build"
otelExporterOtlpEndpoint: "https://otel-collector.zeta.eu-west-1.internal.aws.neon.build"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"
metricCollectionInterval: "1min"
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: dev
neon_region: eu-west-1
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: eu-west-1.aws.neon.build
httpsPort: 443
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -1,52 +0,0 @@
# Helm chart values for neon-storage-broker
podLabels:
neon_env: staging
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.zeta.eu-west-1.internal.aws.neon.build
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
metrics:
enabled: false
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-storage-broker.fullname\" . }}"
labels:
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
app.kubernetes.io/name: neon-storage-broker
app.kubernetes.io/instance: neon-storage-broker
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-storage-broker"
endpoints:
- port: broker
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "staging"

View File

@@ -1,68 +0,0 @@
# Helm chart values for neon-proxy-link.
# This is a YAML-formatted file.
image:
repository: neondatabase/neon
settings:
authBackend: "link"
authEndpoint: "https://console.stage.neon.tech/authenticate_proxy_request/"
uri: "https://console.stage.neon.tech/psql_session/"
domain: "pg.neon.build"
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
sentryEnvironment: "staging"
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"
metricCollectionInterval: "1min"
# -- Additional labels for neon-proxy-link pods
podLabels:
neon_service: proxy
neon_env: dev
neon_region: us-east-2
service:
type: LoadBalancer
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal
external-dns.alpha.kubernetes.io/hostname: neon-proxy-link-mgmt.beta.us-east-2.aws.neon.build
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: neon-proxy-link.beta.us-east-2.aws.neon.build
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -1,77 +0,0 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
domain: "*.cloud.stage.neon.tech"
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"
metricCollectionInterval: "1min"
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram-legacy
neon_env: dev
neon_region: us-east-2
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: neon-proxy-scram-legacy.beta.us-east-2.aws.neon.build
httpsPort: 443
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -1,78 +0,0 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
domain: "*.us-east-2.aws.neon.build"
extraDomains: ["*.us-east-2.postgres.zenith.tech", "*.us-east-2.retooldb-staging.com"]
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"
metricCollectionInterval: "1min"
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: dev
neon_region: us-east-2
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: us-east-2.aws.neon.build
httpsPort: 443
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -1,52 +0,0 @@
# Helm chart values for neon-storage-broker
podLabels:
neon_env: staging
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.beta.us-east-2.internal.aws.neon.build
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
metrics:
enabled: false
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-storage-broker.fullname\" . }}"
labels:
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
app.kubernetes.io/name: neon-storage-broker
app.kubernetes.io/instance: neon-storage-broker
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-storage-broker"
endpoints:
- port: broker
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "staging"

View File

@@ -1,67 +0,0 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.${PREVIEW_NAME}.aws.neon.build/management/api/v2"
domain: "*.cloud.${PREVIEW_NAME}.aws.neon.build"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.${PREVIEW_NAME}.aws.neon.build/billing/api/v1/usage_events"
metricCollectionInterval: "1min"
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: test
neon_region: ${PREVIEW_NAME}.eu-central-1
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: cloud.${PREVIEW_NAME}.aws.neon.build
httpsPort: 443
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -1,77 +0,0 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.ap-southeast-1.aws.neon.tech"
extraDomains: ["*.ap-southeast-1.retooldb.com", "*.ap-southeast-1.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"
metricCollectionInterval: "10min"
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: prod
neon_region: ap-southeast-1
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: ap-southeast-1.aws.neon.tech
httpsPort: 443
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -1,52 +0,0 @@
# Helm chart values for neon-storage-broker
podLabels:
neon_env: production
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.epsilon.ap-southeast-1.internal.aws.neon.tech
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
metrics:
enabled: false
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-storage-broker.fullname\" . }}"
labels:
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
app.kubernetes.io/name: neon-storage-broker
app.kubernetes.io/instance: neon-storage-broker
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-storage-broker"
endpoints:
- port: broker
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "production"

View File

@@ -1,77 +0,0 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.eu-central-1.aws.neon.tech"
extraDomains: ["*.eu-central-1.retooldb.com", "*.eu-central-1.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"
metricCollectionInterval: "10min"
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: prod
neon_region: eu-central-1
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: eu-central-1.aws.neon.tech
httpsPort: 443
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -1,52 +0,0 @@
# Helm chart values for neon-storage-broker
podLabels:
neon_env: production
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.gamma.eu-central-1.internal.aws.neon.tech
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
metrics:
enabled: false
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-storage-broker.fullname\" . }}"
labels:
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
app.kubernetes.io/name: neon-storage-broker
app.kubernetes.io/instance: neon-storage-broker
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-storage-broker"
endpoints:
- port: broker
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "production"

View File

@@ -1,69 +0,0 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.us-east-1.aws.neon.tech"
# *.us-east-1.retooldb.com hasn't been delegated yet.
extraDomains: ["*.us-east-1.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"
metricCollectionInterval: "10min"
podLabels:
neon_service: proxy-scram
neon_env: prod
neon_region: us-east-1
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: us-east-1.aws.neon.tech
httpsPort: 443
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -1,52 +0,0 @@
# Helm chart values for neon-storage-broker
podLabels:
neon_env: production
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.theta.us-east-1.internal.aws.neon.tech
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
metrics:
enabled: false
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-storage-broker.fullname\" . }}"
labels:
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
app.kubernetes.io/name: neon-storage-broker
app.kubernetes.io/instance: neon-storage-broker
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-storage-broker"
endpoints:
- port: broker
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "production"

View File

@@ -1,58 +0,0 @@
# Helm chart values for neon-proxy-link.
# This is a YAML-formatted file.
image:
repository: neondatabase/neon
settings:
authBackend: "link"
authEndpoint: "https://console.neon.tech/authenticate_proxy_request/"
uri: "https://console.neon.tech/psql_session/"
domain: "pg.neon.tech"
sentryEnvironment: "production"
# -- Additional labels for zenith-proxy pods
podLabels:
neon_service: proxy
neon_env: production
neon_region: us-east-2
service:
type: LoadBalancer
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal
external-dns.alpha.kubernetes.io/hostname: neon-proxy-link-mgmt.delta.us-east-2.aws.neon.tech
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: neon-proxy-link.delta.us-east-2.aws.neon.tech
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -1,77 +0,0 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.us-east-2.aws.neon.tech"
extraDomains: ["*.us-east-2.retooldb.com", "*.us-east-2.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"
metricCollectionInterval: "10min"
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: prod
neon_region: us-east-2
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: us-east-2.aws.neon.tech
httpsPort: 443
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -1,52 +0,0 @@
# Helm chart values for neon-storage-broker
podLabels:
neon_env: production
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.delta.us-east-2.internal.aws.neon.tech
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
metrics:
enabled: false
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-storage-broker.fullname\" . }}"
labels:
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
app.kubernetes.io/name: neon-storage-broker
app.kubernetes.io/instance: neon-storage-broker
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-storage-broker"
endpoints:
- port: broker
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "production"

View File

@@ -1,76 +0,0 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.cloud.neon.tech"
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"
metricCollectionInterval: "10min"
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: prod
neon_region: us-west-2
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: neon-proxy-scram-legacy.eta.us-west-2.aws.neon.tech
httpsPort: 443
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -1,77 +0,0 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.us-west-2.aws.neon.tech"
extraDomains: ["*.us-west-2.retooldb.com", "*.us-west-2.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"
metricCollectionInterval: "10min"
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: prod
neon_region: us-west-2
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: us-west-2.aws.neon.tech
httpsPort: 443
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -1,52 +0,0 @@
# Helm chart values for neon-storage-broker
podLabels:
neon_env: production
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.eta.us-west-2.internal.aws.neon.tech
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
metrics:
enabled: false
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-storage-broker.fullname\" . }}"
labels:
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
app.kubernetes.io/name: neon-storage-broker
app.kubernetes.io/instance: neon-storage-broker
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-storage-broker"
endpoints:
- port: broker
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "production"

View File

@@ -418,10 +418,7 @@ jobs:
- uses: actions/github-script@v6
if: >
!cancelled() &&
github.event_name == 'pull_request' && (
steps.create-allure-report-debug.outputs.report-url ||
steps.create-allure-report-release.outputs.report-url
)
github.event_name == 'pull_request'
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
@@ -633,6 +630,7 @@ jobs:
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
--destination neondatabase/neon:${{needs.tag.outputs.build-tag}}
@@ -718,6 +716,7 @@ jobs:
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
--dockerfile Dockerfile.compute-tools
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
--destination neondatabase/compute-tools:${{needs.tag.outputs.build-tag}}
@@ -770,6 +769,7 @@ jobs:
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--build-arg PG_VERSION=${{ matrix.version }}
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
--dockerfile Dockerfile.compute-node
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
--destination neondatabase/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
@@ -935,42 +935,6 @@ jobs:
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
deploy-pr-test-new:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
# We need both storage **and** compute images for deploy, because control plane picks the compute version based on the storage version.
# If it notices a fresh storage it may bump the compute version. And if compute image failed to build it may break things badly
needs: [ promote-images, tag, regress-tests ]
if: |
contains(github.event.pull_request.labels.*.name, 'deploy-test-storage') &&
github.event_name != 'workflow_dispatch'
defaults:
run:
shell: bash
strategy:
matrix:
target_region: [ eu-west-1 ]
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
- name: Redeploy
run: |
export DOCKER_TAG=${{needs.tag.outputs.build-tag}}
cd "$(pwd)/.github/ansible"
./get_binaries.sh
ansible-galaxy collection install sivel.toiletwater
ansible-playbook deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
- name: Cleanup ansible folder
run: rm -rf ~/.ansible
deploy:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
@@ -995,12 +959,12 @@ jobs:
- name: Trigger deploy workflow
env:
GH_TOKEN: ${{ github.token }}
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
gh workflow run deploy-dev.yml --ref main -f branch=${{ github.sha }} -f dockerTag=${{needs.tag.outputs.build-tag}}
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}}
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
gh workflow run deploy-prod.yml --ref release -f branch=${{ github.sha }} -f dockerTag=${{needs.tag.outputs.build-tag}} -f disclamerAcknowledged=true
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f disclamerAcknowledged=true
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
exit 1

View File

@@ -1,229 +0,0 @@
name: Neon Deploy dev
on:
workflow_dispatch:
inputs:
dockerTag:
description: 'Docker tag to deploy'
required: true
type: string
branch:
description: 'Branch or commit used for deploy scripts and configs'
required: true
type: string
default: 'main'
deployStorage:
description: 'Deploy storage'
required: true
type: boolean
default: true
deployProxy:
description: 'Deploy proxy'
required: true
type: boolean
default: true
deployStorageBroker:
description: 'Deploy storage-broker'
required: true
type: boolean
default: true
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
concurrency:
group: deploy-dev
cancel-in-progress: false
jobs:
deploy-storage-new:
runs-on: [ self-hosted, gen3, small ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
options: --user root --privileged
if: inputs.deployStorage
defaults:
run:
shell: bash
strategy:
matrix:
# TODO(sergey): Fix storage deploy in eu-central-1
target_region: [ eu-west-1, us-east-2]
environment:
name: dev-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Redeploy
run: |
export DOCKER_TAG=${{ inputs.dockerTag }}
cd "$(pwd)/.github/ansible"
./get_binaries.sh
ansible-galaxy collection install sivel.toiletwater
ansible-playbook -v deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
- name: Cleanup ansible folder
run: rm -rf ~/.ansible
deploy-proxy-new:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
if: inputs.deployProxy
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: us-east-2
target_cluster: dev-us-east-2-beta
deploy_link_proxy: true
deploy_legacy_scram_proxy: true
- target_region: eu-west-1
target_cluster: dev-eu-west-1-zeta
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
environment:
name: dev-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1-node16
with:
role-to-assume: arn:aws:iam::369495373322:role/github-runner
aws-region: eu-central-1
role-skip-session-tagging: true
role-duration-seconds: 1800
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Re-deploy scram proxy
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
helm upgrade neon-proxy-scram neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Re-deploy link proxy
if: matrix.deploy_link_proxy
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
helm upgrade neon-proxy-link neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-link.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Re-deploy legacy scram proxy
if: matrix.deploy_legacy_scram_proxy
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
helm upgrade neon-proxy-scram-legacy neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram-legacy.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-preview-proxy-new:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
if: inputs.deployProxy
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: eu-central-1
target_cluster: dev-eu-central-1-alpha
environment:
name: dev-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1-node16
with:
role-to-assume: arn:aws:iam::369495373322:role/github-runner
aws-region: eu-central-1
role-skip-session-tagging: true
role-duration-seconds: 1800
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Re-deploy preview proxies
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
for PREVIEW_NAME in helium argon krypton xenon radon oganesson hydrogen nitrogen oxygen fluorine chlorine; do
export PREVIEW_NAME
envsubst <.github/helm-values/preview-template.neon-proxy-scram.yaml >preview-${PREVIEW_NAME}.neon-proxy-scram.yaml
helm upgrade neon-proxy-scram-${PREVIEW_NAME} neondatabase/neon-proxy --namespace neon-proxy-${PREVIEW_NAME} --create-namespace --install --atomic -f preview-${PREVIEW_NAME}.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
done
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-storage-broker-new:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
if: inputs.deployStorageBroker
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: us-east-2
target_cluster: dev-us-east-2-beta
- target_region: eu-west-1
target_cluster: dev-eu-west-1-zeta
- target_region: eu-central-1
target_cluster: dev-eu-central-1-alpha
environment:
name: dev-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1-node16
with:
role-to-assume: arn:aws:iam::369495373322:role/github-runner
aws-region: eu-central-1
role-skip-session-tagging: true
role-duration-seconds: 1800
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Deploy storage-broker
run:
helm upgrade neon-storage-broker-lb neondatabase/neon-storage-broker --namespace neon-storage-broker-lb --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
- name: Cleanup helm folder
run: rm -rf ~/.cache

View File

@@ -1,173 +0,0 @@
name: Neon Deploy prod
on:
workflow_dispatch:
inputs:
dockerTag:
description: 'Docker tag to deploy'
required: true
type: string
branch:
description: 'Branch or commit used for deploy scripts and configs'
required: true
type: string
default: 'release'
deployStorage:
description: 'Deploy storage'
required: true
type: boolean
default: true
deployProxy:
description: 'Deploy proxy'
required: true
type: boolean
default: true
deployStorageBroker:
description: 'Deploy storage-broker'
required: true
type: boolean
default: true
disclamerAcknowledged:
description: 'I confirm that there is an emergency and I can not use regular release workflow'
required: true
type: boolean
default: false
concurrency:
group: deploy-prod
cancel-in-progress: false
jobs:
deploy-prod-new:
runs-on: prod
container:
image: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
options: --user root --privileged
if: inputs.deployStorage && inputs.disclamerAcknowledged
defaults:
run:
shell: bash
strategy:
matrix:
target_region: [ us-east-2, us-west-2, eu-central-1, ap-southeast-1, us-east-1 ]
environment:
name: prod-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Redeploy
run: |
export DOCKER_TAG=${{ inputs.dockerTag }}
cd "$(pwd)/.github/ansible"
./get_binaries.sh
ansible-galaxy collection install sivel.toiletwater
ansible-playbook -v deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_PRODUCTION_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
deploy-proxy-prod-new:
runs-on: prod
container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
if: inputs.deployProxy && inputs.disclamerAcknowledged
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: us-east-2
target_cluster: prod-us-east-2-delta
deploy_link_proxy: true
deploy_legacy_scram_proxy: false
- target_region: us-west-2
target_cluster: prod-us-west-2-eta
deploy_link_proxy: false
deploy_legacy_scram_proxy: true
- target_region: eu-central-1
target_cluster: prod-eu-central-1-gamma
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
- target_region: ap-southeast-1
target_cluster: prod-ap-southeast-1-epsilon
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
- target_region: us-east-1
target_cluster: prod-us-east-1-theta
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
environment:
name: prod-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Re-deploy scram proxy
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
helm upgrade neon-proxy-scram neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Re-deploy link proxy
if: matrix.deploy_link_proxy
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
helm upgrade neon-proxy-link neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-link.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Re-deploy legacy scram proxy
if: matrix.deploy_legacy_scram_proxy
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
helm upgrade neon-proxy-scram-legacy neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram-legacy.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
deploy-storage-broker-prod-new:
runs-on: prod
container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
if: inputs.deployStorageBroker && inputs.disclamerAcknowledged
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: us-east-2
target_cluster: prod-us-east-2-delta
- target_region: us-west-2
target_cluster: prod-us-west-2-eta
- target_region: eu-central-1
target_cluster: prod-eu-central-1-gamma
- target_region: ap-southeast-1
target_cluster: prod-ap-southeast-1-epsilon
- target_region: us-east-1
target_cluster: prod-us-east-1-theta
environment:
name: prod-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Deploy storage-broker
run:
helm upgrade neon-storage-broker-lb neondatabase/neon-storage-broker --namespace neon-storage-broker-lb --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s

3
Cargo.lock generated
View File

@@ -1032,6 +1032,7 @@ dependencies = [
"serde",
"serde_json",
"serde_with",
"utils",
"workspace_hack",
]
@@ -1105,6 +1106,7 @@ dependencies = [
"anyhow",
"clap 4.2.2",
"comfy-table",
"compute_api",
"git-version",
"nix",
"once_cell",
@@ -3674,6 +3676,7 @@ dependencies = [
"remote_storage",
"reqwest",
"safekeeper_api",
"scopeguard",
"serde",
"serde_json",
"serde_with",

View File

@@ -2,7 +2,7 @@
### The image itself is mainly used as a container for the binaries and for starting e2e tests with custom parameters.
### By default, the binaries inside the image have some mock parameters and can start, but are not intended to be used
### inside this image in the real deployments.
ARG REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
ARG REPOSITORY=neondatabase
ARG IMAGE=rust
ARG TAG=pinned
@@ -44,7 +44,15 @@ COPY --chown=nonroot . .
# Show build caching stats to check if it was used in the end.
# Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats.
RUN set -e \
&& mold -run cargo build --bin pageserver --bin pageserver_binutils --bin draw_timeline_dir --bin safekeeper --bin storage_broker --bin proxy --locked --release \
&& mold -run cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pageserver_binutils \
--bin draw_timeline_dir \
--bin safekeeper \
--bin storage_broker \
--bin proxy \
--locked --release \
&& cachepot -s
# Build final image
@@ -63,6 +71,7 @@ RUN set -e \
&& useradd -d /data neon \
&& chown -R neon:neon /data
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pg_sni_router /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver_binutils /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/draw_timeline_dir /usr/local/bin

View File

@@ -1,5 +1,5 @@
ARG PG_VERSION
ARG REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
ARG REPOSITORY=neondatabase
ARG IMAGE=rust
ARG TAG=pinned
@@ -393,6 +393,28 @@ RUN case "${PG_VERSION}" in \
make install -j $(getconf _NPROCESSORS_ONLN) && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_hint_plan.control
#########################################################################################
#
# Layer "kq-imcx-pg-build"
# compile kq_imcx extension
#
#########################################################################################
FROM build-deps AS kq-imcx-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH "/usr/local/pgsql/bin/:$PATH"
RUN apt-get update && \
apt-get install -y git libgtk2.0-dev libpq-dev libpam-dev libxslt-dev libkrb5-dev cmake && \
wget https://github.com/ketteq-neon/postgres-exts/archive/e0bd1a9d9313d7120c1b9c7bb15c48c0dede4c4e.tar.gz -O kq_imcx.tar.gz && \
echo "dc93a97ff32d152d32737ba7e196d9687041cda15e58ab31344c2f2de8855336 kq_imcx.tar.gz" | sha256sum --check && \
mkdir kq_imcx-src && cd kq_imcx-src && tar xvzf ../kq_imcx.tar.gz --strip-components=1 -C . && \
mkdir build && \
cd build && \
cmake .. && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/kq_imcx.control
#########################################################################################
#
# Layer "rust extensions"
@@ -506,6 +528,7 @@ COPY --from=hll-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=plpgsql-check-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=timescaledb-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-hint-plan-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=kq-imcx-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \

View File

@@ -1,6 +1,6 @@
# First transient image to build compute_tools binaries
# NB: keep in sync with rust image version in .github/workflows/build_and_test.yml
ARG REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
ARG REPOSITORY=neondatabase
ARG IMAGE=rust
ARG TAG=pinned

View File

@@ -1,3 +1,5 @@
[![Neon](https://user-images.githubusercontent.com/13738772/236813940-dcfdcb5b-69d3-449b-a686-013febe834d4.png)](https://neon.tech)
# Neon
Neon is a serverless open-source alternative to AWS Aurora Postgres. It separates storage and compute and substitutes the PostgreSQL storage layer by redistributing data across a cluster of nodes.
@@ -222,10 +224,6 @@ postgres=# select * from t;
> ./target/debug/neon_local stop
```
## Running own installation
If you are interested in running your own cluster of neon storages and compute, it is described in the following doc: [docs/deployment.md](./docs/deployment.md)
## Running tests
Ensure your dependencies are installed as described [here](https://github.com/neondatabase/neon#dependency-installation-notes).

View File

@@ -30,6 +30,7 @@
//! -b /usr/local/bin/postgres
//! ```
//!
use std::collections::HashMap;
use std::fs::File;
use std::panic;
use std::path::Path;
@@ -67,6 +68,54 @@ fn main() -> Result<()> {
let spec_json = matches.get_one::<String>("spec");
let spec_path = matches.get_one::<String>("spec-path");
// Extract OpenTelemetry context for the startup actions from the
// TRACEPARENT and TRACESTATE env variables, and attach it to the current
// tracing context.
//
// This is used to propagate the context for the 'start_compute' operation
// from the neon control plane. This allows linking together the wider
// 'start_compute' operation that creates the compute container, with the
// startup actions here within the container.
//
// There is no standard for passing context in env variables, but a lot of
// tools use TRACEPARENT/TRACESTATE, so we use that convention too. See
// https://github.com/open-telemetry/opentelemetry-specification/issues/740
//
// Switch to the startup context here, and exit it once the startup has
// completed and Postgres is up and running.
//
// If this pod is pre-created without binding it to any particular endpoint
// yet, this isn't the right place to enter the startup context. In that
// case, the control plane should pass the tracing context as part of the
// /configure API call.
//
// NOTE: This is supposed to only cover the *startup* actions. Once
// postgres is configured and up-and-running, we exit this span. Any other
// actions that are performed on incoming HTTP requests, for example, are
// performed in separate spans.
//
// XXX: If the pod is restarted, we perform the startup actions in the same
// context as the original startup actions, which probably doesn't make
// sense.
let mut startup_tracing_carrier: HashMap<String, String> = HashMap::new();
if let Ok(val) = std::env::var("TRACEPARENT") {
startup_tracing_carrier.insert("traceparent".to_string(), val);
}
if let Ok(val) = std::env::var("TRACESTATE") {
startup_tracing_carrier.insert("tracestate".to_string(), val);
}
let startup_context_guard = if !startup_tracing_carrier.is_empty() {
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::sdk::propagation::TraceContextPropagator;
let guard = TraceContextPropagator::new()
.extract(&startup_tracing_carrier)
.attach();
info!("startup tracing context attached");
Some(guard)
} else {
None
};
let compute_id = matches.get_one::<String>("compute-id");
let control_plane_uri = matches.get_one::<String>("control-plane-uri");
@@ -148,8 +197,6 @@ fn main() -> Result<()> {
// We got all we need, update the state.
let mut state = compute.state.lock().unwrap();
let pspec = state.pspec.as_ref().expect("spec must be set");
let startup_tracing_context = pspec.spec.startup_tracing_context.clone();
// Record for how long we slept waiting for the spec.
state.metrics.wait_for_spec_ms = Utc::now()
@@ -165,29 +212,6 @@ fn main() -> Result<()> {
compute.state_changed.notify_all();
drop(state);
// Extract OpenTelemetry context for the startup actions from the spec, and
// attach it to the current tracing context.
//
// This is used to propagate the context for the 'start_compute' operation
// from the neon control plane. This allows linking together the wider
// 'start_compute' operation that creates the compute container, with the
// startup actions here within the container.
//
// Switch to the startup context here, and exit it once the startup has
// completed and Postgres is up and running.
//
// NOTE: This is supposed to only cover the *startup* actions. Once
// postgres is configured and up-and-running, we exit this span. Any other
// actions that are performed on incoming HTTP requests, for example, are
// performed in separate spans.
let startup_context_guard = if let Some(ref carrier) = startup_tracing_context {
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::sdk::propagation::TraceContextPropagator;
Some(TraceContextPropagator::new().extract(carrier).attach())
} else {
None
};
// Launch remaining service threads
let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread");
let _configurator_handle =

View File

@@ -30,7 +30,7 @@ use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::ComputeSpec;
use compute_api::spec::{ComputeMode, ComputeSpec};
use crate::config;
use crate::pg_helpers::*;
@@ -67,8 +67,9 @@ pub struct ComputeNode {
pub struct ComputeState {
pub start_time: DateTime<Utc>,
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity
pub last_active: DateTime<Utc>,
/// Timestamp of the last Postgres activity. It could be `None` if
/// compute wasn't used since start.
pub last_active: Option<DateTime<Utc>>,
pub error: Option<String>,
pub pspec: Option<ParsedSpec>,
pub metrics: ComputeMetrics,
@@ -79,7 +80,7 @@ impl ComputeState {
Self {
start_time: Utc::now(),
status: ComputeStatus::Empty,
last_active: Utc::now(),
last_active: None,
error: None,
pspec: None,
metrics: ComputeMetrics::default(),
@@ -249,38 +250,10 @@ impl ComputeNode {
/// safekeepers sync, basebackup, etc.
#[instrument(skip(self, compute_state))]
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
#[derive(Clone)]
enum Replication {
Primary,
Static { lsn: Lsn },
HotStandby,
}
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
let pgdata_path = Path::new(&self.pgdata);
let hot_replica = if let Some(option) = spec.cluster.settings.find_ref("hot_standby") {
if let Some(value) = &option.value {
anyhow::ensure!(option.vartype == "bool");
matches!(value.as_str(), "on" | "yes" | "true")
} else {
false
}
} else {
false
};
let replication = if hot_replica {
Replication::HotStandby
} else if let Some(lsn) = spec.cluster.settings.find("recovery_target_lsn") {
Replication::Static {
lsn: Lsn::from_str(&lsn)?,
}
} else {
Replication::Primary
};
// Remove/create an empty pgdata directory and put configuration there.
self.create_pgdata()?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
@@ -288,8 +261,8 @@ impl ComputeNode {
// Syncing safekeepers is only safe with primary nodes: if a primary
// is already connected it will be kicked out, so a secondary (standby)
// cannot sync safekeepers.
let lsn = match &replication {
Replication::Primary => {
let lsn = match spec.mode {
ComputeMode::Primary => {
info!("starting safekeepers syncing");
let lsn = self
.sync_safekeepers(pspec.storage_auth_token.clone())
@@ -297,11 +270,11 @@ impl ComputeNode {
info!("safekeepers synced at LSN {}", lsn);
lsn
}
Replication::Static { lsn } => {
ComputeMode::Static(lsn) => {
info!("Starting read-only node at static LSN {}", lsn);
*lsn
lsn
}
Replication::HotStandby => {
ComputeMode::Replica => {
info!("Initializing standby from latest Pageserver LSN");
Lsn(0)
}
@@ -321,9 +294,9 @@ impl ComputeNode {
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path)?;
match &replication {
Replication::Primary | Replication::Static { .. } => {}
Replication::HotStandby => {
match spec.mode {
ComputeMode::Primary | ComputeMode::Static(..) => {}
ComputeMode::Replica => {
add_standby_signal(pgdata_path)?;
}
}
@@ -430,11 +403,13 @@ impl ComputeNode {
self.pg_reload_conf(&mut client)?;
// Proceed with post-startup configuration. Note, that order of operations is important.
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec, self.connstr.as_str(), &mut client)?;
handle_extensions(&spec, &mut client)?;
if spec.mode == ComputeMode::Primary {
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec, self.connstr.as_str(), &mut client)?;
handle_extensions(&spec, &mut client)?;
}
// 'Close' connection
drop(client);
@@ -467,7 +442,9 @@ impl ComputeNode {
let pg = self.start_postgres(spec.storage_auth_token.clone())?;
self.apply_config(&compute_state)?;
if spec.spec.mode == ComputeMode::Primary {
self.apply_config(&compute_state)?;
}
let startup_end_time = Utc::now();
{

View File

@@ -6,7 +6,7 @@ use std::path::Path;
use anyhow::Result;
use crate::pg_helpers::PgOptionsSerialize;
use compute_api::spec::ComputeSpec;
use compute_api::spec::{ComputeMode, ComputeSpec};
/// Check that `line` is inside a text file and put it there if it is not.
/// Create file if it doesn't exist.
@@ -34,17 +34,25 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
/// Create or completely rewrite configuration file specified by `path`
pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
// File::create() destroys the file content if it exists.
let mut postgres_conf = File::create(path)?;
let mut file = File::create(path)?;
write_auto_managed_block(&mut postgres_conf, &spec.cluster.settings.as_pg_settings())?;
Ok(())
}
// Write Postgres config block wrapped with generated comment section
fn write_auto_managed_block(file: &mut File, buf: &str) -> Result<()> {
writeln!(file, "# Managed by compute_ctl: begin")?;
writeln!(file, "{}", buf)?;
write!(file, "{}", &spec.cluster.settings.as_pg_settings())?;
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Static(lsn) => {
// hot_standby is 'on' by default, but let's be explicit
writeln!(file, "hot_standby=on")?;
writeln!(file, "recovery_target_lsn='{lsn}'")?;
}
ComputeMode::Replica => {
// hot_standby is 'on' by default, but let's be explicit
writeln!(file, "hot_standby=on")?;
}
}
writeln!(file, "# Managed by compute_ctl: end")?;
Ok(())

View File

@@ -181,8 +181,8 @@ components:
ComputeState:
type: object
required:
- start_time
- status
- last_active
properties:
start_time:
type: string
@@ -195,11 +195,13 @@ components:
$ref: '#/components/schemas/ComputeStatus'
last_active:
type: string
description: The last detected compute activity timestamp in UTC and RFC3339 format.
description: |
The last detected compute activity timestamp in UTC and RFC3339 format.
It could be empty if compute was never used by user since start.
example: "2022-10-12T07:20:50.52Z"
error:
type: string
description: Text of the error during compute startup, if any.
description: Text of the error during compute startup or reconfiguration, if any.
example: ""
tenant:
type: string
@@ -222,9 +224,12 @@ components:
ComputeStatus:
type: string
enum:
- empty
- init
- failed
- running
- configuration_pending
- configuration
example: running
#

View File

@@ -74,7 +74,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
// Found non-idle backend, so the last activity is NOW.
// Save it and exit the for loop. Also clear the idle backend
// `state_change` timestamps array as it doesn't matter now.
last_active = Utc::now();
last_active = Some(Utc::now());
idle_backs.clear();
break;
}
@@ -82,15 +82,16 @@ fn watch_compute_activity(compute: &ComputeNode) {
// Get idle backend `state_change` with the max timestamp.
if let Some(last) = idle_backs.iter().max() {
last_active = *last;
last_active = Some(*last);
}
}
// Update the last activity in the shared state if we got a more recent one.
let mut state = compute.state.lock().unwrap();
// NB: `Some(<DateTime>)` is always greater than `None`.
if last_active > state.last_active {
state.last_active = last_active;
debug!("set the last compute activity time to: {}", last_active);
debug!("set the last compute activity time to: {:?}", last_active);
}
}
Err(e) => {

View File

@@ -30,4 +30,5 @@ postgres_connection.workspace = true
storage_broker.workspace = true
utils.workspace = true
compute_api.workspace = true
workspace_hack.workspace = true

View File

@@ -7,8 +7,8 @@
//!
use anyhow::{anyhow, bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use compute_api::spec::ComputeMode;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint::Replication;
use control_plane::local_env::LocalEnv;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
@@ -481,7 +481,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
timeline_id,
None,
pg_version,
Replication::Primary,
ComputeMode::Primary,
)?;
println!("Done");
}
@@ -568,8 +568,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.iter()
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
{
let lsn_str = match endpoint.replication {
Replication::Static(lsn) => {
let lsn_str = match endpoint.mode {
ComputeMode::Static(lsn) => {
// -> read-only endpoint
// Use the node's LSN.
lsn.to_string()
@@ -632,21 +632,14 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.copied()
.unwrap_or(false);
let replication = match (lsn, hot_standby) {
(Some(lsn), false) => Replication::Static(lsn),
(None, true) => Replication::Replica,
(None, false) => Replication::Primary,
let mode = match (lsn, hot_standby) {
(Some(lsn), false) => ComputeMode::Static(lsn),
(None, true) => ComputeMode::Replica,
(None, false) => ComputeMode::Primary,
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
};
cplane.new_endpoint(
tenant_id,
&endpoint_id,
timeline_id,
port,
pg_version,
replication,
)?;
cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, port, pg_version, mode)?;
}
"start" => {
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
@@ -670,11 +663,11 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.unwrap_or(false);
if let Some(endpoint) = endpoint {
match (&endpoint.replication, hot_standby) {
(Replication::Static(_), true) => {
match (&endpoint.mode, hot_standby) {
(ComputeMode::Static(_), true) => {
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
}
(Replication::Primary, true) => {
(ComputeMode::Primary, true) => {
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
}
_ => {}
@@ -701,10 +694,10 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.copied()
.context("Failed to `pg-version` from the argument string")?;
let replication = match (lsn, hot_standby) {
(Some(lsn), false) => Replication::Static(lsn),
(None, true) => Replication::Replica,
(None, false) => Replication::Primary,
let mode = match (lsn, hot_standby) {
(Some(lsn), false) => ComputeMode::Static(lsn),
(None, true) => ComputeMode::Replica,
(None, false) => ComputeMode::Primary,
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
};
@@ -721,7 +714,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
timeline_id,
port,
pg_version,
replication,
mode,
)?;
ep.start(&auth_token)?;
}

View File

@@ -11,15 +11,33 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::local_env::{LocalEnv, DEFAULT_PG_VERSION};
use crate::local_env::LocalEnv;
use crate::pageserver::PageServerNode;
use crate::postgresql_conf::PostgresConf;
use compute_api::spec::ComputeMode;
// contents of a endpoint.json file
#[serde_as]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EndpointConf {
name: String,
#[serde_as(as = "DisplayFromStr")]
tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
timeline_id: TimelineId,
mode: ComputeMode,
port: u16,
pg_version: u32,
}
//
// ComputeControlPlane
//
@@ -70,7 +88,7 @@ impl ComputeControlPlane {
timeline_id: TimelineId,
port: Option<u16>,
pg_version: u32,
replication: Replication,
mode: ComputeMode,
) -> Result<Arc<Endpoint>> {
let port = port.unwrap_or_else(|| self.get_port());
@@ -80,12 +98,22 @@ impl ComputeControlPlane {
env: self.env.clone(),
pageserver: Arc::clone(&self.pageserver),
timeline_id,
replication,
mode,
tenant_id,
pg_version,
});
ep.create_pgdata()?;
std::fs::write(
ep.endpoint_path().join("endpoint.json"),
serde_json::to_string_pretty(&EndpointConf {
name: name.to_string(),
tenant_id,
timeline_id,
mode,
port,
pg_version,
})?,
)?;
ep.setup_pg_conf()?;
self.endpoints.insert(ep.name.clone(), Arc::clone(&ep));
@@ -96,26 +124,13 @@ impl ComputeControlPlane {
///////////////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Replication {
// Regular read-write node
Primary,
// if recovery_target_lsn is provided, and we want to pin the node to a specific LSN
Static(Lsn),
// Hot standby; read-only replica.
// Future versions may want to distinguish between replicas with hot standby
// feedback and other kinds of replication configurations.
Replica,
}
#[derive(Debug)]
pub struct Endpoint {
/// used as the directory name
name: String,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
// Some(lsn) if this is a read-only endpoint anchored at 'lsn'. None for the primary.
pub replication: Replication,
pub mode: ComputeMode,
// port and address of the Postgres server
pub address: SocketAddr,
@@ -144,50 +159,20 @@ impl Endpoint {
let fname = entry.file_name();
let name = fname.to_str().unwrap().to_string();
// Read config file into memory
let cfg_path = entry.path().join("pgdata").join("postgresql.conf");
let cfg_path_str = cfg_path.to_string_lossy();
let mut conf_file = File::open(&cfg_path)
.with_context(|| format!("failed to open config file in {}", cfg_path_str))?;
let conf = PostgresConf::read(&mut conf_file)
.with_context(|| format!("failed to read config file in {}", cfg_path_str))?;
// Read a few options from the config file
let context = format!("in config file {}", cfg_path_str);
let port: u16 = conf.parse_field("port", &context)?;
let timeline_id: TimelineId = conf.parse_field("neon.timeline_id", &context)?;
let tenant_id: TenantId = conf.parse_field("neon.tenant_id", &context)?;
// Read postgres version from PG_VERSION file to determine which postgres version binary to use.
// If it doesn't exist, assume broken data directory and use default pg version.
let pg_version_path = entry.path().join("PG_VERSION");
let pg_version_str =
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
let pg_version = u32::from_str(&pg_version_str)?;
// parse recovery_target_lsn and primary_conninfo into Recovery Target, if any
let replication = if let Some(lsn_str) = conf.get("recovery_target_lsn") {
Replication::Static(Lsn::from_str(lsn_str)?)
} else if let Some(slot_name) = conf.get("primary_slot_name") {
let slot_name = slot_name.to_string();
let prefix = format!("repl_{}_", timeline_id);
assert!(slot_name.starts_with(&prefix));
Replication::Replica
} else {
Replication::Primary
};
// Read the endpoint.json file
let conf: EndpointConf =
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
// ok now
Ok(Endpoint {
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.port),
name,
env: env.clone(),
pageserver: Arc::clone(pageserver),
timeline_id,
replication,
tenant_id,
pg_version,
timeline_id: conf.timeline_id,
mode: conf.mode,
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
})
}
@@ -323,8 +308,8 @@ impl Endpoint {
conf.append_line("");
// Replication-related configurations, such as WAL sending
match &self.replication {
Replication::Primary => {
match &self.mode {
ComputeMode::Primary => {
// Configure backpressure
// - Replication write lag depends on how fast the walreceiver can process incoming WAL.
// This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
@@ -366,10 +351,10 @@ impl Endpoint {
conf.append("synchronous_standby_names", "pageserver");
}
}
Replication::Static(lsn) => {
ComputeMode::Static(lsn) => {
conf.append("recovery_target_lsn", &lsn.to_string());
}
Replication::Replica => {
ComputeMode::Replica => {
assert!(!self.env.safekeepers.is_empty());
// TODO: use future host field from safekeeper spec
@@ -409,8 +394,8 @@ impl Endpoint {
}
fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
let backup_lsn = match &self.replication {
Replication::Primary => {
let backup_lsn = match &self.mode {
ComputeMode::Primary => {
if !self.env.safekeepers.is_empty() {
// LSN 0 means that it is bootstrap and we need to download just
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
@@ -426,8 +411,8 @@ impl Endpoint {
None
}
}
Replication::Static(lsn) => Some(*lsn),
Replication::Replica => {
ComputeMode::Static(lsn) => Some(*lsn),
ComputeMode::Replica => {
None // Take the latest snapshot available to start with
}
};
@@ -526,7 +511,7 @@ impl Endpoint {
// 3. Load basebackup
self.load_basebackup(auth_token)?;
if self.replication != Replication::Primary {
if self.mode != ComputeMode::Primary {
File::create(self.pgdata().join("standby.signal"))?;
}

View File

@@ -1,23 +0,0 @@
# Deploying neon on your own
If you are interested in running neon components in Docker containers, there is a tutorial: https://www.percona.com/blog/using-docker-to-deploy-neon-serverless-postgresql/
In this document we describe how to run all components on your own.
## Components overview
As described in other docs, there are storage components and compute nodes. All of them are basically binaries listening for one or two ports, communicating between each other and some are reading/writing data to disk.
![](./media/neon-components.svg)
_Storage broker_ is an in-memory pub-sub service (see more in [doc](./docs/storage_broker.md)), currently it is required one instance per cluster. It listens on `50051` port (grpc server) and safekeepers and pageservers communicate with it.
_Safekeepers_ are deployed in a cluster, nodes are discovering each other through storage broker and implementing RAFT-based replication, there are needed at least three nodes for now (see more in [doc](./docs/safekeeper-protocol.md)). They are replicating WAL and writing it to data directory, offloading to remote storage if there is no space left on device. Each server is listening on two ports — one for inter-node communication and WAL, the other for HTTP-management API ([spec](../safekeeper/src/http/openapi_spec.yaml)).
_Pageservers_ basically maintain postgres disk state, which is modified after writing to WAL and read on reading queries without participation of safekeepers. Pageservers can be spowned and stopped based on demand, as long as all the tenants are relocated correspondingly (relocated to alive pageserver before stopping). Service is listening on two ports — one for read/write requests from safekeepers and compute nodes, the other port is for management API (like detaching/attachning tenants, see more in [spec](../pageserver/src/http/openapi_spec.yml)). Data is stored in data directory and offloaded to the remote storage, if there is no enough space on device.
_Compute_ nodes are the endpoints, which are basically stateless postgres nodes, which communicates with pageservers and safekeepers to modify and read state of the database. By default, it is listening on `55433` port for postgres protocol.
## Running storage engine
## Running compute nodes

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 40 KiB

View File

@@ -11,4 +11,5 @@ serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
utils = { path = "../utils" }
workspace_hack.workspace = true

View File

@@ -19,7 +19,7 @@ pub struct ComputeStatusResponse {
pub timeline: Option<String>,
pub status: ComputeStatus,
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub last_active: Option<DateTime<Utc>>,
pub error: Option<String>,
}
@@ -29,7 +29,7 @@ pub struct ComputeState {
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub last_active: Option<DateTime<Utc>>,
pub error: Option<String>,
}
@@ -54,11 +54,15 @@ pub enum ComputeStatus {
Failed,
}
fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
fn rfc3339_serialize<S>(x: &Option<DateTime<Utc>>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
x.to_rfc3339().serialize(s)
if let Some(x) = x {
x.to_rfc3339().serialize(s)
} else {
s.serialize_none()
}
}
/// Response of the /metrics.json API

View File

@@ -3,8 +3,9 @@
//! The spec.json file is used to pass information to 'compute_ctl'. It contains
//! all the information needed to start up the right version of PostgreSQL,
//! and connect it to the storage nodes.
use serde::Deserialize;
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::lsn::Lsn;
/// String type alias representing Postgres identifier and
/// intended to be used for DB / role names.
@@ -12,6 +13,7 @@ pub type PgIdent = String;
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[serde_as]
#[derive(Clone, Debug, Default, Deserialize)]
pub struct ComputeSpec {
pub format_version: f32,
@@ -24,9 +26,25 @@ pub struct ComputeSpec {
pub cluster: Cluster,
pub delta_operations: Option<Vec<DeltaOp>>,
pub storage_auth_token: Option<String>,
#[serde(default)]
pub mode: ComputeMode,
pub startup_tracing_context: Option<HashMap<String, String>>,
pub storage_auth_token: Option<String>,
}
#[serde_as]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
pub enum ComputeMode {
/// A read-write node
#[default]
Primary,
/// A read-only node, pinned at a particular LSN
Static(#[serde_as(as = "DisplayFromStr")] Lsn),
/// A read-only node that follows the tip of the branch in hot standby mode
///
/// Future versions may want to distinguish between replicas with hot standby
/// feedback and other kinds of replication configurations.
Replica,
}
#[derive(Clone, Debug, Default, Deserialize)]

View File

@@ -50,11 +50,14 @@ impl QueryError {
}
}
/// Returns true if the given error is a normal consequence of a network issue,
/// or the client closing the connection. These errors can happen during normal
/// operations, and don't indicate a bug in our code.
pub fn is_expected_io_error(e: &io::Error) -> bool {
use io::ErrorKind::*;
matches!(
e.kind(),
ConnectionRefused | ConnectionAborted | ConnectionReset | TimedOut
BrokenPipe | ConnectionRefused | ConnectionAborted | ConnectionReset | TimedOut
)
}

View File

@@ -1,5 +1,4 @@
use anyhow::*;
use core::time::Duration;
use anyhow::{bail, ensure};
use log::*;
use postgres::types::PgLsn;
use postgres::Client;
@@ -8,7 +7,7 @@ use postgres_ffi::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
use std::cmp::Ordering;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::Instant;
use std::time::{Duration, Instant};
use tempfile::{tempdir, TempDir};
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -55,7 +54,7 @@ impl Conf {
self.datadir.join("pg_wal")
}
fn new_pg_command(&self, command: impl AsRef<Path>) -> Result<Command> {
fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> {
let path = self.pg_bin_dir()?.join(command);
ensure!(path.exists(), "Command {:?} does not exist", path);
let mut cmd = Command::new(path);
@@ -65,7 +64,7 @@ impl Conf {
Ok(cmd)
}
pub fn initdb(&self) -> Result<()> {
pub fn initdb(&self) -> anyhow::Result<()> {
if let Some(parent) = self.datadir.parent() {
info!("Pre-creating parent directory {:?}", parent);
// Tests may be run concurrently and there may be a race to create `test_output/`.
@@ -79,7 +78,7 @@ impl Conf {
let output = self
.new_pg_command("initdb")?
.arg("-D")
.arg(self.datadir.as_os_str())
.arg(&self.datadir)
.args(["-U", "postgres", "--no-instructions", "--no-sync"])
.output()?;
debug!("initdb output: {:?}", output);
@@ -92,7 +91,7 @@ impl Conf {
Ok(())
}
pub fn start_server(&self) -> Result<PostgresServer> {
pub fn start_server(&self) -> anyhow::Result<PostgresServer> {
info!("Starting Postgres server in {:?}", self.datadir);
let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
let unix_socket_dir_path = unix_socket_dir.path().to_owned();
@@ -100,9 +99,9 @@ impl Conf {
.new_pg_command("postgres")?
.args(["-c", "listen_addresses="])
.arg("-k")
.arg(unix_socket_dir_path.as_os_str())
.arg(&unix_socket_dir_path)
.arg("-D")
.arg(self.datadir.as_os_str())
.arg(&self.datadir)
.args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
.spawn()?;
let server = PostgresServer {
@@ -123,7 +122,7 @@ impl Conf {
&self,
first_segment_name: &str,
last_segment_name: &str,
) -> Result<std::process::Output> {
) -> anyhow::Result<std::process::Output> {
let first_segment_file = self.datadir.join(first_segment_name);
let last_segment_file = self.datadir.join(last_segment_name);
info!(
@@ -133,10 +132,7 @@ impl Conf {
);
let output = self
.new_pg_command("pg_waldump")?
.args([
&first_segment_file.as_os_str(),
&last_segment_file.as_os_str(),
])
.args([&first_segment_file, &last_segment_file])
.output()?;
debug!("waldump output: {:?}", output);
Ok(output)
@@ -144,10 +140,9 @@ impl Conf {
}
impl PostgresServer {
pub fn connect_with_timeout(&self) -> Result<Client> {
pub fn connect_with_timeout(&self) -> anyhow::Result<Client> {
let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
while Instant::now() < retry_until {
use std::result::Result::Ok;
if let Ok(client) = self.client_config.connect(postgres::NoTls) {
return Ok(client);
}
@@ -164,7 +159,6 @@ impl PostgresServer {
impl Drop for PostgresServer {
fn drop(&mut self) {
use std::result::Result::Ok;
match self.process.try_wait() {
Ok(Some(_)) => return,
Ok(None) => {
@@ -179,12 +173,12 @@ impl Drop for PostgresServer {
}
pub trait PostgresClientExt: postgres::GenericClient {
fn pg_current_wal_insert_lsn(&mut self) -> Result<PgLsn> {
fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> {
Ok(self
.query_one("SELECT pg_current_wal_insert_lsn()", &[])?
.get(0))
}
fn pg_current_wal_flush_lsn(&mut self) -> Result<PgLsn> {
fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> {
Ok(self
.query_one("SELECT pg_current_wal_flush_lsn()", &[])?
.get(0))
@@ -193,7 +187,7 @@ pub trait PostgresClientExt: postgres::GenericClient {
impl<C: postgres::GenericClient> PostgresClientExt for C {}
pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> Result<()> {
pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> {
client.execute("create extension if not exists neon_test_utils", &[])?;
let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
@@ -227,13 +221,13 @@ pub trait Crafter {
/// * A vector of some valid "interesting" intermediate LSNs which one may start reading from.
/// May include or exclude Lsn(0) and the end-of-wal.
/// * The expected end-of-wal LSN.
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)>;
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)>;
}
fn craft_internal<C: postgres::GenericClient>(
client: &mut C,
f: impl Fn(&mut C, PgLsn) -> Result<(Vec<PgLsn>, Option<PgLsn>)>,
) -> Result<(Vec<PgLsn>, PgLsn)> {
f: impl Fn(&mut C, PgLsn) -> anyhow::Result<(Vec<PgLsn>, Option<PgLsn>)>,
) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
ensure_server_config(client)?;
let initial_lsn = client.pg_current_wal_insert_lsn()?;
@@ -265,7 +259,7 @@ fn craft_internal<C: postgres::GenericClient>(
pub struct Simple;
impl Crafter for Simple {
const NAME: &'static str = "simple";
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
craft_internal(client, |client, _| {
client.execute("CREATE table t(x int)", &[])?;
Ok((Vec::new(), None))
@@ -276,7 +270,7 @@ impl Crafter for Simple {
pub struct LastWalRecordXlogSwitch;
impl Crafter for LastWalRecordXlogSwitch {
const NAME: &'static str = "last_wal_record_xlog_switch";
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
// Do not use generate_internal because here we end up with flush_lsn exactly on
// the segment boundary and insert_lsn after the initial page header, which is unusual.
ensure_server_config(client)?;
@@ -298,7 +292,7 @@ impl Crafter for LastWalRecordXlogSwitch {
pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
// Do not use generate_internal because here we end up with flush_lsn exactly on
// the segment boundary and insert_lsn after the initial page header, which is unusual.
ensure_server_config(client)?;
@@ -365,7 +359,7 @@ impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
fn craft_single_logical_message(
client: &mut impl postgres::GenericClient,
transactional: bool,
) -> Result<(Vec<PgLsn>, PgLsn)> {
) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
craft_internal(client, |client, initial_lsn| {
ensure!(
initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
@@ -407,7 +401,7 @@ fn craft_single_logical_message(
pub struct WalRecordCrossingSegmentFollowedBySmallOne;
impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
craft_single_logical_message(client, true)
}
}
@@ -415,7 +409,7 @@ impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
pub struct LastWalRecordCrossingSegment;
impl Crafter for LastWalRecordCrossingSegment {
const NAME: &'static str = "last_wal_record_crossing_segment";
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
craft_single_logical_message(client, false)
}
}

View File

@@ -613,7 +613,7 @@ pub struct XLogDataBody<'a> {
#[derive(Debug)]
pub struct WalSndKeepAlive {
pub sent_ptr: u64,
pub wal_end: u64, // current end of WAL on the server
pub timestamp: i64,
pub request_reply: bool,
}
@@ -924,7 +924,7 @@ impl<'a> BeMessage<'a> {
buf.put_u8(b'd');
write_body(buf, |buf| {
buf.put_u8(b'k');
buf.put_u64(req.sent_ptr);
buf.put_u64(req.wal_end);
buf.put_i64(req.timestamp);
buf.put_u8(u8::from(req.request_reply));
});

View File

@@ -128,6 +128,15 @@ impl RemoteStorage for LocalFs {
// We need this dance with sort of durable rename (without fsyncs)
// to prevent partial uploads. This was really hit when pageserver shutdown
// cancelled the upload and partial file was left on the fs
// NOTE: Because temp file suffix always the same this operation is racy.
// Two concurrent operations can lead to the following sequence:
// T1: write(temp)
// T2: write(temp) -> overwrites the content
// T1: rename(temp, dst) -> succeeds
// T2: rename(temp, dst) -> fails, temp no longet exists
// This can be solved by supplying unique temp suffix every time, but this situation
// is not normal in the first place, the error can help (and helped at least once)
// to discover bugs in upper level synchronization.
let temp_file_path =
path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
let mut destination = io::BufWriter::new(

View File

@@ -131,7 +131,9 @@ impl RequestCancelled {
impl Drop for RequestCancelled {
fn drop(&mut self) {
if let Some(span) = self.warn.take() {
if std::thread::panicking() {
// we are unwinding due to panicking, assume we are not dropped for cancellation
} else if let Some(span) = self.warn.take() {
// the span has all of the info already, but the outer `.instrument(span)` has already
// been dropped, so we need to manually re-enter it for this message.
//

View File

@@ -58,6 +58,12 @@ pub mod pageserver_feedback;
pub mod tracing_span_assert;
pub mod rate_limit;
/// Primitive for coalescing operations into a single task which will not be cancelled by for
/// example external http client closing the connection.
pub mod shared_retryable;
/// use with fail::cfg("$name", "return(2000)")
#[macro_export]
macro_rules! failpoint_sleep_millis_async {

View File

@@ -0,0 +1,66 @@
//! A helper to rate limit operations.
use std::time::{Duration, Instant};
pub struct RateLimit {
last: Option<Instant>,
interval: Duration,
}
impl RateLimit {
pub fn new(interval: Duration) -> Self {
Self {
last: None,
interval,
}
}
/// Call `f` if the rate limit allows.
/// Don't call it otherwise.
pub fn call<F: FnOnce()>(&mut self, f: F) {
let now = Instant::now();
match self.last {
Some(last) if now - last <= self.interval => {
// ratelimit
}
_ => {
self.last = Some(now);
f();
}
}
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicUsize;
#[test]
fn basics() {
use super::RateLimit;
use std::sync::atomic::Ordering::Relaxed;
use std::time::Duration;
let called = AtomicUsize::new(0);
let mut f = RateLimit::new(Duration::from_millis(100));
let cl = || {
called.fetch_add(1, Relaxed);
};
f.call(cl);
assert_eq!(called.load(Relaxed), 1);
f.call(cl);
assert_eq!(called.load(Relaxed), 1);
f.call(cl);
assert_eq!(called.load(Relaxed), 1);
std::thread::sleep(Duration::from_millis(100));
f.call(cl);
assert_eq!(called.load(Relaxed), 2);
f.call(cl);
assert_eq!(called.load(Relaxed), 2);
std::thread::sleep(Duration::from_millis(100));
f.call(cl);
assert_eq!(called.load(Relaxed), 3);
}
}

View File

@@ -0,0 +1,659 @@
use std::future::Future;
use std::sync::Arc;
/// Container using which many request handlers can come together and join a single task to
/// completion instead of racing each other and their own cancellation.
///
/// In a picture:
///
/// ```text
/// SharedRetryable::try_restart Spawned task completes with only one concurrent attempt
/// \ /
/// request handler 1 ---->|--X
/// request handler 2 ---->|-------|
/// request handler 3 ---->|-------|
/// | |
/// v |
/// one spawned task \------>/
///
/// (X = cancelled during await)
/// ```
///
/// Implementation is cancel safe. Implementation and internal structure are hurt by the inability
/// to just spawn the task, but this is needed for pageserver usage.
///
/// Implementation exposes a fully decomposed [`SharedRetryable::try_restart`] which requires the
/// caller to do the spawning before awaiting for the result. If the caller is dropped while this
/// happens, a new attempt will be required, and all concurrent awaiters will see a
/// [`RetriedTaskPanicked`] error.
///
/// There is another "family of APIs" [`SharedRetryable::attempt_spawn`] for infallible futures. It is
/// just provided for completeness, and it does not have a fully decomposed version like
/// `try_restart`.
///
/// For `try_restart_*` family of APIs, there is a concept of two leveled results. The inner level
/// is returned by the executed future. It needs to be `Clone`. Most errors are not `Clone`, so
/// implementation advice is to log the happened error, and not propagate more than a label as the
/// "inner error" which will be used to build an outer error. The outer error will also have to be
/// convertable from [`RetriedTaskPanicked`] to absorb that case as well.
///
/// ## Example
///
/// A shared service value completes the infallible work once, even if called concurrently by
/// multiple cancellable tasks.
///
/// Example moved as a test `service_example`.
#[derive(Clone)]
pub struct SharedRetryable<V> {
inner: Arc<tokio::sync::Mutex<Option<MaybeDone<V>>>>,
}
impl<V> Default for SharedRetryable<V> {
fn default() -> Self {
Self {
inner: Arc::new(tokio::sync::Mutex::new(None)),
}
}
}
/// Determine if an error is transient or permanent.
pub trait Retryable {
fn is_permanent(&self) -> bool {
true
}
}
pub trait MakeFuture {
type Future: Future<Output = Self::Output> + Send + 'static;
type Output: Send + 'static;
fn make_future(self) -> Self::Future;
}
impl<Fun, Fut, R> MakeFuture for Fun
where
Fun: FnOnce() -> Fut,
Fut: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
type Future = Fut;
type Output = R;
fn make_future(self) -> Self::Future {
self()
}
}
/// Retried task panicked, was cancelled, or never spawned (see [`SharedRetryable::try_restart`]).
#[derive(Debug, PartialEq, Eq)]
pub struct RetriedTaskPanicked;
impl<T, E1> SharedRetryable<Result<T, E1>>
where
T: Clone + std::fmt::Debug + Send + 'static,
E1: Retryable + Clone + std::fmt::Debug + Send + 'static,
{
/// Restart a previously failed operation unless it already completed with a terminal result.
///
/// Many futures can call this function and and get the terminal result from an earlier attempt
/// or start a new attempt, or join an existing one.
///
/// Compared to `Self::try_restart`, this method also spawns the future to run, which would
/// otherwise have to be done manually.
#[cfg(test)]
pub async fn try_restart_spawn<E2>(
&self,
retry_with: impl MakeFuture<Output = Result<T, E1>>,
) -> Result<T, E2>
where
E2: From<E1> + From<RetriedTaskPanicked> + Send + 'static,
{
let (recv, maybe_fut) = self.try_restart(retry_with).await;
if let Some(fut) = maybe_fut {
// top level function, we must spawn, pageserver cannot use this
tokio::spawn(fut);
}
recv.await
}
/// Restart a previously failed operation unless it already completed with a terminal result.
///
/// Many futures can call this function and get the terminal result from an earlier attempt or
/// start a new attempt, or join an existing one.
///
/// If a task calling this method is cancelled, at worst, a new attempt which is immediatedly
/// deemed as having panicked will happen, but without a panic ever happening.
///
/// Returns one future for waiting for the result and possibly another which needs to be
/// spawned when `Some`. Spawning has to happen before waiting is started, otherwise the first
/// future will never make progress.
///
/// This complication exists because on pageserver we cannot use `tokio::spawn` directly
/// at this time.
pub async fn try_restart<E2>(
&self,
retry_with: impl MakeFuture<Output = Result<T, E1>>,
) -> (
impl Future<Output = Result<T, E2>> + Send + 'static,
Option<impl Future<Output = ()> + Send + 'static>,
)
where
E2: From<E1> + From<RetriedTaskPanicked> + Send + 'static,
{
use futures::future::Either;
match self.decide_to_retry_or_join(retry_with).await {
Ok(terminal) => (Either::Left(async move { terminal }), None),
Err((rx, maybe_fut)) => {
let recv = Self::make_oneshot_alike_receiver(rx);
(Either::Right(recv), maybe_fut)
}
}
}
/// Returns a Ok if the previous attempt had resulted in a terminal result. Err is returned
/// when an attempt can be joined and possibly needs to be spawned.
async fn decide_to_retry_or_join<E2>(
&self,
retry_with: impl MakeFuture<Output = Result<T, E1>>,
) -> Result<
Result<T, E2>,
(
tokio::sync::broadcast::Receiver<Result<T, E1>>,
Option<impl Future<Output = ()> + Send + 'static>,
),
>
where
E2: From<E1> + From<RetriedTaskPanicked>,
{
let mut g = self.inner.lock().await;
let maybe_rx = match g.as_ref() {
Some(MaybeDone::Done(Ok(t))) => return Ok(Ok(t.to_owned())),
Some(MaybeDone::Done(Err(e))) if e.is_permanent() => {
return Ok(Err(E2::from(e.to_owned())))
}
Some(MaybeDone::Pending(weak)) => {
// failure to upgrade can mean only one thing: there was an unexpected
// panic which we consider as a transient retryable error.
weak.upgrade()
}
Some(MaybeDone::Done(Err(_retryable))) => None,
None => None,
};
let (strong, maybe_fut) = match maybe_rx {
Some(strong) => (strong, None),
None => {
// new attempt
// panic safety: invoke the factory before configuring the pending value
let fut = retry_with.make_future();
let (strong, fut) = self.make_run_and_complete(fut, &mut g);
(strong, Some(fut))
}
};
// important: the Arc<Receiver> is not held after unlocking
// important: we resubscribe before lock is released to be sure to get a message which
// is sent once receiver is dropped
let rx = strong.resubscribe();
drop(strong);
Err((rx, maybe_fut))
}
/// Configure a new attempt, but leave spawning it to the caller.
///
/// Returns an `Arc<Receiver<V>>` which is valid until the attempt completes, and the future
/// which will need to run to completion outside the lifecycle of the caller.
fn make_run_and_complete(
&self,
fut: impl Future<Output = Result<T, E1>> + Send + 'static,
g: &mut tokio::sync::MutexGuard<'_, Option<MaybeDone<Result<T, E1>>>>,
) -> (
Arc<tokio::sync::broadcast::Receiver<Result<T, E1>>>,
impl Future<Output = ()> + Send + 'static,
) {
#[cfg(debug_assertions)]
match &**g {
Some(MaybeDone::Pending(weak)) => {
assert!(
weak.upgrade().is_none(),
"when starting a restart, should no longer have an upgradeable channel"
);
}
Some(MaybeDone::Done(Err(err))) => {
assert!(
!err.is_permanent(),
"when restarting, the err must be transient"
);
}
Some(MaybeDone::Done(Ok(_))) => {
panic!("unexpected restart after a completion on MaybeDone");
}
None => {}
}
self.make_run_and_complete_any(fut, g)
}
/// Oneshot alike as in it's a future which will be consumed by an `await`.
///
/// Otherwise the caller might think it's beneficial or reasonable to poll the channel multiple
/// times.
async fn make_oneshot_alike_receiver<E2>(
mut rx: tokio::sync::broadcast::Receiver<Result<T, E1>>,
) -> Result<T, E2>
where
E2: From<E1> + From<RetriedTaskPanicked>,
{
use tokio::sync::broadcast::error::RecvError;
match rx.recv().await {
Ok(Ok(t)) => Ok(t),
Ok(Err(e)) => Err(E2::from(e)),
Err(RecvError::Closed | RecvError::Lagged(_)) => {
// lagged doesn't mean anything with 1 send, but whatever, handle it the same
// this case should only ever happen if a panick happened in the `fut`.
Err(E2::from(RetriedTaskPanicked))
}
}
}
}
impl<V> SharedRetryable<V>
where
V: std::fmt::Debug + Clone + Send + 'static,
{
/// Attempt to run once a spawned future to completion.
///
/// Any previous attempt which panicked will be retried, but the `RetriedTaskPanicked` will be
/// returned when the most recent attempt panicked.
#[cfg(test)]
pub async fn attempt_spawn(
&self,
attempt_with: impl MakeFuture<Output = V>,
) -> Result<V, RetriedTaskPanicked> {
let (rx, maybe_fut) = {
let mut g = self.inner.lock().await;
let maybe_rx = match g.as_ref() {
Some(MaybeDone::Done(v)) => return Ok(v.to_owned()),
Some(MaybeDone::Pending(weak)) => {
// see comment in try_restart
weak.upgrade()
}
None => None,
};
let (strong, maybe_fut) = match maybe_rx {
Some(strong) => (strong, None),
None => {
let fut = attempt_with.make_future();
let (strong, fut) = self.make_run_and_complete_any(fut, &mut g);
(strong, Some(fut))
}
};
// see decide_to_retry_or_join for important notes
let rx = strong.resubscribe();
drop(strong);
(rx, maybe_fut)
};
if let Some(fut) = maybe_fut {
// this is a top level function, need to spawn directly
// from pageserver one wouldn't use this but more piecewise functions
tokio::spawn(fut);
}
let recv = Self::make_oneshot_alike_receiver_any(rx);
recv.await
}
/// Configure a new attempt, but leave spawning it to the caller.
///
/// Forgetting the returned future is outside of scope of any correctness guarantees; all of
/// the waiters will then be deadlocked, and the MaybeDone will forever be pending. Dropping
/// and not running the future will then require a new attempt.
///
/// Also returns an `Arc<Receiver<V>>` which is valid until the attempt completes.
fn make_run_and_complete_any(
&self,
fut: impl Future<Output = V> + Send + 'static,
g: &mut tokio::sync::MutexGuard<'_, Option<MaybeDone<V>>>,
) -> (
Arc<tokio::sync::broadcast::Receiver<V>>,
impl Future<Output = ()> + Send + 'static,
) {
let (tx, rx) = tokio::sync::broadcast::channel(1);
let strong = Arc::new(rx);
**g = Some(MaybeDone::Pending(Arc::downgrade(&strong)));
let retry = {
let strong = strong.clone();
self.clone().run_and_complete(fut, tx, strong)
};
#[cfg(debug_assertions)]
match &**g {
Some(MaybeDone::Pending(weak)) => {
let rx = weak.upgrade().expect("holding the weak and strong locally");
assert!(Arc::ptr_eq(&strong, &rx));
}
_ => unreachable!("MaybeDone::pending must be set after spawn_and_run_complete_any"),
}
(strong, retry)
}
/// Run the actual attempt, and communicate the response via both:
/// - setting the `MaybeDone::Done`
/// - the broadcast channel
async fn run_and_complete(
self,
fut: impl Future<Output = V>,
tx: tokio::sync::broadcast::Sender<V>,
strong: Arc<tokio::sync::broadcast::Receiver<V>>,
) {
let res = fut.await;
{
let mut g = self.inner.lock().await;
MaybeDone::complete(&mut *g, &strong, res.clone());
// make the weak un-upgradeable by dropping the final alive
// reference to it. it is final Arc because the Arc never escapes
// the critical section in `decide_to_retry_or_join` or `attempt_spawn`.
Arc::try_unwrap(strong).expect("expected this to be the only Arc<Receiver<V>>");
}
// now no one can get the Pending(weak) value to upgrade and they only see
// the Done(res).
//
// send the result value to listeners, if any
drop(tx.send(res));
}
#[cfg(test)]
async fn make_oneshot_alike_receiver_any(
mut rx: tokio::sync::broadcast::Receiver<V>,
) -> Result<V, RetriedTaskPanicked> {
use tokio::sync::broadcast::error::RecvError;
match rx.recv().await {
Ok(t) => Ok(t),
Err(RecvError::Closed | RecvError::Lagged(_)) => {
// lagged doesn't mean anything with 1 send, but whatever, handle it the same
// this case should only ever happen if a panick happened in the `fut`.
Err(RetriedTaskPanicked)
}
}
}
}
/// MaybeDone handles synchronization for multiple requests and the single actual task.
///
/// If request handlers witness `Pending` which they are able to upgrade, they are guaranteed a
/// useful `recv().await`, where useful means "value" or "disconnect" arrives. If upgrade fails,
/// this means that "disconnect" has happened in the past.
///
/// On successful execution the one executing task will set this to `Done` variant, with the actual
/// resulting value.
#[derive(Debug)]
pub enum MaybeDone<V> {
Pending(std::sync::Weak<tokio::sync::broadcast::Receiver<V>>),
Done(V),
}
impl<V: std::fmt::Debug> MaybeDone<V> {
fn complete(
this: &mut Option<MaybeDone<V>>,
_strong: &Arc<tokio::sync::broadcast::Receiver<V>>,
outcome: V,
) {
#[cfg(debug_assertions)]
match this {
Some(MaybeDone::Pending(weak)) => {
let same = weak
.upgrade()
// we don't yet have Receiver::same_channel
.map(|rx| Arc::ptr_eq(_strong, &rx))
.unwrap_or(false);
assert!(same, "different channel had been replaced or dropped");
}
other => panic!("unexpected MaybeDone: {other:?}"),
}
*this = Some(MaybeDone::Done(outcome));
}
}
#[cfg(test)]
mod tests {
use super::{RetriedTaskPanicked, Retryable, SharedRetryable};
use std::sync::Arc;
#[derive(Debug)]
enum OuterError {
AttemptPanicked,
Unlucky,
}
#[derive(Clone, Debug)]
enum InnerError {
Unlucky,
}
impl Retryable for InnerError {
fn is_permanent(&self) -> bool {
false
}
}
impl From<InnerError> for OuterError {
fn from(_: InnerError) -> Self {
OuterError::Unlucky
}
}
impl From<RetriedTaskPanicked> for OuterError {
fn from(_: RetriedTaskPanicked) -> Self {
OuterError::AttemptPanicked
}
}
#[tokio::test]
async fn restartable_until_permanent() {
let shr = SharedRetryable::<Result<u8, InnerError>>::default();
let res = shr
.try_restart_spawn(|| async move { panic!("really unlucky") })
.await;
assert!(matches!(res, Err(OuterError::AttemptPanicked)));
let res = shr
.try_restart_spawn(|| async move { Err(InnerError::Unlucky) })
.await;
assert!(matches!(res, Err(OuterError::Unlucky)));
let res = shr.try_restart_spawn(|| async move { Ok(42) }).await;
assert!(matches!(res, Ok::<u8, OuterError>(42)));
let res = shr
.try_restart_spawn(|| async move { panic!("rerun should clone Ok(42)") })
.await;
assert!(matches!(res, Ok::<u8, OuterError>(42)));
}
/// Demonstration of the SharedRetryable::attempt
#[tokio::test]
async fn attemptable_until_no_panic() {
let shr = SharedRetryable::<u8>::default();
let res = shr
.attempt_spawn(|| async move { panic!("should not interfere") })
.await;
assert!(matches!(res, Err(RetriedTaskPanicked)), "{res:?}");
let res = shr.attempt_spawn(|| async move { 42 }).await;
assert_eq!(res, Ok(42));
let res = shr
.attempt_spawn(|| async move { panic!("should not be called") })
.await;
assert_eq!(res, Ok(42));
}
#[tokio::test]
async fn cancelling_spawner_is_fine() {
let shr = SharedRetryable::<Result<u8, InnerError>>::default();
let (recv1, maybe_fut) = shr
.try_restart(|| async move { panic!("should not have been called") })
.await;
let should_be_spawned = maybe_fut.unwrap();
let (recv2, maybe_fut) = shr
.try_restart(|| async move {
panic!("should never be called because waiting on should_be_spawned")
})
.await;
assert!(
matches!(maybe_fut, None),
"only the first one should had created the future"
);
let mut recv1 = std::pin::pin!(recv1);
let mut recv2 = std::pin::pin!(recv2);
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {},
_ = &mut recv1 => unreachable!("should not have completed because should_be_spawned not spawned"),
_ = &mut recv2 => unreachable!("should not have completed because should_be_spawned not spawned"),
}
drop(should_be_spawned);
let res = recv1.await;
assert!(matches!(res, Err(OuterError::AttemptPanicked)), "{res:?}");
let res = recv2.await;
assert!(matches!(res, Err(OuterError::AttemptPanicked)), "{res:?}");
// but we can still reach a terminal state if the api is not misused or the
// should_be_spawned winner is not cancelled
let recv1 = shr.try_restart_spawn::<OuterError>(|| async move { Ok(42) });
let recv2 = shr.try_restart_spawn::<OuterError>(|| async move { Ok(43) });
assert_eq!(recv1.await.unwrap(), 42);
assert_eq!(recv2.await.unwrap(), 42, "43 should never be returned");
}
#[tokio::test]
async fn service_example() {
#[derive(Debug, Clone, Copy)]
enum OneLevelError {
TaskPanicked,
}
impl Retryable for OneLevelError {
fn is_permanent(&self) -> bool {
// for a single level errors, this wording is weird
!matches!(self, OneLevelError::TaskPanicked)
}
}
impl From<RetriedTaskPanicked> for OneLevelError {
fn from(_: RetriedTaskPanicked) -> Self {
OneLevelError::TaskPanicked
}
}
#[derive(Clone, Default)]
struct Service(SharedRetryable<Result<u8, OneLevelError>>);
impl Service {
async fn work(
&self,
completions: Arc<std::sync::atomic::AtomicUsize>,
) -> Result<u8, OneLevelError> {
self.0
.try_restart_spawn(|| async move {
// give time to cancel some of the tasks
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
completions.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Self::work_once().await
})
.await
}
async fn work_once() -> Result<u8, OneLevelError> {
Ok(42)
}
}
let svc = Service::default();
let mut js = tokio::task::JoinSet::new();
let barrier = Arc::new(tokio::sync::Barrier::new(10 + 1));
let completions = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let handles = (0..10)
.map(|_| {
js.spawn({
let svc = svc.clone();
let barrier = barrier.clone();
let completions = completions.clone();
async move {
// make sure all tasks are ready to start at the same time
barrier.wait().await;
// after successfully starting the work, any of the futures could get cancelled
svc.work(completions).await
}
})
})
.collect::<Vec<_>>();
barrier.wait().await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
handles[5].abort();
let mut cancellations = 0;
while let Some(res) = js.join_next().await {
// all complete with the same result
match res {
Ok(res) => assert_eq!(res.unwrap(), 42),
Err(je) => {
// except for the one task we cancelled; it's cancelling
// does not interfere with the result
assert!(je.is_cancelled());
cancellations += 1;
assert_eq!(cancellations, 1, "only 6th task was aborted");
// however we cannot assert that everytime we get to cancel the 6th task
}
}
}
// there will be at most one terminal completion
assert_eq!(completions.load(std::sync::atomic::Ordering::Relaxed), 1);
}
}

View File

@@ -340,7 +340,29 @@ paths:
format: hex
post:
description: Schedules attach operation to happen in the background for given tenant
description: |
Schedules attach operation to happen in the background for the given tenant.
As soon as the caller sends this request, it must assume the pageserver
starts writing to the tenant's S3 state unless it receives one of the
distinguished errors below that state otherwise.
The method to identify whether a request has arrived at the pageserver, and
whether it has succeeded, is to poll for the tenant status to reach "Active"
or "Broken" state. These values are currently not explicitly documented in
the API spec.
Polling for `has_in_progress_downloads == false` is INCORRECT because that
value can turn `false` during shutdown while the Attach operation is still
unfinished.
There is no way to cancel an in-flight request.
If a client receives a not-distinguished response, e.g., a network timeout,
it MUST retry the /attach request and poll again for tenant status.
In any case, it must
* NOT ASSUME that the /attach request has been lost in the network,
* NOT ASSUME that the request has been lost based on a subsequent
tenant status request returning 404. (The request may still be in flight!)
responses:
"202":
description: Tenant attaching scheduled

View File

@@ -691,16 +691,6 @@ pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>,
Ok(response)
}
// Helper function to standardize the error messages we produce on bad durations
//
// Intended to be used with anyhow's `with_context`, e.g.:
//
// let value = result.with_context(bad_duration("name", &value))?;
//
fn bad_duration<'a>(field_name: &'static str, value: &'a str) -> impl 'a + Fn() -> String {
move || format!("Cannot parse `{field_name}` duration {value:?}")
}
async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
@@ -708,91 +698,7 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let mut tenant_conf = TenantConfOpt::default();
if let Some(gc_period) = request_data.gc_period {
tenant_conf.gc_period = Some(
humantime::parse_duration(&gc_period)
.with_context(bad_duration("gc_period", &gc_period))
.map_err(ApiError::BadRequest)?,
);
}
tenant_conf.gc_horizon = request_data.gc_horizon;
tenant_conf.image_creation_threshold = request_data.image_creation_threshold;
if let Some(pitr_interval) = request_data.pitr_interval {
tenant_conf.pitr_interval = Some(
humantime::parse_duration(&pitr_interval)
.with_context(bad_duration("pitr_interval", &pitr_interval))
.map_err(ApiError::BadRequest)?,
);
}
if let Some(walreceiver_connect_timeout) = request_data.walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(&walreceiver_connect_timeout)
.with_context(bad_duration(
"walreceiver_connect_timeout",
&walreceiver_connect_timeout,
))
.map_err(ApiError::BadRequest)?,
);
}
if let Some(lagging_wal_timeout) = request_data.lagging_wal_timeout {
tenant_conf.lagging_wal_timeout = Some(
humantime::parse_duration(&lagging_wal_timeout)
.with_context(bad_duration("lagging_wal_timeout", &lagging_wal_timeout))
.map_err(ApiError::BadRequest)?,
);
}
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(trace_read_requests) = request_data.trace_read_requests {
tenant_conf.trace_read_requests = Some(trace_read_requests);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
tenant_conf.checkpoint_timeout = Some(
humantime::parse_duration(&checkpoint_timeout)
.with_context(bad_duration("checkpoint_timeout", &checkpoint_timeout))
.map_err(ApiError::BadRequest)?,
);
}
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
if let Some(compaction_period) = request_data.compaction_period {
tenant_conf.compaction_period = Some(
humantime::parse_duration(&compaction_period)
.with_context(bad_duration("compaction_period", &compaction_period))
.map_err(ApiError::BadRequest)?,
);
}
if let Some(eviction_policy) = request_data.eviction_policy {
tenant_conf.eviction_policy = Some(
serde_json::from_value(eviction_policy)
.context("parse field `eviction_policy`")
.map_err(ApiError::BadRequest)?,
);
}
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(&evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
&evictions_low_residence_duration_metric_threshold,
))
.map_err(ApiError::BadRequest)?,
);
}
let tenant_conf = TenantConfOpt::try_from(&request_data).map_err(ApiError::BadRequest)?;
let target_tenant_id = request_data
.new_tenant_id
@@ -860,85 +766,7 @@ async fn update_tenant_config_handler(
let tenant_id = request_data.tenant_id;
check_permission(&request, Some(tenant_id))?;
let mut tenant_conf = TenantConfOpt::default();
if let Some(gc_period) = request_data.gc_period {
tenant_conf.gc_period = Some(
humantime::parse_duration(&gc_period)
.with_context(bad_duration("gc_period", &gc_period))
.map_err(ApiError::BadRequest)?,
);
}
tenant_conf.gc_horizon = request_data.gc_horizon;
tenant_conf.image_creation_threshold = request_data.image_creation_threshold;
if let Some(pitr_interval) = request_data.pitr_interval {
tenant_conf.pitr_interval = Some(
humantime::parse_duration(&pitr_interval)
.with_context(bad_duration("pitr_interval", &pitr_interval))
.map_err(ApiError::BadRequest)?,
);
}
if let Some(walreceiver_connect_timeout) = request_data.walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(&walreceiver_connect_timeout)
.with_context(bad_duration(
"walreceiver_connect_timeout",
&walreceiver_connect_timeout,
))
.map_err(ApiError::BadRequest)?,
);
}
if let Some(lagging_wal_timeout) = request_data.lagging_wal_timeout {
tenant_conf.lagging_wal_timeout = Some(
humantime::parse_duration(&lagging_wal_timeout)
.with_context(bad_duration("lagging_wal_timeout", &lagging_wal_timeout))
.map_err(ApiError::BadRequest)?,
);
}
tenant_conf.max_lsn_wal_lag = request_data.max_lsn_wal_lag;
tenant_conf.trace_read_requests = request_data.trace_read_requests;
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
tenant_conf.checkpoint_timeout = Some(
humantime::parse_duration(&checkpoint_timeout)
.with_context(bad_duration("checkpoint_timeout", &checkpoint_timeout))
.map_err(ApiError::BadRequest)?,
);
}
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
if let Some(compaction_period) = request_data.compaction_period {
tenant_conf.compaction_period = Some(
humantime::parse_duration(&compaction_period)
.with_context(bad_duration("compaction_period", &compaction_period))
.map_err(ApiError::BadRequest)?,
);
}
if let Some(eviction_policy) = request_data.eviction_policy {
tenant_conf.eviction_policy = Some(
serde_json::from_value(eviction_policy)
.context("parse field `eviction_policy`")
.map_err(ApiError::BadRequest)?,
);
}
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(&evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
&evictions_low_residence_duration_metric_threshold,
))
.map_err(ApiError::BadRequest)?,
);
}
let tenant_conf = TenantConfOpt::try_from(&request_data).map_err(ApiError::BadRequest)?;
let state = get_state(&request);
mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)

View File

@@ -1,9 +1,9 @@
use metrics::core::{AtomicU64, GenericCounter};
use metrics::{
register_counter_vec, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec, Counter, CounterVec,
Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge,
UIntGaugeVec,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec,
Counter, CounterVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
UIntGauge, UIntGaugeVec,
};
use once_cell::sync::Lazy;
use pageserver_api::models::TenantState;
@@ -287,14 +287,33 @@ impl EvictionsWithLowResidenceDuration {
let Some(_counter) = self.counter.take() else {
return;
};
EVICTIONS_WITH_LOW_RESIDENCE_DURATION
.remove_label_values(&[
tenant_id,
timeline_id,
self.data_source,
&Self::threshold_label_value(self.threshold),
])
.expect("we own the metric, no-one else should remove it");
let threshold = Self::threshold_label_value(self.threshold);
let removed = EVICTIONS_WITH_LOW_RESIDENCE_DURATION.remove_label_values(&[
tenant_id,
timeline_id,
self.data_source,
&threshold,
]);
match removed {
Err(e) => {
// this has been hit in staging as
// <https://neondatabase.sentry.io/issues/4142396994/>, but we don't know how.
// because we can be in the drop path already, don't risk:
// - "double-panic => illegal instruction" or
// - future "drop panick => abort"
//
// so just nag: (the error has the labels)
tracing::warn!("failed to remove EvictionsWithLowResidenceDuration, it was already removed? {e:#?}");
}
Ok(()) => {
// to help identify cases where we double-remove the same values, let's log all
// deletions?
tracing::info!("removed EvictionsWithLowResidenceDuration with {tenant_id}, {timeline_id}, {}, {threshold}", self.data_source);
}
}
}
}
@@ -459,6 +478,56 @@ pub static TENANT_TASK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
.expect("Failed to register tenant_task_events metric")
});
// walreceiver metrics
pub static WALRECEIVER_STARTED_CONNECTIONS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_walreceiver_started_connections_total",
"Number of started walreceiver connections"
)
.expect("failed to define a metric")
});
pub static WALRECEIVER_ACTIVE_MANAGERS: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"pageserver_walreceiver_active_managers",
"Number of active walreceiver managers"
)
.expect("failed to define a metric")
});
pub static WALRECEIVER_SWITCHES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_walreceiver_switches_total",
"Number of walreceiver manager change_connection calls",
&["reason"]
)
.expect("failed to define a metric")
});
pub static WALRECEIVER_BROKER_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_walreceiver_broker_updates_total",
"Number of received broker updates in walreceiver"
)
.expect("failed to define a metric")
});
pub static WALRECEIVER_CANDIDATES_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_walreceiver_candidates_events_total",
"Number of walreceiver candidate events",
&["event"]
)
.expect("failed to define a metric")
});
pub static WALRECEIVER_CANDIDATES_ADDED: Lazy<IntCounter> =
Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["add"]));
pub static WALRECEIVER_CANDIDATES_REMOVED: Lazy<IntCounter> =
Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["remove"]));
// Metrics collected on WAL redo operations
//
// We collect the time spent in actual WAL redo ('redo'), and time waiting

View File

@@ -352,7 +352,7 @@ impl PageServerHandler {
tenant_id: TenantId,
timeline_id: TimelineId,
ctx: RequestContext,
) -> anyhow::Result<()>
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
@@ -398,7 +398,9 @@ impl PageServerHandler {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => break,
Some(m) => {
anyhow::bail!("unexpected message: {m:?} during COPY");
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
)));
}
None => break, // client disconnected
};

View File

@@ -272,6 +272,9 @@ pub enum TaskKind {
#[cfg(test)]
UnitTest,
/// Task which is the only task to delete this particular timeline
DeleteTimeline,
}
#[derive(Default)]

View File

@@ -58,6 +58,8 @@ use crate::task_mgr::TaskKind;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::metadata::load_metadata;
use crate::tenant::remote_timeline_client::index::IndexPart;
use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
use crate::tenant::remote_timeline_client::PersistIndexPartWithDeletedFlagError;
use crate::tenant::storage_layer::DeltaLayer;
use crate::tenant::storage_layer::ImageLayer;
use crate::tenant::storage_layer::Layer;
@@ -271,7 +273,10 @@ impl UninitializedTimeline<'_> {
.await
.context("Failed to flush after basebackup import")?;
self.initialize(ctx)
// Initialize without loading the layer map. We started with an empty layer map, and already
// updated it for the layers that we created during the import.
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
self.initialize_with_lock(ctx, &mut timelines, false, true)
}
fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
@@ -451,6 +456,50 @@ pub enum DeleteTimelineError {
Other(#[from] anyhow::Error),
}
#[derive(Debug, thiserror::Error, Clone)]
enum InnerDeleteTimelineError {
#[error("upload queue is uninitialized, likely the timeline was in Broken state prior to this call because it failed to fetch IndexPart during load or attach, check the logs")]
QueueUninitialized,
#[error("failpoint: {0}")]
Failpoint(&'static str),
#[error("failed to remove local timeline directory")]
FailedToRemoveLocalTimelineDirectory,
#[error("index_part.json upload failed")]
UploadFailed,
#[error("the deleted timeline grew branches while deleting it; tenant should now be broken")]
DeletedGrewChildren,
}
impl utils::shared_retryable::Retryable for InnerDeleteTimelineError {
fn is_permanent(&self) -> bool {
use InnerDeleteTimelineError::*;
match self {
QueueUninitialized => false,
Failpoint(_) => false,
FailedToRemoveLocalTimelineDirectory => false,
UploadFailed => false,
DeletedGrewChildren => true,
}
}
}
impl From<InnerDeleteTimelineError> for DeleteTimelineError {
fn from(value: InnerDeleteTimelineError) -> Self {
DeleteTimelineError::Other(anyhow::Error::new(value))
}
}
impl From<utils::shared_retryable::RetriedTaskPanicked> for DeleteTimelineError {
fn from(_: utils::shared_retryable::RetriedTaskPanicked) -> Self {
DeleteTimelineError::Other(anyhow::anyhow!("deleting timeline failed, please retry"))
}
}
struct RemoteStartupData {
index_part: IndexPart,
remote_metadata: TimelineMetadata,
@@ -695,16 +744,9 @@ impl Tenant {
.await
.context("download index file")?;
let remote_metadata = index_part.parse_metadata().context("parse metadata")?;
debug!("finished index part download");
Result::<_, anyhow::Error>::Ok((
timeline_id,
client,
index_part,
remote_metadata,
))
Result::<_, anyhow::Error>::Ok((timeline_id, client, index_part))
}
.map(move |res| {
res.with_context(|| format!("download index part for timeline {timeline_id}"))
@@ -713,17 +755,26 @@ impl Tenant {
);
}
// Wait for all the download tasks to complete & collect results.
let mut remote_clients = HashMap::new();
let mut index_parts = HashMap::new();
let mut remote_index_and_client = HashMap::new();
let mut timeline_ancestors = HashMap::new();
while let Some(result) = part_downloads.join_next().await {
// NB: we already added timeline_id as context to the error
let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
let (timeline_id, client, index_part, remote_metadata) = result?;
let (timeline_id, client, index_part) = result?;
debug!("successfully downloaded index part for timeline {timeline_id}");
timeline_ancestors.insert(timeline_id, remote_metadata);
index_parts.insert(timeline_id, index_part);
remote_clients.insert(timeline_id, client);
match index_part {
MaybeDeletedIndexPart::IndexPart(index_part) => {
timeline_ancestors.insert(
timeline_id,
index_part.parse_metadata().context("parse_metadata")?,
);
remote_index_and_client.insert(timeline_id, (index_part, client));
}
MaybeDeletedIndexPart::Deleted => {
info!("timeline {} is deleted, skipping", timeline_id);
continue;
}
}
}
// For every timeline, download the metadata file, scan the local directory,
@@ -731,12 +782,16 @@ impl Tenant {
// layer file.
let sorted_timelines = tree_sort_timelines(timeline_ancestors)?;
for (timeline_id, remote_metadata) in sorted_timelines {
let (index_part, remote_client) = remote_index_and_client
.remove(&timeline_id)
.expect("just put it in above");
// TODO again handle early failure
self.load_remote_timeline(
timeline_id,
index_parts.remove(&timeline_id).unwrap(),
index_part,
remote_metadata,
remote_clients.remove(&timeline_id).unwrap(),
remote_client,
&ctx,
)
.await
@@ -1042,21 +1097,13 @@ impl Tenant {
/// Subroutine of `load_tenant`, to load an individual timeline
///
/// NB: The parent is assumed to be already loaded!
#[instrument(skip(self, local_metadata, ctx), fields(timeline_id=%timeline_id))]
#[instrument(skip_all, fields(timeline_id))]
async fn load_local_timeline(
&self,
timeline_id: TimelineId,
local_metadata: TimelineMetadata,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
.with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?;
Some(ancestor_timeline)
} else {
None
};
let remote_client = self.remote_storage.as_ref().map(|remote_storage| {
RemoteTimelineClient::new(
remote_storage.clone(),
@@ -1069,6 +1116,29 @@ impl Tenant {
let remote_startup_data = match &remote_client {
Some(remote_client) => match remote_client.download_index_file().await {
Ok(index_part) => {
let index_part = match index_part {
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
MaybeDeletedIndexPart::Deleted => {
// TODO: we won't reach here if remote storage gets de-configured after start of the deletion operation.
// Example:
// start deletion operation
// finishes upload of index part
// pageserver crashes
// remote storage gets de-configured
// pageserver starts
//
// We don't really anticipate remote storage to be de-configured, so, for now, this is fine.
// Also, maybe we'll remove that option entirely in the future, see https://github.com/neondatabase/neon/issues/4099.
info!("is_deleted is set on remote, resuming removal of local data originally done by timeline deletion handler");
std::fs::remove_dir_all(
self.conf.timeline_path(&timeline_id, &self.tenant_id),
)
.context("remove_dir_all")?;
return Ok(());
}
};
let remote_metadata = index_part.parse_metadata().context("parse_metadata")?;
Some(RemoteStartupData {
index_part,
@@ -1084,6 +1154,14 @@ impl Tenant {
None => None,
};
let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
.with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?;
Some(ancestor_timeline)
} else {
None
};
self.timeline_init_and_sync(
timeline_id,
remote_client,
@@ -1327,10 +1405,12 @@ impl Tenant {
/// Removes timeline-related in-memory data
pub async fn delete_timeline(
&self,
self: &Arc<Tenant>,
timeline_id: TimelineId,
_ctx: &RequestContext,
) -> Result<(), DeleteTimelineError> {
timeline::debug_assert_current_span_has_tenant_and_timeline_id();
// Transition the timeline into TimelineState::Stopping.
// This should prevent new operations from starting.
let timeline = {
@@ -1358,92 +1438,196 @@ impl Tenant {
timeline
};
// Now that the Timeline is in Stopping state, request all the related tasks to
// shut down.
let span = tracing::Span::current();
// if we have concurrent requests, we will only execute one version of following future;
// initially it did not have any means to be cancelled.
//
// NB: If you call delete_timeline multiple times concurrently, they will
// all go through the motions here. Make sure the code here is idempotent,
// and don't error out if some of the shutdown tasks have already been
// completed!
// NOTE: Unlike "the usual" futures, this one will log any errors instead of just propagating
// them to the caller. This is because this one future produces a value, which will need to
// be cloneable to everyone interested, and normal `std::error::Error` are not clonable.
// Also, it wouldn't make sense to log the same failure multiple times, it would look like
// multiple failures to anyone reading the logs.
let factory = || {
let tenant = self.clone();
let tenant_id = self.tenant_id;
let timeline = timeline.clone();
let timeline_id = timeline.timeline_id;
// Stop the walreceiver first.
debug!("waiting for wal receiver to shutdown");
timeline.walreceiver.stop().await;
debug!("wal receiver shutdown confirmed");
async move {
// Now that the Timeline is in Stopping state, request all the related tasks to
// shut down.
//
// Stop the walreceiver first.
debug!("waiting for wal receiver to shutdown");
timeline.walreceiver.stop().await;
debug!("wal receiver shutdown confirmed");
info!("waiting for timeline tasks to shutdown");
task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id)).await;
// Prevent new uploads from starting.
if let Some(remote_client) = timeline.remote_client.as_ref() {
let res = remote_client.stop();
match res {
Ok(()) => {}
Err(e) => match e {
remote_timeline_client::StopError::QueueUninitialized => {
// This case shouldn't happen currently because the
// load and attach code bails out if _any_ of the timeline fails to fetch its IndexPart.
// That is, before we declare the Tenant as Active.
// But we only allow calls to delete_timeline on Active tenants.
warn!("failed to stop RemoteTimelineClient due to uninitialized queue");
return Err(InnerDeleteTimelineError::QueueUninitialized);
}
},
}
}
{
// Grab the layer_removal_cs lock, and actually perform the deletion.
//
// This lock prevents multiple concurrent delete_timeline calls from
// stepping on each other's toes, while deleting the files. It also
// prevents GC or compaction from running at the same time.
//
// Note that there are still other race conditions between
// GC, compaction and timeline deletion. GC task doesn't
// register itself properly with the timeline it's
// operating on. See
// https://github.com/neondatabase/neon/issues/2671
//
// No timeout here, GC & Compaction should be responsive to the
// `TimelineState::Stopping` change.
info!("waiting for layer_removal_cs.lock()");
let layer_removal_guard = timeline.layer_removal_cs.lock().await;
info!("got layer_removal_cs.lock(), deleting layer files");
// Stop & wait for the remaining timeline tasks, including upload tasks.
info!("waiting for timeline tasks to shutdown");
task_mgr::shutdown_tasks(None, Some(tenant_id), Some(timeline_id)).await;
// NB: storage_sync upload tasks that reference these layers have been cancelled
// by the caller.
// Mark timeline as deleted in S3 so we won't pick it up next time
// during attach or pageserver restart.
// See comment in persist_index_part_with_deleted_flag.
if let Some(remote_client) = timeline.remote_client.as_ref() {
match remote_client.persist_index_part_with_deleted_flag().await {
// If we (now, or already) marked it successfully as deleted, we can proceed
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
// Bail out otherwise
Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
| Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
warn!("upload failed: {e}");
return Err(InnerDeleteTimelineError::UploadFailed);
}
}
}
let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id);
// XXX make this atomic so that, if we crash-mid-way, the timeline won't be picked up
// with some layers missing.
std::fs::remove_dir_all(&local_timeline_directory).with_context(|| {
format!(
"Failed to remove local timeline directory '{}'",
local_timeline_directory.display()
)
})?;
{
// Grab the layer_removal_cs lock, and actually perform the deletion.
//
// This lock prevents multiple concurrent delete_timeline calls from
// stepping on each other's toes, while deleting the files. It also
// prevents GC or compaction from running at the same time.
//
// Note that there are still other race conditions between
// GC, compaction and timeline deletion. GC task doesn't
// register itself properly with the timeline it's
// operating on. See
// https://github.com/neondatabase/neon/issues/2671
//
// No timeout here, GC & Compaction should be responsive to the
// `TimelineState::Stopping` change.
info!("waiting for layer_removal_cs.lock()");
let _layer_removal_guard = timeline.layer_removal_cs.lock().await;
info!("got layer_removal_cs.lock(), deleting layer files");
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
drop(layer_removal_guard);
// NB: storage_sync upload tasks that reference these layers have been cancelled
// by us earlier.
let local_timeline_directory =
tenant.conf.timeline_path(&timeline_id, &tenant_id);
fail::fail_point!("timeline-delete-before-rm", |_| {
Err(InnerDeleteTimelineError::Failpoint(
"failpoint: timeline-delete-before-rm",
))
});
// NB: This need not be atomic because the deleted flag in the IndexPart
// will be observed during tenant/timeline load. The deletion will be resumed there.
//
// For configurations without remote storage, we tolerate that we're not crash-safe here.
// The timeline may come up Active but with missing layer files, in such setups.
// See https://github.com/neondatabase/neon/pull/3919#issuecomment-1531726720
match std::fs::remove_dir_all(&local_timeline_directory) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// This can happen if we're called a second time, e.g.,
// because of a previous failure/cancellation at/after
// failpoint timeline-delete-after-rm.
//
// It can also happen if we race with tenant detach, because,
// it doesn't grab the layer_removal_cs lock.
//
// For now, log and continue.
// warn! level is technically not appropriate for the
// first case because we should expect retries to happen.
// But the error is so rare, it seems better to get attention if it happens.
let tenant_state = tenant.current_state();
warn!(
timeline_dir=?local_timeline_directory,
?tenant_state,
"timeline directory not found, proceeding anyway"
);
}
Err(e) => {
warn!(
"failed to remove local timeline directory {}: {e:#}",
local_timeline_directory.display()
);
return Err(
InnerDeleteTimelineError::FailedToRemoveLocalTimelineDirectory,
);
}
}
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
}
fail::fail_point!("timeline-delete-after-rm", |_| {
Err(InnerDeleteTimelineError::Failpoint(
"timeline-delete-after-rm",
))
});
// Remove the timeline from the map or poison it if we've grown children.
let removed_timeline =
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let mut timelines = tenant.timelines.lock().unwrap();
let children_exist = timelines.iter().any(|(_, entry)| {
entry.get_ancestor_timeline_id() == Some(timeline_id)
});
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
// We already deleted the layer files, so it's probably best to panic.
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
if children_exist {
panic!("Timeline grew children while we removed layer files");
}
timelines.remove(&timeline_id)
}));
match removed_timeline {
Ok(Some(_)) => {}
Ok(None) => {
// with SharedRetryable this should no longer happen
warn!("no other task should had dropped the Timeline");
}
Err(_panic) => return Err(InnerDeleteTimelineError::DeletedGrewChildren),
}
Ok(())
}
// execute in the *winner's* span so we will capture the request id etc.
.instrument(span)
};
let (recv, maybe_fut) = timeline.delete_self.try_restart(factory).await;
if let Some(fut) = maybe_fut {
crate::task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::DeleteTimeline,
Some(self.tenant_id()),
None,
&format!("delete_timeline {}", timeline.timeline_id),
false,
async move {
fut.await;
Ok(())
},
);
}
// Remove the timeline from the map.
let mut timelines = self.timelines.lock().unwrap();
let children_exist = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
// We already deleted the layer files, so it's probably best to panic.
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
if children_exist {
panic!("Timeline grew children while we removed layer files");
}
let removed_timeline = timelines.remove(&timeline_id);
if removed_timeline.is_none() {
// This can legitimately happen if there's a concurrent call to this function.
// T1 T2
// lock
// unlock
// lock
// unlock
// remove files
// lock
// remove from map
// unlock
// return
// remove files
// lock
// remove from map observes empty map
// unlock
// return
debug!("concurrent call to this function won the race");
}
drop(timelines);
Ok(())
recv.await
}
pub fn current_state(&self) -> TenantState {
@@ -2352,6 +2536,8 @@ impl Tenant {
)
})?;
// Initialize the timeline without loading the layer map, because we already updated the layer
// map above, when we imported the datadir.
let timeline = {
let mut timelines = self.timelines.lock().unwrap();
raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)?

View File

@@ -8,6 +8,8 @@
//! We cannot use global or default config instead, because wrong settings
//! may lead to a data loss.
//!
use anyhow::Context;
use pageserver_api::models::{TenantConfigRequest, TenantCreateRequest};
use serde::{Deserialize, Serialize};
use std::num::NonZeroU64;
use std::time::Duration;
@@ -280,6 +282,179 @@ impl Default for TenantConf {
}
}
// Helper function to standardize the error messages we produce on bad durations
//
// Intended to be used with anyhow's `with_context`, e.g.:
//
// let value = result.with_context(bad_duration("name", &value))?;
//
fn bad_duration<'a>(field_name: &'static str, value: &'a str) -> impl 'a + Fn() -> String {
move || format!("Cannot parse `{field_name}` duration {value:?}")
}
impl TryFrom<&'_ TenantCreateRequest> for TenantConfOpt {
type Error = anyhow::Error;
fn try_from(request_data: &TenantCreateRequest) -> Result<Self, Self::Error> {
let mut tenant_conf = TenantConfOpt::default();
if let Some(gc_period) = &request_data.gc_period {
tenant_conf.gc_period = Some(
humantime::parse_duration(gc_period)
.with_context(bad_duration("gc_period", gc_period))?,
);
}
tenant_conf.gc_horizon = request_data.gc_horizon;
tenant_conf.image_creation_threshold = request_data.image_creation_threshold;
if let Some(pitr_interval) = &request_data.pitr_interval {
tenant_conf.pitr_interval = Some(
humantime::parse_duration(pitr_interval)
.with_context(bad_duration("pitr_interval", pitr_interval))?,
);
}
if let Some(walreceiver_connect_timeout) = &request_data.walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(walreceiver_connect_timeout).with_context(
bad_duration("walreceiver_connect_timeout", walreceiver_connect_timeout),
)?,
);
}
if let Some(lagging_wal_timeout) = &request_data.lagging_wal_timeout {
tenant_conf.lagging_wal_timeout = Some(
humantime::parse_duration(lagging_wal_timeout)
.with_context(bad_duration("lagging_wal_timeout", lagging_wal_timeout))?,
);
}
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(trace_read_requests) = request_data.trace_read_requests {
tenant_conf.trace_read_requests = Some(trace_read_requests);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = &request_data.checkpoint_timeout {
tenant_conf.checkpoint_timeout = Some(
humantime::parse_duration(checkpoint_timeout)
.with_context(bad_duration("checkpoint_timeout", checkpoint_timeout))?,
);
}
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
if let Some(compaction_period) = &request_data.compaction_period {
tenant_conf.compaction_period = Some(
humantime::parse_duration(compaction_period)
.with_context(bad_duration("compaction_period", compaction_period))?,
);
}
if let Some(eviction_policy) = &request_data.eviction_policy {
tenant_conf.eviction_policy = Some(
serde::Deserialize::deserialize(eviction_policy)
.context("parse field `eviction_policy`")?,
);
}
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
&request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
evictions_low_residence_duration_metric_threshold,
))?,
);
}
Ok(tenant_conf)
}
}
impl TryFrom<&'_ TenantConfigRequest> for TenantConfOpt {
type Error = anyhow::Error;
fn try_from(request_data: &TenantConfigRequest) -> Result<Self, Self::Error> {
let mut tenant_conf = TenantConfOpt::default();
if let Some(gc_period) = &request_data.gc_period {
tenant_conf.gc_period = Some(
humantime::parse_duration(gc_period)
.with_context(bad_duration("gc_period", gc_period))?,
);
}
tenant_conf.gc_horizon = request_data.gc_horizon;
tenant_conf.image_creation_threshold = request_data.image_creation_threshold;
if let Some(pitr_interval) = &request_data.pitr_interval {
tenant_conf.pitr_interval = Some(
humantime::parse_duration(pitr_interval)
.with_context(bad_duration("pitr_interval", pitr_interval))?,
);
}
if let Some(walreceiver_connect_timeout) = &request_data.walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(walreceiver_connect_timeout).with_context(
bad_duration("walreceiver_connect_timeout", walreceiver_connect_timeout),
)?,
);
}
if let Some(lagging_wal_timeout) = &request_data.lagging_wal_timeout {
tenant_conf.lagging_wal_timeout = Some(
humantime::parse_duration(lagging_wal_timeout)
.with_context(bad_duration("lagging_wal_timeout", lagging_wal_timeout))?,
);
}
tenant_conf.max_lsn_wal_lag = request_data.max_lsn_wal_lag;
tenant_conf.trace_read_requests = request_data.trace_read_requests;
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = &request_data.checkpoint_timeout {
tenant_conf.checkpoint_timeout = Some(
humantime::parse_duration(checkpoint_timeout)
.with_context(bad_duration("checkpoint_timeout", checkpoint_timeout))?,
);
}
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
if let Some(compaction_period) = &request_data.compaction_period {
tenant_conf.compaction_period = Some(
humantime::parse_duration(compaction_period)
.with_context(bad_duration("compaction_period", compaction_period))?,
);
}
if let Some(eviction_policy) = &request_data.eviction_policy {
tenant_conf.eviction_policy = Some(
serde::Deserialize::deserialize(eviction_policy)
.context("parse field `eviction_policy`")?,
);
}
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
&request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
evictions_low_residence_duration_metric_threshold,
))?,
);
}
Ok(tenant_conf)
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -12,6 +12,7 @@ use std::io::Write;
use anyhow::{bail, ensure, Context};
use serde::{Deserialize, Serialize};
use tracing::info_span;
use utils::bin_ser::SerializeError;
use utils::{
bin_ser::BeSer,
id::{TenantId, TimelineId},
@@ -182,7 +183,7 @@ impl TimelineMetadata {
}
}
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
pub fn to_bytes(&self) -> Result<Vec<u8>, SerializeError> {
let body_bytes = self.body.ser()?;
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {

View File

@@ -204,8 +204,11 @@ mod download;
pub mod index;
mod upload;
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
// re-export these
pub use download::{is_temp_download_file, list_remote_timelines};
use scopeguard::ScopeGuard;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
@@ -213,7 +216,7 @@ use std::sync::{Arc, Mutex};
use remote_storage::{DownloadError, GenericRemoteStorage};
use std::ops::DerefMut;
use tokio::runtime::Runtime;
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};
use tracing::{info_span, Instrument};
use utils::lsn::Lsn;
@@ -240,6 +243,7 @@ use utils::id::{TenantId, TimelineId};
use self::index::IndexPart;
use super::storage_layer::LayerFileName;
use super::upload_queue::SetDeletedFlagProgress;
// Occasional network issues and such can cause remote operations to fail, and
// that's expected. If a download fails, we log it at info-level, and retry.
@@ -253,6 +257,30 @@ const FAILED_DOWNLOAD_RETRIES: u32 = 10;
// retries. Uploads and deletions are retried forever, though.
const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
pub enum MaybeDeletedIndexPart {
IndexPart(IndexPart),
Deleted,
}
/// Errors that can arise when calling [`RemoteTimelineClient::stop`].
#[derive(Debug, thiserror::Error)]
pub enum StopError {
/// Returned if the upload queue was never initialized.
/// See [`RemoteTimelineClient::init_upload_queue`] and [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
#[error("queue is not initialized")]
QueueUninitialized,
}
#[derive(Debug, thiserror::Error)]
pub enum PersistIndexPartWithDeletedFlagError {
#[error("another task is already setting the deleted_flag, started at {0:?}")]
AlreadyInProgress(NaiveDateTime),
#[error("the deleted_flag was already set, value is {0:?}")]
AlreadyDeleted(NaiveDateTime),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
/// A client for accessing a timeline's data in remote storage.
///
/// This takes care of managing the number of connections, and balancing them
@@ -367,7 +395,7 @@ impl RemoteTimelineClient {
//
/// Download index file
pub async fn download_index_file(&self) -> Result<IndexPart, DownloadError> {
pub async fn download_index_file(&self) -> Result<MaybeDeletedIndexPart, DownloadError> {
let _unfinished_gauge_guard = self.metrics.call_begin(
&RemoteOpFileKind::Index,
&RemoteOpKind::Download,
@@ -376,7 +404,7 @@ impl RemoteTimelineClient {
},
);
download::download_index_part(
let index_part = download::download_index_part(
self.conf,
&self.storage_impl,
self.tenant_id,
@@ -389,7 +417,13 @@ impl RemoteTimelineClient {
RemoteOpKind::Download,
Arc::clone(&self.metrics),
)
.await
.await?;
if index_part.deleted_at.is_some() {
Ok(MaybeDeletedIndexPart::Deleted)
} else {
Ok(MaybeDeletedIndexPart::IndexPart(index_part))
}
}
/// Download a (layer) file from `path`, into local filesystem.
@@ -624,6 +658,116 @@ impl RemoteTimelineClient {
Ok(())
}
/// Set the deleted_at field in the remote index file.
///
/// This fails if the upload queue has not been `stop()`ed.
///
/// The caller is responsible for calling `stop()` AND for waiting
/// for any ongoing upload tasks to finish after `stop()` has succeeded.
/// Check method [`RemoteTimelineClient::stop`] for details.
pub(crate) async fn persist_index_part_with_deleted_flag(
self: &Arc<Self>,
) -> Result<(), PersistIndexPartWithDeletedFlagError> {
let index_part_with_deleted_at = {
let mut locked = self.upload_queue.lock().unwrap();
// We must be in stopped state because otherwise
// we can have inprogress index part upload that can overwrite the file
// with missing is_deleted flag that we going to set below
let stopped = match &mut *locked {
UploadQueue::Uninitialized => {
return Err(anyhow::anyhow!("is not Stopped but Uninitialized").into())
}
UploadQueue::Initialized(_) => {
return Err(anyhow::anyhow!("is not Stopped but Initialized").into())
}
UploadQueue::Stopped(stopped) => stopped,
};
match stopped.deleted_at {
SetDeletedFlagProgress::NotRunning => (), // proceed
SetDeletedFlagProgress::InProgress(at) => {
return Err(PersistIndexPartWithDeletedFlagError::AlreadyInProgress(at));
}
SetDeletedFlagProgress::Successful(at) => {
return Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(at));
}
};
let deleted_at = Utc::now().naive_utc();
stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
let mut index_part = IndexPart::new(
stopped.latest_files.clone(),
stopped.last_uploaded_consistent_lsn,
stopped
.latest_metadata
.to_bytes()
.context("serialize metadata")?,
);
index_part.deleted_at = Some(deleted_at);
index_part
};
let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
let mut locked = self_clone.upload_queue.lock().unwrap();
let stopped = match &mut *locked {
UploadQueue::Uninitialized | UploadQueue::Initialized(_) => unreachable!(
"there's no way out of Stopping, and we checked it's Stopping above: {:?}",
locked.as_str(),
),
UploadQueue::Stopped(stopped) => stopped,
};
stopped.deleted_at = SetDeletedFlagProgress::NotRunning;
});
// Have a failpoint that can use the `pause` failpoint action.
// We don't want to block the executor thread, hence, spawn_blocking + await.
#[cfg(feature = "testing")]
tokio::task::spawn_blocking({
let current = tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!(
"at failpoint persist_index_part_with_deleted_flag_after_set_before_upload_pause"
);
fail::fail_point!(
"persist_index_part_with_deleted_flag_after_set_before_upload_pause"
);
}
})
.await
.expect("spawn_blocking");
upload::upload_index_part(
self.conf,
&self.storage_impl,
self.tenant_id,
self.timeline_id,
&index_part_with_deleted_at,
)
.await?;
// all good, disarm the guard and mark as success
ScopeGuard::into_inner(undo_deleted_at);
{
let mut locked = self.upload_queue.lock().unwrap();
let stopped = match &mut *locked {
UploadQueue::Uninitialized | UploadQueue::Initialized(_) => unreachable!(
"there's no way out of Stopping, and we checked it's Stopping above: {:?}",
locked.as_str(),
),
UploadQueue::Stopped(stopped) => stopped,
};
stopped.deleted_at = SetDeletedFlagProgress::Successful(
index_part_with_deleted_at
.deleted_at
.expect("we set it above"),
);
}
Ok(())
}
///
/// Pick next tasks from the queue, and start as many of them as possible without violating
/// the ordering constraints.
@@ -741,8 +885,13 @@ impl RemoteTimelineClient {
// upload finishes or times out soon enough.
if task_mgr::is_shutdown_requested() {
info!("upload task cancelled by shutdown request");
match self.stop() {
Ok(()) => {}
Err(StopError::QueueUninitialized) => {
unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back")
}
}
self.calls_unfinished_metric_end(&task.op);
self.stop();
return;
}
@@ -946,32 +1095,48 @@ impl RemoteTimelineClient {
self.metrics.call_end(&file_kind, &op_kind, track_bytes);
}
fn stop(&self) {
/// Close the upload queue for new operations and cancel queued operations.
/// In-progress operations will still be running after this function returns.
/// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))`
/// to wait for them to complete, after calling this function.
pub fn stop(&self) -> Result<(), StopError> {
// Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
// into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet.
// The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
let mut guard = self.upload_queue.lock().unwrap();
match &*guard {
UploadQueue::Uninitialized => panic!(
"callers are responsible for ensuring this is only called on initialized queue"
),
match &mut *guard {
UploadQueue::Uninitialized => Err(StopError::QueueUninitialized),
UploadQueue::Stopped(_) => {
// nothing to do
info!("another concurrent task already shut down the queue");
Ok(())
}
UploadQueue::Initialized(qi) => {
UploadQueue::Initialized(UploadQueueInitialized {
latest_files,
latest_metadata,
last_uploaded_consistent_lsn,
..
}) => {
info!("shutting down upload queue");
// Replace the queue with the Stopped state, taking ownership of the old
// Initialized queue. We will do some checks on it, and then drop it.
let qi = {
let last_uploaded_consistent_lsn = qi.last_uploaded_consistent_lsn;
let upload_queue = std::mem::replace(
&mut *guard,
UploadQueue::Stopped(UploadQueueStopped {
last_uploaded_consistent_lsn,
}),
);
// take or clone what we need
let latest_files = std::mem::take(latest_files);
let last_uploaded_consistent_lsn = *last_uploaded_consistent_lsn;
// this could be Copy
let latest_metadata = latest_metadata.clone();
let stopped = UploadQueueStopped {
latest_files,
last_uploaded_consistent_lsn,
latest_metadata,
deleted_at: SetDeletedFlagProgress::NotRunning,
};
let upload_queue =
std::mem::replace(&mut *guard, UploadQueue::Stopped(stopped));
if let UploadQueue::Initialized(qi) = upload_queue {
qi
} else {
@@ -979,6 +1144,8 @@ impl RemoteTimelineClient {
}
};
assert!(qi.latest_files.is_empty(), "do not use this anymore");
// consistency check
assert_eq!(
qi.num_inprogress_layer_uploads
@@ -1002,6 +1169,7 @@ impl RemoteTimelineClient {
// We're done.
drop(guard);
Ok(())
}
}
}
@@ -1240,7 +1408,11 @@ mod tests {
}
// Download back the index.json, and check that the list of files is correct
let index_part = runtime.block_on(client.download_index_file())?;
let index_part = match runtime.block_on(client.download_index_file())? {
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
MaybeDeletedIndexPart::Deleted => panic!("unexpectedly got deleted index part"),
};
assert_file_list(
&index_part.timeline_layers,
&[

View File

@@ -4,6 +4,7 @@
use std::collections::{HashMap, HashSet};
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
@@ -55,6 +56,10 @@ pub struct IndexPart {
#[serde(default)]
version: usize,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub deleted_at: Option<NaiveDateTime>,
/// Layer names, which are stored on the remote storage.
///
/// Additional metadata can might exist in `layer_metadata`.
@@ -78,7 +83,7 @@ impl IndexPart {
/// used to understand later versions.
///
/// Version is currently informative only.
const LATEST_VERSION: usize = 1;
const LATEST_VERSION: usize = 2;
pub const FILE_NAME: &'static str = "index_part.json";
pub fn new(
@@ -101,6 +106,7 @@ impl IndexPart {
layer_metadata,
disk_consistent_lsn,
metadata_bytes,
deleted_at: None,
}
}
@@ -156,6 +162,7 @@ mod tests {
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata_bytes: [113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
deleted_at: None,
};
let part = serde_json::from_str::<IndexPart>(example).unwrap();
@@ -192,6 +199,7 @@ mod tests {
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata_bytes: [112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
deleted_at: None,
};
let part = serde_json::from_str::<IndexPart>(example).unwrap();
@@ -236,6 +244,7 @@ mod tests {
0, 0,
]
.to_vec(),
deleted_at: None,
};
let empty_layers_parsed = serde_json::from_str::<IndexPart>(empty_layers_json).unwrap();

View File

@@ -19,9 +19,12 @@ pub(super) async fn upload_index_part<'a>(
timeline_id: TimelineId,
index_part: &'a IndexPart,
) -> anyhow::Result<()> {
tracing::trace!("uploading new index part");
fail_point!("before-upload-index", |_| {
bail!("failpoint before-upload-index")
});
let index_part_bytes = serde_json::to_vec(&index_part)
.context("Failed to serialize index part file into bytes")?;
let index_part_size = index_part_bytes.len();
@@ -31,6 +34,7 @@ pub(super) async fn upload_index_part<'a>(
.metadata_path(timeline_id, tenant_id)
.with_file_name(IndexPart::FILE_NAME);
let storage_path = conf.remote_path(&index_part_path)?;
storage
.upload_storage_object(Box::new(index_part_bytes), index_part_size, &storage_path)
.await

View File

@@ -13,9 +13,9 @@ use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use anyhow::Result;
use bytes::Bytes;
use either::Either;
use enum_map::EnumMap;
use enumset::EnumSet;
use once_cell::sync::Lazy;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::models::{
HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
@@ -23,8 +23,10 @@ use pageserver_api::models::{
use std::ops::Range;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::warn;
use utils::history_buffer::HistoryBufferWithDropCounter;
use utils::rate_limit::RateLimit;
use utils::{
id::{TenantId, TimelineId},
@@ -37,6 +39,8 @@ pub use image_layer::{ImageLayer, ImageLayerWriter};
pub use inmemory_layer::InMemoryLayer;
pub use remote_layer::RemoteLayer;
use super::layer_map::BatchedUpdates;
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
T: PartialOrd<T>,
@@ -158,41 +162,82 @@ impl LayerAccessStatFullDetails {
}
impl LayerAccessStats {
pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
new
/// Create an empty stats object.
///
/// The caller is responsible for recording a residence event
/// using [`record_residence_event`] before calling `latest_activity`.
/// If they don't, [`latest_activity`] will return `None`.
pub(crate) fn empty_will_record_residence_event_later() -> Self {
LayerAccessStats(Mutex::default())
}
pub(crate) fn for_new_layer_file() -> Self {
/// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn for_loading_layer<L>(
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
status: LayerResidenceStatus,
) -> Self
where
L: ?Sized + Layer,
{
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
new.record_residence_event(
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
layer_map_lock_held_witness,
status,
LayerResidenceEventReason::LayerLoad,
);
new
}
/// Creates a clone of `self` and records `new_status` in the clone.
/// The `new_status` is not recorded in `self`
pub(crate) fn clone_for_residence_change(
///
/// The `new_status` is not recorded in `self`.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn clone_for_residence_change<L>(
&self,
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
new_status: LayerResidenceStatus,
) -> LayerAccessStats {
) -> LayerAccessStats
where
L: ?Sized + Layer,
{
let clone = {
let inner = self.0.lock().unwrap();
inner.clone()
};
let new = LayerAccessStats(Mutex::new(clone));
new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange);
new.record_residence_event(
layer_map_lock_held_witness,
new_status,
LayerResidenceEventReason::ResidenceChange,
);
new
}
fn record_residence_event(
/// Record a change in layer residency.
///
/// Recording the event must happen while holding the layer map lock to
/// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs)
/// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`.
///
/// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
/// the following race could happen:
///
/// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
/// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
/// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
/// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
///
pub(crate) fn record_residence_event<L>(
&self,
_layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
status: LayerResidenceStatus,
reason: LayerResidenceEventReason,
) {
) where
L: ?Sized + Layer,
{
let mut locked = self.0.lock().unwrap();
locked.iter_mut().for_each(|inner| {
inner
@@ -255,24 +300,37 @@ impl LayerAccessStats {
ret
}
fn most_recent_access_or_residence_event(
&self,
) -> Either<LayerAccessStatFullDetails, LayerResidenceEvent> {
/// Get the latest access timestamp, falling back to latest residence event.
///
/// This function can only return `None` if there has not yet been a call to the
/// [`record_residence_event`] method. That would generally be considered an
/// implementation error. This function logs a rate-limited warning in that case.
///
/// TODO: use type system to avoid the need for `fallback`.
/// The approach in https://github.com/neondatabase/neon/pull/3775
/// could be used to enforce that a residence event is recorded
/// before a layer is added to the layer map. We could also have
/// a layer wrapper type that holds the LayerAccessStats, and ensure
/// that that type can only be produced by inserting into the layer map.
pub(crate) fn latest_activity(&self) -> Option<SystemTime> {
let locked = self.0.lock().unwrap();
let inner = &locked.for_eviction_policy;
match inner.last_accesses.recent() {
Some(a) => Either::Left(*a),
Some(a) => Some(a.when),
None => match inner.last_residence_changes.recent() {
Some(e) => Either::Right(e.clone()),
None => unreachable!("constructors for LayerAccessStats ensure that there's always a residence change event"),
}
}
}
pub(crate) fn latest_activity(&self) -> SystemTime {
match self.most_recent_access_or_residence_event() {
Either::Left(mra) => mra.when,
Either::Right(re) => re.timestamp,
Some(e) => Some(e.timestamp),
None => {
static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimit)>> =
Lazy::new(|| Mutex::new((0, RateLimit::new(Duration::from_secs(10)))));
let mut guard = WARN_RATE_LIMIT.lock().unwrap();
guard.0 += 1;
let occurences = guard.0;
guard.1.call(move || {
warn!(parent: None, occurences, "latest_activity not available, this is an implementation bug, using fallback value");
});
None
}
},
}
}
}

View File

@@ -57,7 +57,7 @@ use utils::{
use super::{
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
LayerKeyIter, LayerResidenceStatus, PathOrConf,
LayerKeyIter, PathOrConf,
};
///
@@ -637,7 +637,7 @@ impl DeltaLayer {
key_range: summary.key_range,
lsn_range: summary.lsn_range,
file_size: metadata.len(),
access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
@@ -808,7 +808,7 @@ impl DeltaLayerWriterInner {
key_range: self.key_start..key_end,
lsn_range: self.lsn_range.clone(),
file_size: metadata.len(),
access_stats: LayerAccessStats::for_new_layer_file(),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,

View File

@@ -53,7 +53,7 @@ use utils::{
};
use super::filename::{ImageFileName, LayerFileName};
use super::{Layer, LayerAccessStatsReset, LayerIter, LayerResidenceStatus, PathOrConf};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
///
/// Header stored in the beginning of the file
@@ -438,7 +438,7 @@ impl ImageLayer {
key_range: summary.key_range,
lsn: summary.lsn,
file_size: metadata.len(),
access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(ImageLayerInner {
file: None,
loaded: false,
@@ -598,7 +598,7 @@ impl ImageLayerWriterInner {
key_range: self.key_range.clone(),
lsn: self.lsn,
file_size: metadata.len(),
access_stats: LayerAccessStats::for_new_layer_file(),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(ImageLayerInner {
loaded: false,
file: None,

View File

@@ -4,6 +4,7 @@
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::layer_map::BatchedUpdates;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use anyhow::{bail, Result};
@@ -246,11 +247,15 @@ impl RemoteLayer {
}
/// Create a Layer struct representing this layer, after it has been downloaded.
pub fn create_downloaded_layer(
pub fn create_downloaded_layer<L>(
&self,
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
conf: &'static PageServerConf,
file_size: u64,
) -> Arc<dyn PersistentLayer> {
) -> Arc<dyn PersistentLayer>
where
L: ?Sized + Layer,
{
if self.is_delta {
let fname = DeltaFileName {
key_range: self.key_range.clone(),
@@ -262,8 +267,10 @@ impl RemoteLayer {
self.tenantid,
&fname,
file_size,
self.access_stats
.clone_for_residence_change(LayerResidenceStatus::Resident),
self.access_stats.clone_for_residence_change(
layer_map_lock_held_witness,
LayerResidenceStatus::Resident,
),
))
} else {
let fname = ImageFileName {
@@ -276,8 +283,10 @@ impl RemoteLayer {
self.tenantid,
&fname,
file_size,
self.access_stats
.clone_for_residence_change(LayerResidenceStatus::Resident),
self.access_stats.clone_for_residence_change(
layer_map_lock_held_witness,
LayerResidenceStatus::Resident,
),
))
}
}

View File

@@ -11,7 +11,8 @@ use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::models::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceStatus, TimelineState,
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
TimelineState,
};
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
@@ -226,6 +227,9 @@ pub struct Timeline {
state: watch::Sender<TimelineState>,
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
pub(super) delete_self:
utils::shared_retryable::SharedRetryable<Result<(), super::InnerDeleteTimelineError>>,
}
/// Internal structure to hold all data needed for logical size calculation.
@@ -588,15 +592,25 @@ impl Timeline {
let _timer = self.metrics.wait_lsn_time_histo.start_timer();
self.last_record_lsn.wait_for_timeout(lsn, self.conf.wait_lsn_timeout).await
.with_context(||
format!(
"Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}",
lsn, self.get_last_record_lsn(), self.get_disk_consistent_lsn()
)
)?;
Ok(())
match self
.last_record_lsn
.wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
.await
{
Ok(()) => Ok(()),
seqwait_error => {
drop(_timer);
let walreceiver_status = self.walreceiver.status().await;
seqwait_error.with_context(|| format!(
"Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, {}",
lsn,
self.get_last_record_lsn(),
self.get_disk_consistent_lsn(),
walreceiver_status.map(|status| status.to_human_readable_string())
.unwrap_or_else(|| "WalReceiver status: Not active".to_string()),
))
}
}
}
/// Check that it is valid to request operations with that lsn.
@@ -1101,7 +1115,7 @@ impl Timeline {
&layer_metadata,
local_layer
.access_stats()
.clone_for_residence_change(LayerResidenceStatus::Evicted),
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
),
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
self.tenant_id,
@@ -1110,7 +1124,7 @@ impl Timeline {
&layer_metadata,
local_layer
.access_stats()
.clone_for_residence_change(LayerResidenceStatus::Evicted),
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
),
});
@@ -1368,6 +1382,8 @@ impl Timeline {
eviction_task_timeline_state: tokio::sync::Mutex::new(
EvictionTaskTimelineState::default(),
),
delete_self: utils::shared_retryable::SharedRetryable::default(),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
@@ -1479,7 +1495,7 @@ impl Timeline {
self.tenant_id,
&imgfilename,
file_size,
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
);
trace!("found layer {}", layer.path().display());
@@ -1511,7 +1527,7 @@ impl Timeline {
self.tenant_id,
&deltafilename,
file_size,
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
);
trace!("found layer {}", layer.path().display());
@@ -1647,7 +1663,10 @@ impl Timeline {
self.timeline_id,
imgfilename,
&remote_layer_metadata,
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
LayerAccessStats::for_loading_layer(
&updates,
LayerResidenceStatus::Evicted,
),
);
let remote_layer = Arc::new(remote_layer);
@@ -1672,7 +1691,10 @@ impl Timeline {
self.timeline_id,
deltafilename,
&remote_layer_metadata,
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
LayerAccessStats::for_loading_layer(
&updates,
LayerResidenceStatus::Evicted,
),
);
let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer);
@@ -2719,11 +2741,16 @@ impl Timeline {
])?;
// Add it to the layer map
self.layers
.write()
.unwrap()
.batch_update()
.insert_historic(Arc::new(new_delta));
let l = Arc::new(new_delta);
let mut layers = self.layers.write().unwrap();
let mut batch_updates = layers.batch_update();
l.access_stats().record_residence_event(
&batch_updates,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
batch_updates.insert_historic(l);
batch_updates.flush();
// update the timeline's physical size
let sz = new_delta_path.metadata()?.len();
@@ -2928,7 +2955,13 @@ impl Timeline {
self.metrics
.resident_physical_size_gauge
.add(metadata.len());
updates.insert_historic(Arc::new(l));
let l = Arc::new(l);
l.access_stats().record_residence_event(
&updates,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(l);
}
updates.flush();
drop(layers);
@@ -3361,6 +3394,11 @@ impl Timeline {
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
x.access_stats().record_residence_event(
&updates,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(x);
}
@@ -3871,9 +3909,9 @@ impl Timeline {
// Download complete. Replace the RemoteLayer with the corresponding
// Delta- or ImageLayer in the layer map.
let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size);
let mut layers = self_clone.layers.write().unwrap();
let mut updates = layers.batch_update();
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{
use crate::tenant::layer_map::Replacement;
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
@@ -4145,7 +4183,15 @@ impl Timeline {
continue;
}
let last_activity_ts = l.access_stats().latest_activity();
let last_activity_ts = l
.access_stats()
.latest_activity()
.unwrap_or_else(|| {
// We only use this fallback if there's an implementation error.
// `latest_activity` already does rate-limited warn!() log.
debug!(layer=%l.filename().file_name(), "last_activity returns None, using SystemTime::now");
SystemTime::now()
});
resident_layers.push(LocalLayerInfoForDiskUsageEviction {
layer: l,

View File

@@ -22,7 +22,7 @@ use std::{
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
use crate::{
context::{DownloadBehavior, RequestContext},
@@ -184,7 +184,14 @@ impl Timeline {
if hist_layer.is_remote_layer() {
continue;
}
let last_activity_ts = hist_layer.access_stats().latest_activity();
let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
// We only use this fallback if there's an implementation error.
// `latest_activity` already does rate-limited warn!() log.
debug!(layer=%hist_layer.filename().file_name(), "last_activity returns None, using SystemTime::now");
SystemTime::now()
});
let no_activity_for = match now.duration_since(last_activity_ts) {
Ok(d) => d,
Err(_e) => {
@@ -269,6 +276,7 @@ impl Timeline {
ControlFlow::Continue(())
}
#[instrument(skip_all)]
async fn imitate_layer_accesses(
&self,
p: &EvictionPolicyLayerAccessThreshold,
@@ -317,6 +325,7 @@ impl Timeline {
}
/// Recompute the values which would cause on-demand downloads during restart.
#[instrument(skip_all)]
async fn imitate_timeline_cached_layer_accesses(
&self,
cancel: &CancellationToken,
@@ -325,7 +334,10 @@ impl Timeline {
let lsn = self.get_last_record_lsn();
// imitiate on-restart initial logical size
let size = self.calculate_logical_size(lsn, cancel.clone(), ctx).await;
let size = self
.calculate_logical_size(lsn, cancel.clone(), ctx)
.instrument(info_span!("calculate_logical_size"))
.await;
match &size {
Ok(_size) => {
@@ -340,7 +352,11 @@ impl Timeline {
}
// imitiate repartiting on first compactation
if let Err(e) = self.collect_keyspace(lsn, ctx).await {
if let Err(e) = self
.collect_keyspace(lsn, ctx)
.instrument(info_span!("collect_keyspace"))
.await
{
// if this failed, we probably failed logical size because these use the same keys
if size.is_err() {
// ignore, see above comment
@@ -353,6 +369,7 @@ impl Timeline {
}
// Imitate the synthetic size calculation done by the consumption_metrics module.
#[instrument(skip_all)]
async fn imitate_synthetic_size_calculation_worker(
&self,
tenant: &Arc<Tenant>,
@@ -391,7 +408,8 @@ impl Timeline {
let mut throwaway_cache = HashMap::new();
let gather =
crate::tenant::size::gather_inputs(tenant, limit, None, &mut throwaway_cache, ctx);
crate::tenant::size::gather_inputs(tenant, limit, None, &mut throwaway_cache, ctx)
.instrument(info_span!("gather_inputs"));
tokio::select! {
_ = cancel.cancelled() => {}

View File

@@ -38,12 +38,14 @@ use std::sync::{Arc, Weak};
use std::time::Duration;
use storage_broker::BrokerClientChannel;
use tokio::select;
use tokio::sync::watch;
use tokio::sync::{watch, RwLock};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::TenantTimelineId;
use self::connection_manager::ConnectionManagerStatus;
use super::Timeline;
#[derive(Clone)]
@@ -63,6 +65,7 @@ pub struct WalReceiver {
timeline_ref: Weak<Timeline>,
conf: WalReceiverConf,
started: AtomicBool,
manager_status: Arc<RwLock<Option<ConnectionManagerStatus>>>,
}
impl WalReceiver {
@@ -76,6 +79,7 @@ impl WalReceiver {
timeline_ref,
conf,
started: AtomicBool::new(false),
manager_status: Arc::new(RwLock::new(None)),
}
}
@@ -96,8 +100,8 @@ impl WalReceiver {
let timeline_id = timeline.timeline_id;
let walreceiver_ctx =
ctx.detached_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
let wal_receiver_conf = self.conf.clone();
let loop_status = Arc::clone(&self.manager_status);
task_mgr::spawn(
WALRECEIVER_RUNTIME.handle(),
TaskKind::WalReceiverManager,
@@ -115,24 +119,28 @@ impl WalReceiver {
select! {
_ = task_mgr::shutdown_watcher() => {
info!("WAL receiver shutdown requested, shutting down");
connection_manager_state.shutdown().await;
return Ok(());
break;
},
loop_step_result = connection_manager_loop_step(
&mut broker_client,
&mut connection_manager_state,
&walreceiver_ctx,
&loop_status,
) => match loop_step_result {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(()) => {
info!("Connection manager loop ended, shutting down");
connection_manager_state.shutdown().await;
return Ok(());
break;
}
},
}
}
}.instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id))
connection_manager_state.shutdown().await;
*loop_status.write().await = None;
Ok(())
}
.instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id))
);
self.started.store(true, atomic::Ordering::Release);
@@ -149,6 +157,10 @@ impl WalReceiver {
.await;
self.started.store(false, atomic::Ordering::Release);
}
pub(super) async fn status(&self) -> Option<ConnectionManagerStatus> {
self.manager_status.read().await.clone()
}
}
/// A handle of an asynchronous task.

View File

@@ -13,6 +13,10 @@ use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, ti
use super::{TaskStateUpdate, WalReceiverConf};
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::{
WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
};
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
use anyhow::Context;
@@ -24,6 +28,7 @@ use storage_broker::proto::SubscribeSafekeeperInfoRequest;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use storage_broker::BrokerClientChannel;
use storage_broker::Streaming;
use tokio::sync::RwLock;
use tokio::{select, sync::watch};
use tracing::*;
@@ -43,6 +48,7 @@ pub(super) async fn connection_manager_loop_step(
broker_client: &mut BrokerClientChannel,
connection_manager_state: &mut ConnectionManagerState,
ctx: &RequestContext,
manager_status: &RwLock<Option<ConnectionManagerStatus>>,
) -> ControlFlow<(), ()> {
let mut timeline_state_updates = connection_manager_state
.timeline
@@ -56,6 +62,11 @@ pub(super) async fn connection_manager_loop_step(
}
}
WALRECEIVER_ACTIVE_MANAGERS.inc();
scopeguard::defer! {
WALRECEIVER_ACTIVE_MANAGERS.dec();
}
let id = TenantTimelineId {
tenant_id: connection_manager_state.timeline.tenant_id,
timeline_id: connection_manager_state.timeline.timeline_id,
@@ -180,6 +191,7 @@ pub(super) async fn connection_manager_loop_step(
.change_connection(new_candidate, ctx)
.await
}
*manager_status.write().await = Some(connection_manager_state.manager_status());
}
}
@@ -267,6 +279,78 @@ pub(super) struct ConnectionManagerState {
wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
}
/// An information about connection manager's current connection and connection candidates.
#[derive(Debug, Clone)]
pub struct ConnectionManagerStatus {
existing_connection: Option<WalConnectionStatus>,
wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
}
impl ConnectionManagerStatus {
/// Generates a string, describing current connection status in a form, suitable for logging.
pub fn to_human_readable_string(&self) -> String {
let mut resulting_string = "WalReceiver status".to_string();
match &self.existing_connection {
Some(connection) => {
if connection.has_processed_wal {
resulting_string.push_str(&format!(
" (update {}): streaming WAL from node {}, ",
connection.latest_wal_update.format("%Y-%m-%d %H:%M:%S"),
connection.node,
));
match (connection.streaming_lsn, connection.commit_lsn) {
(None, None) => resulting_string.push_str("no streaming data"),
(None, Some(commit_lsn)) => {
resulting_string.push_str(&format!("commit Lsn: {commit_lsn}"))
}
(Some(streaming_lsn), None) => {
resulting_string.push_str(&format!("streaming Lsn: {streaming_lsn}"))
}
(Some(streaming_lsn), Some(commit_lsn)) => resulting_string.push_str(
&format!("commit|streaming Lsn: {commit_lsn}|{streaming_lsn}"),
),
}
} else if connection.is_connected {
resulting_string.push_str(&format!(
" (update {}): connecting to node {}",
connection
.latest_connection_update
.format("%Y-%m-%d %H:%M:%S"),
connection.node,
));
} else {
resulting_string.push_str(&format!(
" (update {}): initializing node {} connection",
connection
.latest_connection_update
.format("%Y-%m-%d %H:%M:%S"),
connection.node,
));
}
}
None => resulting_string.push_str(": disconnected"),
}
resulting_string.push_str(", safekeeper candidates (id|update_time|commit_lsn): [");
let mut candidates = self.wal_stream_candidates.iter().peekable();
while let Some((node_id, candidate_info)) = candidates.next() {
resulting_string.push_str(&format!(
"({}|{}|{})",
node_id,
candidate_info.latest_update.format("%H:%M:%S"),
Lsn(candidate_info.timeline.commit_lsn)
));
if candidates.peek().is_some() {
resulting_string.push_str(", ");
}
}
resulting_string.push(']');
resulting_string
}
}
/// Current connection data.
#[derive(Debug)]
struct WalConnection {
@@ -293,14 +377,14 @@ struct NewCommittedWAL {
discovered_at: NaiveDateTime,
}
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
struct RetryInfo {
next_retry_at: Option<NaiveDateTime>,
retry_duration_seconds: f64,
}
/// Data about the timeline to connect to, received from the broker.
#[derive(Debug)]
#[derive(Debug, Clone)]
struct BrokerSkTimeline {
timeline: SafekeeperTimelineInfo,
/// Time at which the data was fetched from the broker last time, to track the stale data.
@@ -325,9 +409,14 @@ impl ConnectionManagerState {
/// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
WALRECEIVER_SWITCHES
.with_label_values(&[new_sk.reason.name()])
.inc();
self.drop_old_connection(true).await;
let id = self.id;
let node_id = new_sk.safekeeper_id;
let connect_timeout = self.conf.wal_connect_timeout;
let timeline = Arc::clone(&self.timeline);
let ctx = ctx.detached_child(
@@ -343,12 +432,13 @@ impl ConnectionManagerState {
cancellation,
connect_timeout,
ctx,
node_id,
)
.await
.context("walreceiver connection handling failure")
}
.instrument(
info_span!("walreceiver_connection", tenant_id = %id.tenant_id, timeline_id = %id.timeline_id, node_id = %new_sk.safekeeper_id),
info_span!("walreceiver_connection", tenant_id = %id.tenant_id, timeline_id = %id.timeline_id, %node_id),
)
});
@@ -364,6 +454,7 @@ impl ConnectionManagerState {
latest_wal_update: now,
streaming_lsn: None,
commit_lsn: None,
node: node_id,
},
connection_task: connection_handle,
discovered_new_wal: None,
@@ -437,6 +528,8 @@ impl ConnectionManagerState {
/// Adds another broker timeline into the state, if its more recent than the one already added there for the same key.
fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
WALRECEIVER_BROKER_UPDATES.inc();
let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
let old_entry = self.wal_stream_candidates.insert(
new_safekeeper_id,
@@ -448,6 +541,7 @@ impl ConnectionManagerState {
if old_entry.is_none() {
info!("New SK node was added: {new_safekeeper_id}");
WALRECEIVER_CANDIDATES_ADDED.inc();
}
}
@@ -716,6 +810,7 @@ impl ConnectionManagerState {
for node_id in node_ids_to_remove {
info!("Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections");
self.wal_connection_retries.remove(&node_id);
WALRECEIVER_CANDIDATES_REMOVED.inc();
}
}
}
@@ -725,6 +820,13 @@ impl ConnectionManagerState {
wal_connection.connection_task.shutdown().await;
}
}
fn manager_status(&self) -> ConnectionManagerStatus {
ConnectionManagerStatus {
existing_connection: self.wal_connection.as_ref().map(|conn| conn.status),
wal_stream_candidates: self.wal_stream_candidates.clone(),
}
}
}
#[derive(Debug)]
@@ -732,8 +834,6 @@ struct NewWalConnectionCandidate {
safekeeper_id: NodeId,
wal_source_connconf: PgConnectionConfig,
availability_zone: Option<String>,
// This field is used in `derive(Debug)` only.
#[allow(dead_code)]
reason: ReconnectReason,
}
@@ -762,6 +862,18 @@ enum ReconnectReason {
},
}
impl ReconnectReason {
fn name(&self) -> &str {
match self {
ReconnectReason::NoExistingConnection => "NoExistingConnection",
ReconnectReason::LaggingWal { .. } => "LaggingWal",
ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
}
}
}
fn wal_stream_connection_config(
TenantTimelineId {
tenant_id,
@@ -867,6 +979,7 @@ mod tests {
latest_wal_update: now,
commit_lsn: Some(Lsn(current_lsn)),
streaming_lsn: Some(Lsn(current_lsn)),
node: NodeId(1),
};
state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
@@ -1035,6 +1148,7 @@ mod tests {
latest_wal_update: now,
commit_lsn: Some(current_lsn),
streaming_lsn: Some(current_lsn),
node: connected_sk_id,
};
state.wal_connection = Some(WalConnection {
@@ -1101,6 +1215,7 @@ mod tests {
latest_wal_update: time_over_threshold,
commit_lsn: Some(current_lsn),
streaming_lsn: Some(current_lsn),
node: NodeId(1),
};
state.wal_connection = Some(WalConnection {
@@ -1164,6 +1279,7 @@ mod tests {
latest_wal_update: time_over_threshold,
commit_lsn: Some(current_lsn),
streaming_lsn: Some(current_lsn),
node: NodeId(1),
};
state.wal_connection = Some(WalConnection {
@@ -1261,6 +1377,7 @@ mod tests {
latest_wal_update: now,
commit_lsn: Some(current_lsn),
streaming_lsn: Some(current_lsn),
node: connected_sk_id,
};
state.wal_connection = Some(WalConnection {

View File

@@ -24,8 +24,8 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
use super::TaskStateUpdate;
use crate::context::RequestContext;
use crate::metrics::LIVE_CONNECTIONS_COUNT;
use crate::{context::RequestContext, metrics::WALRECEIVER_STARTED_CONNECTIONS};
use crate::{
task_mgr,
task_mgr::TaskKind,
@@ -37,8 +37,8 @@ use crate::{
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
use utils::{id::NodeId, lsn::Lsn};
/// Status of the connection.
#[derive(Debug, Clone, Copy)]
@@ -56,6 +56,8 @@ pub(super) struct WalConnectionStatus {
pub streaming_lsn: Option<Lsn>,
/// Latest commit_lsn received from the safekeeper. Can be zero if no message has been received yet.
pub commit_lsn: Option<Lsn>,
/// The node it is connected to
pub node: NodeId,
}
/// Open a connection to the given safekeeper and receive WAL, sending back progress
@@ -67,7 +69,10 @@ pub(super) async fn handle_walreceiver_connection(
cancellation: CancellationToken,
connect_timeout: Duration,
ctx: RequestContext,
node: NodeId,
) -> anyhow::Result<()> {
WALRECEIVER_STARTED_CONNECTIONS.inc();
// Connect to the database in replication mode.
info!("connecting to {wal_source_connconf:?}");
@@ -100,6 +105,7 @@ pub(super) async fn handle_walreceiver_connection(
latest_wal_update: Utc::now().naive_utc(),
streaming_lsn: None,
commit_lsn: None,
node,
};
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}");
@@ -122,7 +128,7 @@ pub(super) async fn handle_walreceiver_connection(
false,
async move {
select! {
connection_result = connection => match connection_result{
connection_result = connection => match connection_result {
Ok(()) => info!("Walreceiver db connection closed"),
Err(connection_error) => {
if let Err(e) = ignore_expected_errors(connection_error) {

View File

@@ -7,6 +7,7 @@ use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use chrono::NaiveDateTime;
use std::sync::Arc;
use tracing::info;
@@ -18,14 +19,14 @@ use utils::lsn::Lsn;
// that many upload queues in a running pageserver, and most of them are initialized
// anyway.
#[allow(clippy::large_enum_variant)]
pub(crate) enum UploadQueue {
pub(super) enum UploadQueue {
Uninitialized,
Initialized(UploadQueueInitialized),
Stopped(UploadQueueStopped),
}
impl UploadQueue {
fn as_str(&self) -> &'static str {
pub fn as_str(&self) -> &'static str {
match self {
UploadQueue::Uninitialized => "Uninitialized",
UploadQueue::Initialized(_) => "Initialized",
@@ -75,8 +76,18 @@ pub(crate) struct UploadQueueInitialized {
pub(crate) queued_operations: VecDeque<UploadOp>,
}
pub(crate) struct UploadQueueStopped {
pub(crate) last_uploaded_consistent_lsn: Lsn,
#[derive(Clone, Copy)]
pub(super) enum SetDeletedFlagProgress {
NotRunning,
InProgress(NaiveDateTime),
Successful(NaiveDateTime),
}
pub(super) struct UploadQueueStopped {
pub(super) latest_files: HashMap<LayerFileName, LayerFileMetadata>,
pub(super) last_uploaded_consistent_lsn: Lsn,
pub(super) latest_metadata: TimelineMetadata,
pub(super) deleted_at: SetDeletedFlagProgress,
}
impl UploadQueue {

View File

@@ -96,6 +96,8 @@ static shmem_request_hook_type prev_shmem_request_hook;
#endif
static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark is reached */
void FileCacheMonitorMain(Datum main_arg);
static void
lfc_shmem_startup(void)
{
@@ -378,7 +380,6 @@ lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
{
BufferTag tag;
FileCacheEntry* entry;
ssize_t rc;
bool found;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
uint32 hash;

10
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.4.1 and should not be changed by hand.
# This file is automatically @generated by Poetry and should not be changed by hand.
[[package]]
name = "aiohttp"
@@ -968,14 +968,14 @@ testing = ["pre-commit"]
[[package]]
name = "flask"
version = "2.1.3"
version = "2.2.5"
description = "A simple framework for building complex web applications."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "Flask-2.1.3-py3-none-any.whl", hash = "sha256:9013281a7402ad527f8fd56375164f3aa021ecfaff89bfe3825346c24f87e04c"},
{file = "Flask-2.1.3.tar.gz", hash = "sha256:15972e5017df0575c3d6c090ba168b6db90259e620ac8d7ea813a396bad5b6cb"},
{file = "Flask-2.2.5-py3-none-any.whl", hash = "sha256:58107ed83443e86067e41eff4631b058178191a355886f8e479e347fa1285fdf"},
{file = "Flask-2.2.5.tar.gz", hash = "sha256:edee9b0a7ff26621bd5a8c10ff484ae28737a2410d99b0bb9a6850c7fb977aa0"},
]
[package.dependencies]
@@ -983,7 +983,7 @@ click = ">=8.0"
importlib-metadata = {version = ">=3.6.0", markers = "python_version < \"3.10\""}
itsdangerous = ">=2.0"
Jinja2 = ">=3.0"
Werkzeug = ">=2.0"
Werkzeug = ">=2.2.2"
[package.extras]
async = ["asgiref (>=3.2)"]

View File

@@ -25,6 +25,7 @@ parking_lot.workspace = true
postgres.workspace = true
postgres-protocol.workspace = true
regex.workspace = true
scopeguard.workspace = true
reqwest = { workspace = true, features = ["json"] }
serde.workspace = true
serde_json.workspace = true

View File

@@ -108,6 +108,9 @@ struct Args {
/// available to the system.
#[arg(long)]
wal_backup_threads: Option<usize>,
/// Number of max parallel WAL segments to be offloaded to remote storage.
#[arg(long, default_value = "5")]
wal_backup_parallel_jobs: usize,
/// Disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring
/// WAL backup horizon.
#[arg(long)]
@@ -182,6 +185,7 @@ fn main() -> anyhow::Result<()> {
max_offloader_lag_bytes: args.max_offloader_lag,
backup_runtime_threads: args.wal_backup_threads,
wal_backup_enabled: !args.disable_wal_backup,
backup_parallel_jobs: args.wal_backup_parallel_jobs,
auth,
};

View File

@@ -14,10 +14,13 @@ use storage_broker::proto::SubscribeSafekeeperInfoRequest;
use storage_broker::Request;
use std::time::Duration;
use std::time::Instant;
use tokio::task::JoinHandle;
use tokio::{runtime, time::sleep};
use tracing::*;
use crate::metrics::BROKER_PULLED_UPDATES;
use crate::metrics::BROKER_PUSHED_UPDATES;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
@@ -49,12 +52,17 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
// is under plain mutex. That's ok, all this code is not performance
// sensitive and there is no risk of deadlock as we don't await while
// lock is held.
let now = Instant::now();
let mut active_tlis = GlobalTimelines::get_all();
active_tlis.retain(|tli| tli.is_active());
for tli in &active_tlis {
let sk_info = tli.get_safekeeper_info(&conf);
yield sk_info;
BROKER_PUSHED_UPDATES.inc();
}
let elapsed = now.elapsed();
// Log duration every second. Should be about 10MB of logs per day.
info!("pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed);
sleep(push_interval).await;
}
};
@@ -79,6 +87,10 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
.context("subscribe_safekeper_info request failed")?
.into_inner();
let ok_counter = BROKER_PULLED_UPDATES.with_label_values(&["ok"]);
let not_found = BROKER_PULLED_UPDATES.with_label_values(&["not_found"]);
let err_counter = BROKER_PULLED_UPDATES.with_label_values(&["error"]);
while let Some(msg) = stream.message().await? {
let proto_ttid = msg
.tenant_timeline_id
@@ -91,7 +103,15 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
// connection to the broker.
// note: there are blocking operations below, but it's considered fine for now
tli.record_safekeeper_info(msg).await?
let res = tli.record_safekeeper_info(msg).await;
if res.is_ok() {
ok_counter.inc();
} else {
err_counter.inc();
}
res?;
} else {
not_found.inc();
}
}
bail!("end of stream");

View File

@@ -10,7 +10,7 @@ use tracing::{info, info_span, Instrument};
use crate::auth::check_permission;
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
use crate::metrics::TrafficMetrics;
use crate::metrics::{TrafficMetrics, PG_QUERIES_FINISHED, PG_QUERIES_RECEIVED};
use crate::wal_service::ConnectionId;
use crate::{GlobalTimelines, SafeKeeperConf};
use postgres_backend::QueryError;
@@ -72,6 +72,15 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
}
}
fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
match cmd {
SafekeeperPostgresCommand::StartWalPush => "START_WAL_PUSH",
SafekeeperPostgresCommand::StartReplication { .. } => "START_REPLICATION",
SafekeeperPostgresCommand::IdentifySystem => "IDENTIFY_SYSTEM",
SafekeeperPostgresCommand::JSONCtrl { .. } => "JSON_CTRL",
}
}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
for SafekeeperPostgresHandler
@@ -168,6 +177,12 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
}
let cmd = parse_cmd(query_string)?;
let cmd_str = cmd_to_string(&cmd);
PG_QUERIES_RECEIVED.with_label_values(&[cmd_str]).inc();
scopeguard::defer! {
PG_QUERIES_FINISHED.with_label_values(&[cmd_str]).inc();
}
info!(
"got query {:?} in timeline {:?}",

View File

@@ -61,6 +61,7 @@ pub struct SafeKeeperConf {
pub remote_storage: Option<RemoteStorageConfig>,
pub max_offloader_lag_bytes: u64,
pub backup_runtime_threads: Option<usize>,
pub backup_parallel_jobs: usize,
pub wal_backup_enabled: bool,
pub auth: Option<Arc<JwtAuth>>,
}
@@ -93,6 +94,7 @@ impl SafeKeeperConf {
broker_keepalive_interval: Duration::from_secs(5),
backup_runtime_threads: None,
wal_backup_enabled: true,
backup_parallel_jobs: 1,
auth: None,
heartbeat_timeout: Duration::new(5, 0),
max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,

View File

@@ -10,7 +10,7 @@ use anyhow::Result;
use metrics::{
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
proto::MetricFamily,
register_int_counter_vec, Gauge, IntCounterVec, IntGaugeVec,
register_int_counter, register_int_counter_vec, Gauge, IntCounter, IntCounterVec, IntGaugeVec,
};
use once_cell::sync::Lazy;
@@ -73,6 +73,58 @@ pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_pg_io_bytes gauge")
});
pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_broker_pushed_updates_total",
"Number of timeline updates pushed to the broker"
)
.expect("Failed to register safekeeper_broker_pushed_updates_total counter")
});
pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"safekeeper_broker_pulled_updates_total",
"Number of timeline updates pulled and processed from the broker",
&["result"]
)
.expect("Failed to register safekeeper_broker_pulled_updates_total counter")
});
pub static PG_QUERIES_RECEIVED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"safekeeper_pg_queries_received_total",
"Number of queries received through pg protocol",
&["query"]
)
.expect("Failed to register safekeeper_pg_queries_received_total counter")
});
pub static PG_QUERIES_FINISHED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"safekeeper_pg_queries_finished_total",
"Number of queries finished through pg protocol",
&["query"]
)
.expect("Failed to register safekeeper_pg_queries_finished_total counter")
});
pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_removed_wal_segments_total",
"Number of WAL segments removed from the disk"
)
.expect("Failed to register safekeeper_removed_wal_segments_total counter")
});
pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_backed_up_segments_total",
"Number of WAL segments backed up to the broker"
)
.expect("Failed to register safekeeper_backed_up_segments_total counter")
});
pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_backup_errors_total",
"Number of errors during backup"
)
.expect("Failed to register safekeeper_backup_errors_total counter")
});
pub const LABEL_UNKNOWN: &str = "unknown";

View File

@@ -384,6 +384,8 @@ impl SafekeeperPostgresHandler {
self.appname.clone(),
));
let commit_lsn_watch_rx = tli.get_commit_lsn_watch_rx();
// Walproposer gets special handling: safekeeper must give proposer all
// local WAL till the end, whether committed or not (walproposer will
// hang otherwise). That's because walproposer runs the consensus and
@@ -399,7 +401,14 @@ impl SafekeeperPostgresHandler {
} else {
None
};
let end_pos = stop_pos.unwrap_or(Lsn::INVALID);
// take the latest commit_lsn if don't have stop_pos
let mut end_pos = stop_pos.unwrap_or(*commit_lsn_watch_rx.borrow());
if end_pos < start_pos {
warn!("start_pos {} is ahead of end_pos {}", start_pos, end_pos);
end_pos = start_pos;
}
info!(
"starting streaming from {:?} till {:?}",
@@ -429,7 +438,7 @@ impl SafekeeperPostgresHandler {
start_pos,
end_pos,
stop_pos,
commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
commit_lsn_watch_rx,
ws_guard: ws_guard.clone(),
wal_reader,
send_buf: [0; MAX_SEND_SIZE],
@@ -456,6 +465,11 @@ struct WalSender<'a, IO> {
// Position since which we are sending next chunk.
start_pos: Lsn,
// WAL up to this position is known to be locally available.
// Usually this is the same as the latest commit_lsn, but in case of
// walproposer recovery, this is flush_lsn.
//
// We send this LSN to the receiver as wal_end, so that it knows how much
// WAL this safekeeper has. This LSN should be as fresh as possible.
end_pos: Lsn,
// If present, terminate after reaching this position; used by walproposer
// in recovery.
@@ -530,10 +544,18 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
/// exit in the meanwhile
async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
loop {
self.end_pos = *self.commit_lsn_watch_rx.borrow();
if self.end_pos > self.start_pos {
// We have something to send.
return Ok(());
}
// Wait for WAL to appear, now self.end_pos == self.start_pos.
if let Some(lsn) = wait_for_lsn(&mut self.commit_lsn_watch_rx, self.start_pos).await? {
self.end_pos = lsn;
return Ok(());
}
// Timed out waiting for WAL, check for termination and send KA
if let Some(remote_consistent_lsn) = self
.ws_guard
@@ -551,7 +573,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
self.pgb
.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
sent_ptr: self.end_pos.0,
wal_end: self.end_pos.0,
timestamp: get_current_timestamp(),
request_reply: true,
}))
@@ -618,11 +640,6 @@ const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
/// - Ok(None) if timeout expired;
/// - Err in case of error (if watch channel is in trouble, shouldn't happen).
async fn wait_for_lsn(rx: &mut Receiver<Lsn>, lsn: Lsn) -> anyhow::Result<Option<Lsn>> {
let commit_lsn: Lsn = *rx.borrow();
if commit_lsn > lsn {
return Ok(Some(commit_lsn));
}
let res = timeout(POLL_STATE_TIMEOUT, async move {
let mut commit_lsn;
loop {

View File

@@ -1,5 +1,7 @@
use anyhow::{Context, Result};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use tokio::task::JoinHandle;
use utils::id::NodeId;
@@ -25,6 +27,7 @@ use tracing::*;
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS};
use crate::timeline::{PeerInfo, Timeline};
use crate::{GlobalTimelines, SafeKeeperConf};
@@ -154,8 +157,14 @@ async fn update_task(
let timeline_dir = conf.timeline_dir(&ttid);
let handle = tokio::spawn(
backup_task_main(ttid, timeline_dir, conf.workdir.clone(), shutdown_rx)
.instrument(info_span!("WAL backup task", ttid = %ttid)),
backup_task_main(
ttid,
timeline_dir,
conf.workdir.clone(),
conf.backup_parallel_jobs,
shutdown_rx,
)
.instrument(info_span!("WAL backup task", ttid = %ttid)),
);
entry.handle = Some(WalBackupTaskHandle {
@@ -239,6 +248,7 @@ struct WalBackupTask {
timeline_dir: PathBuf,
workspace_dir: PathBuf,
wal_seg_size: usize,
parallel_jobs: usize,
commit_lsn_watch_rx: watch::Receiver<Lsn>,
}
@@ -247,6 +257,7 @@ async fn backup_task_main(
ttid: TenantTimelineId,
timeline_dir: PathBuf,
workspace_dir: PathBuf,
parallel_jobs: usize,
mut shutdown_rx: Receiver<()>,
) {
info!("started");
@@ -263,6 +274,7 @@ async fn backup_task_main(
timeline: tli,
timeline_dir,
workspace_dir,
parallel_jobs,
};
// task is spinned up only when wal_seg_size already initialized
@@ -329,6 +341,7 @@ impl WalBackupTask {
self.wal_seg_size,
&self.timeline_dir,
&self.workspace_dir,
self.parallel_jobs,
)
.await
{
@@ -355,20 +368,49 @@ pub async fn backup_lsn_range(
wal_seg_size: usize,
timeline_dir: &Path,
workspace_dir: &Path,
parallel_jobs: usize,
) -> Result<()> {
if parallel_jobs < 1 {
anyhow::bail!("parallel_jobs must be >= 1");
}
let start_lsn = *backup_lsn;
let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
for s in &segments {
backup_single_segment(s, timeline_dir, workspace_dir)
.await
.with_context(|| format!("offloading segno {}", s.seg_no))?;
let new_backup_lsn = s.end_lsn;
timeline
.set_wal_backup_lsn(new_backup_lsn)
.context("setting wal_backup_lsn")?;
*backup_lsn = new_backup_lsn;
// Pool of concurrent upload tasks. We use `FuturesOrdered` to
// preserve order of uploads, and update `backup_lsn` only after
// all previous uploads are finished.
let mut uploads = FuturesOrdered::new();
let mut iter = segments.iter();
loop {
let added_task = match iter.next() {
Some(s) => {
uploads.push_back(backup_single_segment(s, timeline_dir, workspace_dir));
true
}
None => false,
};
// Wait for the next segment to upload if we don't have any more segments,
// or if we have too many concurrent uploads.
if !added_task || uploads.len() >= parallel_jobs {
let next = uploads.next().await;
if let Some(res) = next {
// next segment uploaded
let segment = res?;
let new_backup_lsn = segment.end_lsn;
timeline
.set_wal_backup_lsn(new_backup_lsn)
.context("setting wal_backup_lsn")?;
*backup_lsn = new_backup_lsn;
} else {
// no more segments to upload
break;
}
}
}
info!(
"offloaded segnos {:?} up to {}, previous backup_lsn {}",
segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
@@ -382,7 +424,7 @@ async fn backup_single_segment(
seg: &Segment,
timeline_dir: &Path,
workspace_dir: &Path,
) -> Result<()> {
) -> Result<Segment> {
let segment_file_path = seg.file_path(timeline_dir)?;
let remote_segment_path = segment_file_path
.strip_prefix(workspace_dir)
@@ -394,10 +436,16 @@ async fn backup_single_segment(
)
})?;
backup_object(&segment_file_path, &remote_segment_path, seg.size()).await?;
let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
if res.is_ok() {
BACKED_UP_SEGMENTS.inc();
} else {
BACKUP_ERRORS.inc();
}
res?;
debug!("Backup of {} done", segment_file_path.display());
Ok(())
Ok(*seg)
}
#[derive(Debug, Copy, Clone)]

Some files were not shown because too many files have changed in this diff Show More