Compare commits

..

235 Commits

Author SHA1 Message Date
Shany Pozin
b05fe53cfd Merge pull request #6240 from neondatabase/releases/2024-01-01
Release 2024-01-01
2024-01-01 11:07:30 +02:00
Christian Schwarz
c13a2f0df1 Merge pull request #6192 from neondatabase/releases/2023-12-19
Release 2023-12-19

We need to do a config change that requires restarting the pageservers.
Slip in two metrics-related commits that didn't make this week's regularly release.
2023-12-19 14:52:47 +01:00
Christian Schwarz
39be366fc5 higher resolution histograms for getpage@lsn (#6177)
part of https://github.com/neondatabase/cloud/issues/7811
2023-12-19 13:46:59 +00:00
Christian Schwarz
6eda0a3158 [PRE-MERGE] fix metric pageserver_initial_logical_size_start_calculation
(This is a pre-merge cherry-pick of https://github.com/neondatabase/neon/pull/6191)

It wasn't being incremented.

Fixup of

    commit 1c88824ed0
    Author: Christian Schwarz <christian@neon.tech>
    Date:   Fri Dec 1 12:52:59 2023 +0100

        initial logical size calculation: add a bunch of metrics (#5995)
2023-12-19 13:46:55 +00:00
Shany Pozin
306c7a1813 Merge pull request #6173 from neondatabase/sasha_release_bypassrls_replication
Grant BYPASSRLS and REPLICATION explicitly to neon_superuser roles
2023-12-18 22:16:36 +02:00
Sasha Krassovsky
80be423a58 Grant BYPASSRLS and REPLICATION explicitly to neon_superuser roles 2023-12-18 10:22:36 -08:00
Shany Pozin
5dcfef82f2 Merge pull request #6163 from neondatabase/releases/2023-12-18
Release 2023-12-18-2
2023-12-18 15:34:17 +02:00
Christian Schwarz
e67b8f69c0 [PRE-MERGE] pageserver: Reduce tracing overhead in timeline::get #6115
Pre-merge `git merge --squash` of
https://github.com/neondatabase/neon/pull/6115

Lowering the tracing level in get_value_reconstruct_data and
get_or_maybe_download from info to debug reduces the overhead
of span creation in non-debug environments.
2023-12-18 13:39:48 +01:00
Shany Pozin
e546872ab4 Merge pull request #6158 from neondatabase/releases/2023-12-18
Release 2023-12-18
2023-12-18 14:24:34 +02:00
John Spray
322ea1cf7c pageserver: on-demand activation cleanups (#6157)
## Problem

#6112 added some logs and metrics: clean these up a bit:
- Avoid counting startup completions for tenants launched after startup
- exclude no-op cases from timing histograms 
- remove a rogue log messages
2023-12-18 11:14:19 +00:00
Vadim Kharitonov
3633742de9 Merge pull request #6121 from neondatabase/releases/2023-12-13
Release 2023-12-13
2023-12-13 12:39:43 +01:00
Joonas Koivunen
079d3a37ba Merge remote-tracking branch 'origin/release' into releases/2023-12-13
this handles the hotfix introduced conflict.
2023-12-13 10:07:19 +00:00
Vadim Kharitonov
a46e77b476 Merge pull request #6090 from neondatabase/releases/2023-12-11
Release 2023-12-11
2023-12-12 12:10:35 +01:00
Tristan Partin
a92702b01e Add submodule paths as safe directories as a precaution
The check-codestyle-rust-arm job requires this for some reason, so let's
just add them everywhere we do this workaround.
2023-12-11 22:00:35 +00:00
Tristan Partin
8ff3253f20 Fix git ownership issue in check-codestyle-rust-arm
We have this workaround for other jobs. Looks like this one was
forgotten about.
2023-12-11 22:00:35 +00:00
Joonas Koivunen
04b82c92a7 fix: accidential return Ok (#6106)
Error indicating request cancellation OR timeline shutdown was deemed as
a reason to exit the background worker that calculated synthetic size.
Fix it to only be considered for avoiding logging such of such errors.

This conflicted on tenant_shard_id having already replaced tenant_id on
`main`.
2023-12-11 21:41:36 +00:00
Vadim Kharitonov
e5bf423e68 Merge branch 'release' into releases/2023-12-11 2023-12-11 11:55:48 +01:00
Vadim Kharitonov
60af392e45 Merge pull request #6057 from neondatabase/vk/patch_timescale_for_production
Revert timescaledb for pg14 and pg15 (#6056)
2023-12-06 16:21:16 +01:00
Vadim Kharitonov
661fc41e71 Revert timescaledb for pg14 and pg15 (#6056)
```
could not start the compute node: compute is in state "failed": db error: ERROR: could not access file "$libdir/timescaledb-2.10.1": No such file or directory Caused by: ERROR: could not access file "$libdir/timescaledb-2.10.1": No such file or directory
```
2023-12-06 16:14:07 +01:00
Shany Pozin
702c488f32 Merge pull request #6022 from neondatabase/releases/2023-12-04
Release 2023-12-04
2023-12-05 17:03:28 +02:00
Sasha Krassovsky
45c5122754 Remove trusted from wal2json 2023-12-04 12:36:19 -08:00
Shany Pozin
558394f710 fix merge 2023-12-04 11:41:27 +02:00
Shany Pozin
73b0898608 Merge branch 'release' into releases/2023-12-04 2023-12-04 11:36:26 +02:00
Joonas Koivunen
e65be4c2dc Merge pull request #6013 from neondatabase/releases/2023-12-01-hotfix
fix: use create_new instead of create for mutex file
2023-12-01 15:35:56 +02:00
Joonas Koivunen
40087b8164 fix: use create_new instead of create for mutex file 2023-12-01 12:54:49 +00:00
Shany Pozin
c762b59483 Merge pull request #5986 from neondatabase/Release-11-30-hotfix
Notify safekeeper readiness with systemd.
2023-11-30 10:01:05 +02:00
Arseny Sher
5d71601ca9 Notify safekeeper readiness with systemd.
To avoid downtime during deploy, as in busy regions initial load can currently
take ~30s.
2023-11-30 08:23:31 +03:00
Shany Pozin
a113c3e433 Merge pull request #5945 from neondatabase/release-2023-11-28-hotfix
Release 2023 11 28 hotfix
2023-11-28 08:14:59 +02:00
Anastasia Lubennikova
e81fc598f4 Update neon extension relocatable for existing installations (#5943) 2023-11-28 00:12:39 +00:00
Anastasia Lubennikova
48b845fa76 Make neon extension relocatable to allow SET SCHEMA (#5942) 2023-11-28 00:12:32 +00:00
Shany Pozin
27096858dc Merge pull request #5922 from neondatabase/releases/2023-11-27
Release 2023-11-27
2023-11-27 09:58:51 +02:00
Shany Pozin
4430d0ae7d Merge pull request #5876 from neondatabase/releases/2023-11-17
Release 2023-11-17
2023-11-20 09:11:58 +02:00
Joonas Koivunen
6e183aa0de Merge branch 'main' into releases/2023-11-17 2023-11-19 15:25:47 +00:00
Vadim Kharitonov
fd6d0b7635 Merge branch 'release' into releases/2023-11-17 2023-11-17 10:51:45 +01:00
Vadim Kharitonov
3710c32aae Merge pull request #5778 from neondatabase/releases/2023-11-03
Release 2023-11-03
2023-11-03 16:06:58 +01:00
Vadim Kharitonov
be83bee49d Merge branch 'release' into releases/2023-11-03 2023-11-03 11:18:15 +01:00
Alexander Bayandin
cf28e5922a Merge pull request #5685 from neondatabase/releases/2023-10-26
Release 2023-10-26
2023-10-27 10:42:12 +01:00
Em Sharnoff
7d384d6953 Bump vm-builder v0.18.2 -> v0.18.4 (#5666)
Only applicable change was neondatabase/autoscaling#584, setting
pgbouncer auth_dbname=postgres in order to fix superuser connections
from preventing dropping databases.
2023-10-26 20:15:45 +01:00
Em Sharnoff
4b3b37b912 Bump vm-builder v0.18.1 -> v0.18.2 (#5646)
Only applicable change was neondatabase/autoscaling#571, removing the
postgres_exporter flags `--auto-discover-databases` and
`--exclude-databases=...`
2023-10-26 20:15:29 +01:00
Shany Pozin
1d8d200f4d Merge pull request #5668 from neondatabase/sp/aux_files_cherry_pick
Cherry pick: Ignore missed AUX_FILES_KEY when generating image layer (#5660)
2023-10-26 10:08:16 +03:00
Konstantin Knizhnik
0d80d6ce18 Ignore missed AUX_FILES_KEY when generating image layer (#5660)
## Problem

Logical replication requires new AUX_FILES_KEY which is definitely
absent in existed database.
We do not have function to check if key exists in our KV storage.
So I have to handle the error in `list_aux_files` method.
But this key is also included in key space range and accessed y
`create_image_layer` method.

## Summary of changes

Check if AUX_FILES_KEY  exists before including it in keyspace.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Shany Pozin <shany@neon.tech>
Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2023-10-26 09:30:28 +03:00
Shany Pozin
f653ee039f Merge pull request #5638 from neondatabase/releases/2023-10-24
Release 2023-10-24
2023-10-24 12:10:52 +03:00
Em Sharnoff
e614a95853 Merge pull request #5610 from neondatabase/sharnoff/rc-2023-10-20-vm-monitor-fixes
Release 2023-10-20: vm-monitor memory.high throttling fixes
2023-10-20 00:11:06 -07:00
Em Sharnoff
850db4cc13 vm-monitor: Deny not fail downscale if no memory stats yet (#5606)
Fixes an issue we observed on staging that happens when the
autoscaler-agent attempts to immediately downscale the VM after binding,
which is typical for pooled computes.

The issue was occurring because the autoscaler-agent was requesting
downscaling before the vm-monitor had gathered sufficient cgroup memory
stats to be confident in approving it. When the vm-monitor returned an
internal error instead of denying downscaling, the autoscaler-agent
retried the connection and immediately hit the same issue (in part
because cgroup stats are collected per-connection, rather than
globally).
2023-10-19 21:56:55 -07:00
Em Sharnoff
8a316b1277 vm-monitor: Log full error on message handling failure (#5604)
There's currently an issue with the vm-monitor on staging that's not
really feasible to debug because the current display impl gives no
context to the errors (just says "failed to downscale").

Logging the full error should help.

For communications with the autoscaler-agent, it's ok to only provide
the outermost cause, because we can cross-reference with the VM logs.
At some point in the future, we may want to change that.
2023-10-19 21:56:50 -07:00
Em Sharnoff
4d13bae449 vm-monitor: Switch from memory.high to polling memory.stat (#5524)
tl;dr it's really hard to avoid throttling from memory.high, and it
counts tmpfs & page cache usage, so it's also hard to make sense of.

In the interest of fixing things quickly with something that should be
*good enough*, this PR switches to instead periodically fetch memory
statistics from the cgroup's memory.stat and use that data to determine
if and when we should upscale.

This PR fixes #5444, which has a lot more detail on the difficulties
we've hit with memory.high. This PR also supersedes #5488.
2023-10-19 21:56:36 -07:00
Vadim Kharitonov
49377abd98 Merge pull request #5577 from neondatabase/releases/2023-10-17
Release 2023-10-17
2023-10-17 12:21:20 +02:00
Christian Schwarz
a6b2f4e54e limit imitate accesses concurrency, using same semaphore as compactions (#5578)
Before this PR, when we restarted pageserver, we'd see a rush of
`$number_of_tenants` concurrent eviction tasks starting to do imitate
accesses building up in the period of `[init_order allows activations,
$random_access_delay + EvictionPolicyLayerAccessThreshold::period]`.

We simply cannot handle that degree of concurrent IO.

We already solved the problem for compactions by adding a semaphore.
So, this PR shares that semaphore for use by evictions.

Part of https://github.com/neondatabase/neon/issues/5479

Which is again part of https://github.com/neondatabase/neon/issues/4743

Risks / Changes In System Behavior
==================================

* we don't do evictions as timely as we currently do
* we log a bunch of warnings about eviction taking too long
* imitate accesses and compactions compete for the same concurrency
limit, so, they'll slow each other down through this shares semaphore

Changes
=======

- Move the `CONCURRENT_COMPACTIONS` semaphore into `tasks.rs`
- Rename it to `CONCURRENT_BACKGROUND_TASKS`
- Use it also for the eviction imitate accesses:
    - Imitate acceses are both per-TIMELINE and per-TENANT
    - The per-TENANT is done through coalescing all the per-TIMELINE
      tasks via a tokio mutex `eviction_task_tenant_state`.
    - We acquire the CONCURRENT_BACKGROUND_TASKS permit early, at the
      beginning of the eviction iteration, much before the imitate
      acesses start (and they may not even start at all in the given
      iteration, as they happen only every $threshold).
    - Acquiring early is **sub-optimal** because when the per-timline
      tasks coalesce on the `eviction_task_tenant_state` mutex,
      they are already holding a CONCURRENT_BACKGROUND_TASKS permit.
    - It's also unfair because tenants with many timelines win
      the CONCURRENT_BACKGROUND_TASKS more often.
    - I don't think there's another way though, without refactoring
      more of the imitate accesses logic, e.g, making it all per-tenant.
- Add metrics for queue depth behind the semaphore.
I found these very useful to understand what work is queued in the
system.

    - The metrics are tagged by the new `BackgroundLoopKind`.
    - On a green slate, I would have used `TaskKind`, but we already had
      pre-existing labels whose names didn't map exactly to task kind.
      Also the task kind is kind of a lower-level detail, so, I think
it's fine to have a separate enum to identify background work kinds.

Future Work
===========

I guess I could move the eviction tasks from a ticker to "sleep for
$period".
The benefit would be that the semaphore automatically "smears" the
eviction task scheduling over time, so, we only have the rush on restart
but a smeared-out rush afterward.

The downside is that this perverts the meaning of "$period", as we'd
actually not run the eviction at a fixed period. It also means the the
"took to long" warning & metric becomes meaningless.

Then again, that is already the case for the compaction and gc tasks,
which do sleep for `$period` instead of using a ticker.

(cherry picked from commit 9256788273)
2023-10-17 12:16:26 +02:00
Shany Pozin
face60d50b Merge pull request #5526 from neondatabase/releases/2023-10-11
Release 2023-10-11
2023-10-11 11:16:39 +03:00
Shany Pozin
9768aa27f2 Merge pull request #5516 from neondatabase/releases/2023-10-10
Release 2023-10-10
2023-10-10 14:16:47 +03:00
Shany Pozin
96b2e575e1 Merge pull request #5445 from neondatabase/releases/2023-10-03
Release 2023-10-03
2023-10-04 13:53:37 +03:00
Alexander Bayandin
7222777784 Update checksums for pg_jsonschema & pg_graphql (#5455)
## Problem

Folks have re-taged releases for `pg_jsonschema` and `pg_graphql` (to
increase timeouts on their CI), for us, these are a noop changes, 
but unfortunately, this will cause our builds to fail due to checksums 
mismatch (this might not strike right away because of the build cache).
- 8ba7c7be9d
- aa7509370a

## Summary of changes
- `pg_jsonschema` update checksum
- `pg_graphql` update checksum
2023-10-03 18:44:30 +01:00
Em Sharnoff
5469fdede0 Merge pull request #5422 from neondatabase/sharnoff/rc-2023-09-28-fix-restart-on-postmaster-SIGKILL
Release 2023-09-28: Fix (lack of) restart on neonvm postmaster SIGKILL
2023-09-28 10:48:51 -07:00
MMeent
72aa6b9fdd Fix neon_zeroextend's WAL logging (#5387)
When you log more than a few blocks, you need to reserve the space in
advance. We didn't do that, so we got errors. Now we do that, and
shouldn't get errors.
2023-09-28 09:37:28 -07:00
Em Sharnoff
ae0634b7be Bump vm-builder v0.17.11 -> v0.17.12 (#5407)
Only relevant change is neondatabase/autoscaling#534 - refer there for
more details.
2023-09-28 09:28:04 -07:00
Shany Pozin
70711f32fa Merge pull request #5375 from neondatabase/releases/2023-09-26
Release 2023-09-26
2023-09-26 15:19:45 +03:00
Vadim Kharitonov
52a88af0aa Merge pull request #5336 from neondatabase/releases/2023-09-19
Release 2023-09-19
2023-09-19 11:16:43 +02:00
Alexander Bayandin
b7a43bf817 Merge branch 'release' into releases/2023-09-19 2023-09-19 09:07:20 +01:00
Alexander Bayandin
dce91b33a4 Merge pull request #5318 from neondatabase/releases/2023-09-15-1
Postgres 14/15: Use previous extensions versions
2023-09-15 16:30:44 +01:00
Alexander Bayandin
23ee4f3050 Revert plv8 only 2023-09-15 15:45:23 +01:00
Alexander Bayandin
46857e8282 Postgres 14/15: Use previous extensions versions 2023-09-15 15:27:00 +01:00
Alexander Bayandin
368ab0ce54 Merge pull request #5313 from neondatabase/releases/2023-09-15
Release 2023-09-15
2023-09-15 10:39:56 +01:00
Konstantin Knizhnik
a5987eebfd References to old and new blocks were mixed in xlog_heap_update handler (#5312)
## Problem

See https://neondb.slack.com/archives/C05L7D1JAUS/p1694614585955029

https://www.notion.so/neondatabase/Duplicate-key-issue-651627ce843c45188fbdcb2d30fd2178

## Summary of changes

Swap old/new block references

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2023-09-15 10:11:41 +01:00
Alexander Bayandin
6686ede30f Update checksum for pg_hint_plan (#5309)
## Problem

The checksum for `pg_hint_plan` doesn't match:
```
sha256sum: WARNING: 1 computed checksum did NOT match
```

Ref
https://github.com/neondatabase/neon/actions/runs/6185715461/job/16793609251?pr=5307

It seems that the release was retagged yesterday:
https://github.com/ossc-db/pg_hint_plan/releases/tag/REL16_1_6_0

I don't see any malicious changes from 15_1.5.1:
https://github.com/ossc-db/pg_hint_plan/compare/REL15_1_5_1...REL16_1_6_0,
so it should be ok to update.

## Summary of changes
- Update checksum for `pg_hint_plan` 16_1.6.0
2023-09-15 09:54:42 +01:00
Em Sharnoff
373c7057cc vm-monitor: Fix cgroup throttling (#5303)
I believe this (not actual IO problems) is the cause of the "disk speed
issue" that we've had for VMs recently. See e.g.:

1. https://neondb.slack.com/archives/C03H1K0PGKH/p1694287808046179?thread_ts=1694271790.580099&cid=C03H1K0PGKH
2. https://neondb.slack.com/archives/C03H1K0PGKH/p1694511932560659

The vm-informant (and now, the vm-monitor, its replacement) is supposed
to gradually increase the `neon-postgres` cgroup's memory.high value,
because otherwise the kernel will throttle all the processes in the
cgroup.

This PR fixes a bug with the vm-monitor's implementation of this
behavior.

---

Other references, for the vm-informant's implementation:

- Original issue: neondatabase/autoscaling#44
- Original PR: neondatabase/autoscaling#223
2023-09-15 09:54:42 +01:00
Shany Pozin
7d6ec16166 Merge pull request #5296 from neondatabase/releases/2023-09-13
Release 2023-09-13
2023-09-13 13:49:14 +03:00
Shany Pozin
0e6fdc8a58 Merge pull request #5283 from neondatabase/releases/2023-09-12
Release 2023-09-12
2023-09-12 14:56:47 +03:00
Christian Schwarz
521438a5c6 fix deadlock around TENANTS (#5285)
The sequence that can lead to a deadlock:

1. DELETE request gets all the way to `tenant.shutdown(progress,
false).await.is_err() ` , while holding TENANTS.read()
2. POST request for tenant creation comes in, calls `tenant_map_insert`,
it does `let mut guard = TENANTS.write().await;`
3. Something that `tenant.shutdown()` needs to wait for needs a
`TENANTS.read().await`.
The only case identified in exhaustive manual scanning of the code base
is this one:
Imitate size access does `get_tenant().await`, which does
`TENANTS.read().await` under the hood.

In the above case (1) waits for (3), (3)'s read-lock request is queued
behind (2)'s write-lock, and (2) waits for (1).
Deadlock.

I made a reproducer/proof-that-above-hypothesis-holds in
https://github.com/neondatabase/neon/pull/5281 , but, it's not ready for
merge yet and we want the fix _now_.

fixes https://github.com/neondatabase/neon/issues/5284
2023-09-12 14:13:13 +03:00
Vadim Kharitonov
07d7874bc8 Merge pull request #5202 from neondatabase/releases/2023-09-05
Release 2023-09-05
2023-09-05 12:16:06 +02:00
Anastasia Lubennikova
1804111a02 Merge pull request #5161 from neondatabase/rc-2023-08-31
Release 2023-08-31
2023-08-31 16:53:17 +03:00
Arthur Petukhovsky
cd0178efed Merge pull request #5150 from neondatabase/release-sk-fix-active-timeline
Release 2023-08-30
2023-08-30 11:43:39 +02:00
Shany Pozin
333574be57 Merge pull request #5133 from neondatabase/releases/2023-08-29
Release 2023-08-29
2023-08-29 14:02:58 +03:00
Alexander Bayandin
79a799a143 Merge branch 'release' into releases/2023-08-29 2023-08-29 11:17:57 +01:00
Conrad Ludgate
9da06af6c9 Merge pull request #5113 from neondatabase/release-http-connection-fix
Release 2023-08-25
2023-08-25 17:21:35 +01:00
Conrad Ludgate
ce1753d036 proxy: dont return connection pending (#5107)
## Problem

We were returning Pending when a connection had a notice/notification
(introduced recently in #5020). When returning pending, the runtime
assumes you will call `cx.waker().wake()` in order to continue
processing.

We weren't doing that, so the connection task would get stuck

## Summary of changes

Don't return pending. Loop instead
2023-08-25 16:42:30 +01:00
Alek Westover
67db8432b4 Fix cargo deny errors (#5068)
## Problem
cargo deny lint broken

Links to the CVEs:

[rustsec.org/advisories/RUSTSEC-2023-0052](https://rustsec.org/advisories/RUSTSEC-2023-0052)

[rustsec.org/advisories/RUSTSEC-2023-0053](https://rustsec.org/advisories/RUSTSEC-2023-0053)
One is fixed, the other one isn't so we allow it (for now), to unbreak
CI. Then later we'll try to get rid of webpki in favour of the rustls
fork.

## Summary of changes
```
+ignore = ["RUSTSEC-2023-0052"]
```
2023-08-25 16:42:30 +01:00
Vadim Kharitonov
4e2e44e524 Enable neon-pool-opt-in (#5062) 2023-08-22 09:06:14 +01:00
Vadim Kharitonov
ed786104f3 Merge pull request #5060 from neondatabase/releases/2023-08-22
Release 2023-08-22
2023-08-22 09:41:02 +02:00
Stas Kelvich
84b74f2bd1 Merge pull request #4997 from neondatabase/sk/proxy-release-23-07-15
Fix lint
2023-08-15 18:54:20 +03:00
Arthur Petukhovsky
fec2ad6283 Fix lint 2023-08-15 18:49:02 +03:00
Stas Kelvich
98eebd4682 Merge pull request #4996 from neondatabase/sk/proxy_release
Disable neon-pool-opt-in
2023-08-15 18:37:50 +03:00
Arthur Petukhovsky
2f74287c9b Disable neon-pool-opt-in 2023-08-15 18:34:17 +03:00
Shany Pozin
aee1bf95e3 Merge pull request #4990 from neondatabase/releases/2023-08-15
Release 2023-08-15
2023-08-15 15:34:38 +03:00
Shany Pozin
b9de9d75ff Merge branch 'release' into releases/2023-08-15 2023-08-15 14:35:00 +03:00
Stas Kelvich
7943b709e6 Merge pull request #4940 from neondatabase/sk/release-23-05-25-proxy-fixup
Release: proxy retry fixup
2023-08-09 13:53:19 +03:00
Conrad Ludgate
d7d066d493 proxy: delay auth on retry (#4929)
## Problem

When an endpoint is shutting down, it can take a few seconds. Currently
when starting a new compute, this causes an "endpoint is in transition"
error. We need to add delays before retrying to ensure that we allow
time for the endpoint to shutdown properly.

## Summary of changes

Adds a delay before retrying in auth. connect_to_compute already has
this delay
2023-08-09 12:54:24 +03:00
Felix Prasanna
e78ac22107 release fix: revert vm builder bump from 0.13.1 -> 0.15.0-alpha1 (#4932)
This reverts commit 682dfb3a31.

hotfix for a CLI arg issue in the monitor
2023-08-08 21:08:46 +03:00
Vadim Kharitonov
76a8f2bb44 Merge pull request #4923 from neondatabase/releases/2023-08-08
Release 2023-08-08
2023-08-08 11:44:38 +02:00
Vadim Kharitonov
8d59a8581f Merge branch 'release' into releases/2023-08-08 2023-08-08 10:54:34 +02:00
Vadim Kharitonov
b1ddd01289 Define NEON_SMGR to make it possible for extensions to use Neon SMG API (#4889)
Co-authored-by: Konstantin Knizhnik <knizhnik@garret.ru>
Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2023-08-03 16:28:31 +03:00
Alexander Bayandin
6eae4fc9aa Release 2023-08-02: update pg_embedding (#4877)
Cherry-picking ca4d71a954 from `main` into
the `release`

Co-authored-by: Vadim Kharitonov <vadim2404@users.noreply.github.com>
2023-08-03 08:48:09 +02:00
Christian Schwarz
765455bca2 Merge pull request #4861 from neondatabase/releases/2023-08-01--2-fix-pipeline
ci: fix upload-postgres-extensions-to-s3 job
2023-08-01 13:22:07 +02:00
Christian Schwarz
4204960942 ci: fix upload-postgres-extensions-to-s3 job
commit

	commit 5f8fd640bf
	Author: Alek Westover <alek.westover@gmail.com>
	Date:   Wed Jul 26 08:24:03 2023 -0400

	    Upload Test Remote Extensions (#4792)

switched to using the release tag instead of `latest`, but,
the `promote-images` job only uploads `latest` to the prod ECR.

The switch to using release tag was good in principle, but,
reverting that part to make the release pipeine work.

Note that a proper fix should abandon use of `:latest` tag
at all: currently, if a `main` pipeline runs concurrently
with a `release` pipeline, the `release` pipeline may end
up using the `main` pipeline's images.
2023-08-01 12:01:45 +02:00
Christian Schwarz
67345d66ea Merge pull request #4858 from neondatabase/releases/2023-08-01
Release 2023-08-01
2023-08-01 10:44:01 +02:00
Shany Pozin
2266ee5971 Merge pull request #4803 from neondatabase/releases/2023-07-25
Release 2023-07-25
2023-07-25 14:21:07 +03:00
Shany Pozin
b58445d855 Merge pull request #4746 from neondatabase/releases/2023-07-18
Release 2023-07-18
2023-07-18 14:45:39 +03:00
Conrad Ludgate
36050e7f3d Merge branch 'release' into releases/2023-07-18 2023-07-18 12:00:09 +01:00
Alexander Bayandin
33360ed96d Merge pull request #4705 from neondatabase/release-2023-07-12
Release 2023-07-12 (only proxy)
2023-07-12 19:44:36 +01:00
Conrad Ludgate
39a28d1108 proxy wake_compute loop (#4675)
## Problem

If we fail to wake up the compute node, a subsequent connect attempt
will definitely fail. However, kubernetes won't fail the connection
immediately, instead it hangs until we timeout (10s).

## Summary of changes

Refactor the loop to allow fast retries of compute_wake and to skip a
connect attempt.
2023-07-12 18:40:11 +01:00
Conrad Ludgate
efa6aa134f allow repeated IO errors from compute node (#4624)
## Problem

#4598 compute nodes are not accessible some time after wake up due to
kubernetes DNS not being fully propagated.

## Summary of changes

Update connect retry mechanism to support handling IO errors and
sleeping for 100ms

## Checklist before requesting a review

- [x] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.
2023-07-12 18:40:06 +01:00
Alexander Bayandin
2c724e56e2 Merge pull request #4646 from neondatabase/releases/2023-07-06-hotfix
Release 2023-07-06 (add pg_embedding extension only)
2023-07-06 12:19:52 +01:00
Alexander Bayandin
feff887c6f Compile pg_embedding extension (#4634)
```
CREATE EXTENSION embedding;
CREATE TABLE t (val real[]);
INSERT INTO t (val) VALUES ('{0,0,0}'), ('{1,2,3}'), ('{1,1,1}'), (NULL);
CREATE INDEX ON t USING hnsw (val) WITH (maxelements = 10, dims=3, m=3);
INSERT INTO t (val) VALUES (array[1,2,4]);

SELECT * FROM t ORDER BY val <-> array[3,3,3];
   val   
---------
 {1,2,3}
 {1,2,4}
 {1,1,1}
 {0,0,0}
 
(5 rows)
```
2023-07-06 09:39:41 +01:00
Vadim Kharitonov
353d915fcf Merge pull request #4633 from neondatabase/releases/2023-07-05
Release 2023-07-05
2023-07-05 15:10:47 +02:00
Vadim Kharitonov
2e38098cbc Merge branch 'release' into releases/2023-07-05 2023-07-05 12:41:48 +02:00
Vadim Kharitonov
a6fe5ea1ac Merge pull request #4571 from neondatabase/releases/2023-06-27
Release 2023-06-27
2023-06-27 12:55:33 +02:00
Vadim Kharitonov
05b0aed0c1 Merge branch 'release' into releases/2023-06-27 2023-06-27 12:22:12 +02:00
Alex Chi Z
cd1705357d Merge pull request #4561 from neondatabase/releases/2023-06-23-hotfix
Release 2023-06-23 (pageserver-only)
2023-06-23 15:38:50 -04:00
Christian Schwarz
6bc7561290 don't use MGMT_REQUEST_RUNTIME for consumption metrics synthetic size worker
The consumption metrics synthetic size worker does logical size calculation.
Logical size calculation currently does synchronous disk IO.
This blocks the MGMT_REQUEST_RUNTIME's executor threads, starving other futures.

While there's work on the way to move the synchronous disk IO into spawn_blocking,
the quickfix here is to use the BACKGROUND_RUNTIME instead of MGMT_REQUEST_RUNTIME.

Actually it's not just a quickfix. We simply shouldn't be blocking MGMT_REQUEST_RUNTIME
executor threads on CPU or sync disk IO.
That work isn't done yet, as many of the mgmt tasks still _do_ disk IO.
But it's not as intensive as the logical size calculations that we're fixing here.

While we're at it, fix disk-usage-based eviction in a similar way.
It wasn't the culprit here, according to prod logs, but it can theoretically be
a little CPU-intensive.

More context, including graphs from Prod:
https://neondb.slack.com/archives/C03F5SM1N02/p1687541681336949

(cherry picked from commit d6e35222ea)
2023-06-23 20:54:07 +02:00
Christian Schwarz
fbd3ac14b5 Merge pull request #4544 from neondatabase/releases/2023-06-21-hotfix
Release 2023-06-21 (fixup for post-merge failed 2023-06-20)
2023-06-21 16:54:34 +03:00
Christian Schwarz
e437787c8f cargo update -p openssl (#4542)
To unblock release
https://github.com/neondatabase/neon/pull/4536#issuecomment-1600678054

Context: https://rustsec.org/advisories/RUSTSEC-2023-0044
2023-06-21 15:52:56 +03:00
Christian Schwarz
3460dbf90b Merge pull request #4536 from neondatabase/releases/2023-06-20
Release 2023-06-20 (actually 2023-06-21)
2023-06-21 14:19:14 +03:00
Vadim Kharitonov
6b89d99677 Merge pull request #4521 from neondatabase/release_2023-06-15
Release 2023 06 15
2023-06-15 17:40:01 +02:00
Vadim Kharitonov
6cc8ea86e4 Merge branch 'main' into release_2023-06-15 2023-06-15 16:50:44 +02:00
Shany Pozin
e62a492d6f Merge pull request #4486 from neondatabase/releases/2023-06-13
Release 2023-06-13
2023-06-13 15:21:35 +03:00
Alexey Kondratov
a475cdf642 [compute_ctl] Fix logging if catalog updates are skipped (#4480)
Otherwise, it wasn't clear from the log when Postgres started up
completely if catalog updates were skipped.

Follow-up for 4936ab6
2023-06-13 13:37:24 +02:00
Stas Kelvich
7002c79a47 Merge pull request #4447 from neondatabase/release_proxy_08-06-2023
Release proxy 08 06 2023
2023-06-08 21:02:54 +03:00
Vadim Kharitonov
ee6cf357b4 Merge pull request #4427 from neondatabase/releases/2023-06-06
Release 2023-06-06
2023-06-06 14:42:21 +02:00
Vadim Kharitonov
e5c2086b5f Merge branch 'release' into releases/2023-06-06 2023-06-06 12:33:56 +02:00
Shany Pozin
5f1208296a Merge pull request #4395 from neondatabase/releases/2023-06-01
Release 2023-06-01
2023-06-01 10:58:00 +03:00
Stas Kelvich
88e8e473cd Merge pull request #4345 from neondatabase/release-23-05-25-proxy
Release 23-05-25, take 3
2023-05-25 19:40:43 +03:00
Stas Kelvich
b0a77844f6 Add SQL-over-HTTP endpoint to Proxy
This commit introduces an SQL-over-HTTP endpoint in the proxy, with a JSON
response structure resembling that of the node-postgres driver. This method,
using HTTP POST, achieves smaller amortized latencies in edge setups due to
fewer round trips and an enhanced open connection reuse by the v8 engine.

This update involves several intricacies:
1. SQL injection protection: We employed the extended query protocol, modifying
   the rust-postgres driver to send queries in one roundtrip using a text
   protocol rather than binary, bypassing potential issues like those identified
   in https://github.com/sfackler/rust-postgres/issues/1030.

2. Postgres type compatibility: As not all postgres types have binary
   representations (e.g., acl's in pg_class), we adjusted rust-postgres to
   respond with text protocol, simplifying serialization and fixing queries with
   text-only types in response.

3. Data type conversion: Considering JSON supports fewer data types than
   Postgres, we perform conversions where possible, passing all other types as
   strings. Key conversions include:
   - postgres int2, int4, float4, float8 -> json number (NaN and Inf remain
     text)
   - postgres bool, null, text -> json bool, null, string
   - postgres array -> json array
   - postgres json and jsonb -> json object

4. Alignment with node-postgres: To facilitate integration with js libraries,
   we've matched the response structure of node-postgres, returning command tags
   and column oids. Command tag capturing was added to the rust-postgres
   functionality as part of this change.
2023-05-25 17:59:17 +03:00
Vadim Kharitonov
1baf464307 Merge pull request #4309 from neondatabase/releases/2023-05-23
Release 2023-05-23
2023-05-24 11:56:54 +02:00
Alexander Bayandin
e9b8e81cea Merge branch 'release' into releases/2023-05-23 2023-05-23 12:54:08 +01:00
Alexander Bayandin
85d6194aa4 Fix regress-tests job for Postgres 15 on release branch (#4254)
## Problem

Compatibility tests don't support Postgres 15 yet, but we're still
trying to upload compatibility snapshot (which we do not collect).

Ref
https://github.com/neondatabase/neon/actions/runs/4991394158/jobs/8940369368#step:4:38129

## Summary of changes

Add `pg_version` parameter to `run-python-test-set` actions and do not
upload compatibility snapshot for Postgres 15
2023-05-16 17:19:12 +01:00
Vadim Kharitonov
333a7a68ef Merge pull request #4245 from neondatabase/releases/2023-05-16
Release 2023-05-16
2023-05-16 13:38:40 +02:00
Vadim Kharitonov
6aa4e41bee Merge branch 'release' into releases/2023-05-16 2023-05-16 12:48:23 +02:00
Joonas Koivunen
840183e51f try: higher page_service timeouts to isolate an issue 2023-05-11 16:24:53 +03:00
Shany Pozin
cbccc94b03 Merge pull request #4184 from neondatabase/releases/2023-05-09
Release 2023-05-09
2023-05-09 15:30:36 +03:00
Stas Kelvich
fce227df22 Merge pull request #4163 from neondatabase/main
Release 23-05-05
2023-05-05 15:56:23 +03:00
Stas Kelvich
bd787e800f Merge pull request #4133 from neondatabase/main
Release 23-04-01
2023-05-01 18:52:46 +03:00
Shany Pozin
4a7704b4a3 Merge pull request #4131 from neondatabase/sp/hotfix_adding_sks_us_west
Hotfix: Adding 4 new pageservers and two sets of safekeepers to us west 2
2023-05-01 15:17:38 +03:00
Shany Pozin
ff1119da66 Add 2 new sets of safekeepers to us-west2 2023-05-01 14:35:31 +03:00
Shany Pozin
4c3ba1627b Add 4 new Pageservers for retool launch 2023-05-01 14:34:38 +03:00
Vadim Kharitonov
1407174fb2 Merge pull request #4110 from neondatabase/vk/release_2023-04-28
Release 2023 04 28
2023-04-28 17:43:16 +02:00
Vadim Kharitonov
ec9dcb1889 Merge branch 'release' into vk/release_2023-04-28 2023-04-28 16:32:26 +02:00
Joonas Koivunen
d11d781afc revert: "Add check for duplicates of generated image layers" (#4104)
This reverts commit 732acc5.

Reverted PR: #3869

As noted in PR #4094, we do in fact try to insert duplicates to the
layer map, if L0->L1 compaction is interrupted. We do not have a proper
fix for that right now, and we are in a hurry to make a release to
production, so revert the changes related to this to the state that we
have in production currently. We know that we have a bug here, but
better to live with the bug that we've had in production for a long
time, than rush a fix to production without testing it in staging first.

Cc: #4094, #4088
2023-04-28 16:31:35 +02:00
Anastasia Lubennikova
4e44565b71 Merge pull request #4000 from neondatabase/releases/2023-04-11
Release 2023-04-11
2023-04-11 17:47:41 +03:00
Stas Kelvich
4ed51ad33b Add more proxy cnames 2023-04-11 15:59:35 +03:00
Arseny Sher
1c1ebe5537 Merge pull request #3946 from neondatabase/releases/2023-04-04
Release 2023-04-04
2023-04-04 14:38:40 +04:00
Christian Schwarz
c19cb7f386 Merge pull request #3935 from neondatabase/releases/2023-04-03
Release 2023-04-03
2023-04-03 16:19:49 +02:00
Vadim Kharitonov
4b97d31b16 Merge pull request #3896 from neondatabase/releases/2023-03-28
Release 2023-03-28
2023-03-28 17:58:06 +04:00
Shany Pozin
923ade3dd7 Merge pull request #3855 from neondatabase/releases/2023-03-21
Release 2023-03-21
2023-03-21 13:12:32 +02:00
Arseny Sher
b04e711975 Merge pull request #3825 from neondatabase/release-2023-03-15
Release 2023.03.15
2023-03-15 15:38:00 +03:00
Arseny Sher
afd0a6b39a Forward framed read buf contents to compute before proxy pass.
Otherwise they get lost. Normally buffer is empty before proxy pass, but this is
not the case with pipeline mode of out npm driver; fixes connection hangup
introduced by b80fe41af3 for it.

fixes https://github.com/neondatabase/neon/issues/3822
2023-03-15 15:36:06 +04:00
Lassi Pölönen
99752286d8 Use RollingUpdate strategy also for legacy proxy (#3814)
## Describe your changes
We have previously changed the neon-proxy to use RollingUpdate. This
should be enabled in legacy proxy too in order to avoid breaking
connections for the clients and allow for example backups to run even
during deployment. (https://github.com/neondatabase/neon/pull/3683)

## Issue ticket number and link
https://github.com/neondatabase/neon/issues/3333
2023-03-15 15:35:51 +04:00
Arseny Sher
15df93363c Merge pull request #3804 from neondatabase/release-2023-03-13
Release 2023.03.13
2023-03-13 20:25:40 +03:00
Vadim Kharitonov
bc0ab741af Merge pull request #3758 from neondatabase/releases/2023-03-07
Release 2023-03-07
2023-03-07 12:38:47 +01:00
Christian Schwarz
51d9dfeaa3 Merge pull request #3743 from neondatabase/releases/2023-03-03
Release 2023-03-03
2023-03-03 19:20:21 +01:00
Shany Pozin
f63cb18155 Merge pull request #3713 from neondatabase/releases/2023-02-28
Release 2023-02-28
2023-02-28 12:52:24 +02:00
Arseny Sher
0de603d88e Merge pull request #3707 from neondatabase/release-2023-02-24
Release 2023-02-24

Hotfix for UNLOGGED tables. Contains #3706
Also contains rebase on 14.7 and 15.2 #3581
2023-02-25 00:32:11 +04:00
Heikki Linnakangas
240913912a Fix UNLOGGED tables.
Instead of trying to create missing files on the way, send init fork contents as
main fork from pageserver during basebackup. Add test for that. Call
put_rel_drop for init forks; previously they weren't removed. Bump
vendor/postgres to revert previous approach on Postgres side.

Co-authored-by: Arseny Sher <sher-ars@yandex.ru>

ref https://github.com/neondatabase/postgres/pull/264
ref https://github.com/neondatabase/postgres/pull/259
ref https://github.com/neondatabase/neon/issues/1222
2023-02-24 23:54:53 +04:00
MMeent
91a4ea0de2 Update vendored PostgreSQL versions to 14.7 and 15.2 (#3581)
## Describe your changes
Rebase vendored PostgreSQL onto 14.7 and 15.2

## Issue ticket number and link

#3579

## Checklist before requesting a review
- [x] I have performed a self-review of my code.
- [x] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [x] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.
    ```
The version of PostgreSQL that we use is updated to 14.7 for PostgreSQL
14 and 15.2 for PostgreSQL 15.
    ```
2023-02-24 23:54:42 +04:00
Arseny Sher
8608704f49 Merge pull request #3691 from neondatabase/release-2023-02-23
Release 2023-02-23

Hotfix for the unlogged tables with indexes issue.

neondatabase/postgres#259
neondatabase/postgres#262
2023-02-23 13:39:33 +04:00
Arseny Sher
efef68ce99 Bump vendor/postgres to include hotfix for unlogged tables with indexes.
https://github.com/neondatabase/postgres/pull/259
https://github.com/neondatabase/postgres/pull/262
2023-02-23 08:49:43 +04:00
Joonas Koivunen
8daefd24da Merge pull request #3679 from neondatabase/releases/2023-02-22
Releases/2023-02-22
2023-02-22 15:56:55 +02:00
Arthur Petukhovsky
46cc8b7982 Remove safekeeper-1.ap-southeast-1.aws.neon.tech (#3671)
We migrated all timelines to
`safekeeper-3.ap-southeast-1.aws.neon.tech`, now old instance can be
removed.
2023-02-22 15:07:57 +02:00
Sergey Melnikov
38cd90dd0c Add -v to ansible invocations (#3670)
To get more debug output on failures
2023-02-22 15:07:57 +02:00
Joonas Koivunen
a51b269f15 fix: hold permit until GetObject eof (#3663)
previously we applied the ratelimiting only up to receiving the headers
from s3, or somewhere near it. the commit adds an adapter which carries
the permit until the AsyncRead has been disposed.

fixes #3662.
2023-02-22 15:07:57 +02:00
Joonas Koivunen
43bf6d0a0f calculate_logical_size: no longer use spawn_blocking (#3664)
Calculation of logical size is now async because of layer downloads, so
we shouldn't use spawn_blocking for it. Use of `spawn_blocking`
exhausted resources which are needed by `tokio::io::copy` when copying
from a stream to a file which lead to deadlock.

Fixes: #3657
2023-02-22 15:07:57 +02:00
Joonas Koivunen
15273a9b66 chore: ignore all compaction inactive tenant errors (#3665)
these are happening in tests because of #3655 but they sure took some
time to appear.

makes the `Compaction failed, retrying in 2s: Cannot run compaction
iteration on inactive tenant` into a globally allowed error, because it
has been seen failing on different test cases.
2023-02-22 15:07:57 +02:00
Joonas Koivunen
78aca668d0 fix: log download failed error (#3661)
Fixes #3659
2023-02-22 15:07:57 +02:00
Vadim Kharitonov
acbf4148ea Merge pull request #3656 from neondatabase/releases/2023-02-21
Release 2023-02-21
2023-02-21 16:03:48 +01:00
Vadim Kharitonov
6508540561 Merge branch 'release' into releases/2023-02-21 2023-02-21 15:31:16 +01:00
Arthur Petukhovsky
a41b5244a8 Add new safekeeper to ap-southeast-1 prod (#3645) (#3646)
To trigger deployment of #3645 to production.
2023-02-20 15:22:49 +00:00
Shany Pozin
2b3189be95 Merge pull request #3600 from neondatabase/releases/2023-02-14
Release 2023-02-14
2023-02-15 13:31:30 +02:00
Vadim Kharitonov
248563c595 Merge pull request #3553 from neondatabase/releases/2023-02-07
Release 2023-02-07
2023-02-07 14:07:44 +01:00
Vadim Kharitonov
14cd6ca933 Merge branch 'release' into releases/2023-02-07 2023-02-07 12:11:56 +01:00
Vadim Kharitonov
eb36403e71 Release 2023 01 31 (#3497)
Co-authored-by: Kirill Bulatov <kirill@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
Co-authored-by: bojanserafimov <bojan.serafimov7@gmail.com>
Co-authored-by: Christian Schwarz <christian@neon.tech>
Co-authored-by: Alexey Kondratov <kondratov.aleksey@gmail.com>
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Co-authored-by: Konstantin Knizhnik <knizhnik@garret.ru>
Co-authored-by: Shany Pozin <shany@neon.tech>
Co-authored-by: Sergey Melnikov <sergey@neon.tech>
Co-authored-by: Dmitry Rodionov <dmitry@neon.tech>
Co-authored-by: Rory de Zoete <33318916+zoete@users.noreply.github.com>
Co-authored-by: Rory de Zoete <rdezoete@Rorys-Mac-Studio.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>
Co-authored-by: Lassi Pölönen <lassi.polonen@iki.fi>
2023-01-31 15:06:35 +02:00
Anastasia Lubennikova
3c6f779698 Merge pull request #3411 from neondatabase/release_2023_01_23
Fix Release 2023 01 23
2023-01-23 20:10:03 +02:00
Joonas Koivunen
f67f0c1c11 More tenant size fixes (#3410)
Small changes, but hopefully this will help with the panic detected in
staging, for which we cannot get the debugging information right now
(end-of-branch before branch-point).
2023-01-23 17:46:13 +02:00
Shany Pozin
edb02d3299 Adding pageserver3 to staging (#3403) 2023-01-23 17:46:13 +02:00
Konstantin Knizhnik
664a69e65b Fix slru_segment_key_range function: segno was assigned to incorrect Key field (#3354) 2023-01-23 17:46:13 +02:00
Anastasia Lubennikova
478322ebf9 Fix tenant size orphans (#3377)
Before only the timelines which have passed the `gc_horizon` were
processed which failed with orphans at the tree_sort phase. Example
input in added `test_branched_empty_timeline_size` test case.

The PR changes iteration to happen through all timelines, and in
addition to that, any learned branch points will be calculated as they
would had been in the original implementation if the ancestor branch had
been over the `gc_horizon`.

This also changes how tenants where all timelines are below `gc_horizon`
are handled. Previously tenant_size 0 was returned, but now they will
have approximately `initdb_lsn` worth of tenant_size.

The PR also adds several new tenant size tests that describe various corner
cases of branching structure and `gc_horizon` setting.
They are currently disabled to not consume time during CI.

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
2023-01-23 17:46:13 +02:00
Joonas Koivunen
802f174072 fix: dont stop pageserver if we fail to calculate synthetic size 2023-01-23 17:46:13 +02:00
Alexey Kondratov
47f9890bae [compute_ctl] Make role deletion spec processing idempotent (#3380)
Previously, we were trying to re-assign owned objects of the already
deleted role. This were causing a crash loop in the case when compute
was restarted with a spec that includes delta operation for role
deletion. To avoid such cases, check that role is still present before
calling `reassign_owned_objects`.

Resolves neondatabase/cloud#3553
2023-01-23 17:46:13 +02:00
Christian Schwarz
262265daad Revert "Use actual temporary dir for pageserver unit tests"
This reverts commit 826e89b9ce.

The problem with that commit was that it deletes the TempDir while
there are still EphemeralFile instances open.

At first I thought this could be fixed by simply adding

  Handle::current().block_on(task_mgr::shutdown(None, Some(tenant_id), None))

to TenantHarness::drop, but it turned out to be insufficient.

So, reverting the commit until we find a proper solution.

refs https://github.com/neondatabase/neon/issues/3385
2023-01-23 17:46:13 +02:00
bojanserafimov
300da5b872 Improve layer map docstrings (#3382) 2023-01-23 17:46:13 +02:00
Heikki Linnakangas
7b22b5c433 Switch to 'tracing' for logging, restructure code to make use of spans.
Refactors Compute::prepare_and_run. It's split into subroutines
differently, to make it easier to attach tracing spans to the
different stages. The high-level logic for waiting for Postgres to
exit is moved to the caller.

Replace 'env_logger' with 'tracing', and add `#instrument` directives
to different stages fo the startup process. This is a fairly
mechanical change, except for the changes in 'spec.rs'. 'spec.rs'
contained some complicated formatting, where parts of log messages
were printed directly to stdout with `print`s. That was a bit messed
up because the log normally goes to stderr, but those lines were
printed to stdout. In our docker images, stderr and stdout both go to
the same place so you wouldn't notice, but I don't think it was
intentional.

This changes the log format to the default
'tracing_subscriber::format' format. It's different from the Postgres
log format, however, and because both compute_tools and Postgres print
to the same log, it's now a mix of two different formats.  I'm not
sure how the Grafana log parsing pipeline can handle that. If it's a
problem, we can build custom formatter to change the compute_tools log
format to be the same as Postgres's, like it was before this commit,
or we can change the Postgres log format to match tracing_formatter's,
or we can start printing compute_tool's log output to a different
destination than Postgres
2023-01-23 17:46:12 +02:00
Kirill Bulatov
ffca97bc1e Enable logs in unit tests 2023-01-23 17:46:12 +02:00
Kirill Bulatov
cb356f3259 Use actual temporary dir for pageserver unit tests 2023-01-23 17:46:12 +02:00
Vadim Kharitonov
c85374295f Change SENTRY_ENVIRONMENT from "development" to "staging" 2023-01-23 17:46:12 +02:00
Anastasia Lubennikova
4992160677 Fix metric_collection_endpoint for prod.
It was incorrectly set to staging url
2023-01-23 17:46:12 +02:00
Heikki Linnakangas
bd535b3371 If an error happens while checking for core dumps, don't panic.
If we panic, we skip the 30s wait in 'main', and don't give the
console a chance to observe the error. Which is not nice.

Spotted by @ololobus at
https://github.com/neondatabase/neon/pull/3352#discussion_r1072806981
2023-01-23 17:46:12 +02:00
Kirill Bulatov
d90c5a03af Add more io::Error context when fail to operate on a path (#3254)
I have a test failure that shows 

```
Caused by:
    0: Failed to reconstruct a page image:
    1: Directory not empty (os error 39)
```

but does not really show where exactly that happens.

https://neon-github-public-dev.s3.amazonaws.com/reports/pr-3227/release/3823785365/index.html#categories/c0057473fc9ec8fb70876fd29a171ce8/7088dab272f2c7b7/?attachment=60fe6ed2add4d82d

The PR aims to add more context in debugging that issue.
2023-01-23 17:46:12 +02:00
Anastasia Lubennikova
2d02cc9079 Merge pull request #3365 from neondatabase/main
Release 2023-01-17
2023-01-17 16:41:34 +02:00
Christian Schwarz
49ad94b99f Merge pull request #3301 from neondatabase/release-2023-01-10
Release 2023-01-10
2023-01-10 16:42:26 +01:00
Christian Schwarz
948a217398 Merge commit '95bf19b85a06b27a7fc3118dee03d48648efab15' into release-2023-01-10
Conflicts:
        .github/helm-values/neon-stress.proxy-scram.yaml
        .github/helm-values/neon-stress.proxy.yaml
        .github/helm-values/staging.proxy-scram.yaml
        .github/helm-values/staging.proxy.yaml
        All of the above were deleted in `main` after we hotfixed them
        in `release. Deleting them here
        storage_broker/src/bin/storage_broker.rs
        Hotfix toned down logging, but `main` has sinced implemented
        a proper fix. Taken `main`'s side, see
        https://neondb.slack.com/archives/C033RQ5SPDH/p1673354385387479?thread_ts=1673354306.474729&cid=C033RQ5SPDH

closes https://github.com/neondatabase/neon/issues/3287
2023-01-10 15:40:14 +01:00
Dmitry Rodionov
125381eae7 Merge pull request #3236 from neondatabase/dkr/retrofit-sk4-sk4-change
Move zenith-1-sk-3 to zenith-1-sk-4 (#3164)
2022-12-30 14:13:50 +03:00
Arthur Petukhovsky
cd01bbc715 Move zenith-1-sk-3 to zenith-1-sk-4 (#3164) 2022-12-30 12:32:52 +02:00
Dmitry Rodionov
d8b5e3b88d Merge pull request #3229 from neondatabase/dkr/add-pageserver-for-release
add pageserver to new region see https://github.com/neondatabase/aws/pull/116

decrease log volume for pageserver
2022-12-30 12:34:04 +03:00
Dmitry Rodionov
06d25f2186 switch to debug from info to produce less noise 2022-12-29 17:48:47 +02:00
Dmitry Rodionov
f759b561f3 add pageserver to new region see https://github.com/neondatabase/aws/pull/116 2022-12-29 17:17:35 +02:00
Sergey Melnikov
ece0555600 Push proxy metrics to Victoria Metrics (#3106) 2022-12-16 14:44:49 +02:00
Joonas Koivunen
73ea0a0b01 fix(remote_storage): use cached credentials (#3128)
IMDSv2 has limits, and if we query it on every s3 interaction we are
going to go over those limits. Changes the s3_bucket client
configuration to use:
- ChainCredentialsProvider to handle env variables or imds usage
- LazyCachingCredentialsProvider to actually cache any credentials

Related: https://github.com/awslabs/aws-sdk-rust/issues/629
Possibly related: https://github.com/neondatabase/neon/issues/3118
2022-12-16 14:44:49 +02:00
Arseny Sher
d8f6d6fd6f Merge pull request #3126 from neondatabase/broker-lb-release
Deploy broker with L4 LB in new env.
2022-12-16 01:25:28 +03:00
Arseny Sher
d24de169a7 Deploy broker with L4 LB in new env.
Seems to be fixing issue with missing keepalives.
2022-12-16 01:45:32 +04:00
Arseny Sher
0816168296 Hotfix: terminate subscription if channel is full.
Might help as a hotfix, but need to understand root better.
2022-12-15 12:23:56 +03:00
Dmitry Rodionov
277b44d57a Merge pull request #3102 from neondatabase/main
Hotfix. See commits for details
2022-12-14 19:38:43 +03:00
MMeent
68c2c3880e Merge pull request #3038 from neondatabase/main
Release 22-12-14
2022-12-14 14:35:47 +01:00
Arthur Petukhovsky
49da498f65 Merge pull request #2833 from neondatabase/main
Release 2022-11-16
2022-11-17 08:44:10 +01:00
Stas Kelvich
2c76ba3dd7 Merge pull request #2718 from neondatabase/main-rc-22-10-28
Release 22-10-28
2022-10-28 20:33:56 +03:00
Arseny Sher
dbe3dc69ad Merge branch 'main' into main-rc-22-10-28
Release 22-10-28.
2022-10-28 19:10:11 +04:00
Arseny Sher
8e5bb3ed49 Enable etcd compaction in neon_local. 2022-10-27 12:53:20 +03:00
Stas Kelvich
ab0be7b8da Avoid debian-testing packages in compute Dockerfiles
plv8 can only be built with a fairly new gold linker version. We used to install
it via binutils packages from testing, but it also updates libc and that causes
troubles in the resulting image as different extensions were built against
different libc versions. We could either use libc from debian-testing everywhere
or restrain from using testing packages and install necessary programs manually.
This patch uses the latter approach: gold for plv8 and cmake for h3 are
installed manually.

In a passing declare h3_postgis as a safe extension (previous omission).
2022-10-27 12:53:20 +03:00
bojanserafimov
b4c55f5d24 Move pagestream api to libs/pageserver_api (#2698) 2022-10-27 12:53:20 +03:00
mikecaat
ede70d833c Add a docker-compose example file (#1943) (#2666)
Co-authored-by: Masahiro Ikeda <masahiro.ikeda.us@hco.ntt.co.jp>
2022-10-27 12:53:20 +03:00
Sergey Melnikov
70c3d18bb0 Do not release to new staging proxies on release (#2685) 2022-10-27 12:53:20 +03:00
bojanserafimov
7a491f52c4 Add draw_timeline binary (#2688) 2022-10-27 12:53:20 +03:00
Alexander Bayandin
323c4ecb4f Add data format backward compatibility tests (#2626) 2022-10-27 12:53:20 +03:00
Anastasia Lubennikova
3d2466607e Merge pull request #2692 from neondatabase/main-rc
Release 2022-10-25
2022-10-25 18:18:58 +03:00
Anastasia Lubennikova
ed478b39f4 Merge branch 'release' into main-rc 2022-10-25 17:06:33 +03:00
Stas Kelvich
91585a558d Merge pull request #2678 from neondatabase/stas/hotfix_schema
Hotfix to disable grant create on public schema
2022-10-22 02:54:31 +03:00
Stas Kelvich
93467eae1f Hotfix to disable grant create on public schema
`GRANT CREATE ON SCHEMA public` fails if there is no schema `public`.
Disable it in release for now and make a better fix later (it is
needed for v15 support).
2022-10-22 02:26:28 +03:00
Stas Kelvich
f3aac81d19 Merge pull request #2668 from neondatabase/main
Release 2022-10-21
2022-10-21 15:21:42 +03:00
Stas Kelvich
979ad60c19 Merge pull request #2581 from neondatabase/main
Release 2022-10-07
2022-10-07 16:50:55 +03:00
Stas Kelvich
9316cb1b1f Merge pull request #2573 from neondatabase/main
Release 2022-10-06
2022-10-07 11:07:06 +03:00
Anastasia Lubennikova
e7939a527a Merge pull request #2377 from neondatabase/main
Release 2022-09-01
2022-09-01 20:20:44 +03:00
Arthur Petukhovsky
36d26665e1 Merge pull request #2299 from neondatabase/main
* Check for entire range during sasl validation (#2281)

* Gen2 GH runner (#2128)

* Re-add rustup override

* Try s3 bucket

* Set git version

* Use v4 cache key to prevent problems

* Switch to v5 for key

* Add second rustup fix

* Rebase

* Add kaniko steps

* Fix typo and set compress level

* Disable global run default

* Specify shell for step

* Change approach with kaniko

* Try less verbose shell spec

* Add submodule pull

* Add promote step

* Adjust dependency chain

* Try default swap again

* Use env

* Don't override aws key

* Make kaniko build conditional

* Specify runs on

* Try without dependency link

* Try soft fail

* Use image with git

* Try passing to next step

* Fix duplicate

* Try other approach

* Try other approach

* Fix typo

* Try other syntax

* Set env

* Adjust setup

* Try step 1

* Add link

* Try global env

* Fix mistake

* Debug

* Try other syntax

* Try other approach

* Change order

* Move output one step down

* Put output up one level

* Try other syntax

* Skip build

* Try output

* Re-enable build

* Try other syntax

* Skip middle step

* Update check

* Try first step of dockerhub push

* Update needs dependency

* Try explicit dir

* Add missing package

* Try other approach

* Try other approach

* Specify region

* Use with

* Try other approach

* Add debug

* Try other approach

* Set region

* Follow AWS example

* Try github approach

* Skip Qemu

* Try stdin

* Missing steps

* Add missing close

* Add echo debug

* Try v2 endpoint

* Use v1 endpoint

* Try without quotes

* Revert

* Try crane

* Add debug

* Split steps

* Fix duplicate

* Add shell step

* Conform to options

* Add verbose flag

* Try single step

* Try workaround

* First request fails hunch

* Try bullseye image

* Try other approach

* Adjust verbose level

* Try previous step

* Add more debug

* Remove debug step

* Remove rogue indent

* Try with larger image

* Add build tag step

* Update workflow for testing

* Add tag step for test

* Remove unused

* Update dependency chain

* Add ownership fix

* Use matrix for promote

* Force update

* Force build

* Remove unused

* Add new image

* Add missing argument

* Update dockerfile copy

* Update Dockerfile

* Update clone

* Update dockerfile

* Go to correct folder

* Use correct format

* Update dockerfile

* Remove cd

* Debug find where we are

* Add debug on first step

* Changedir to postgres

* Set workdir

* Use v1 approach

* Use other dependency

* Try other approach

* Try other approach

* Update dockerfile

* Update approach

* Update dockerfile

* Update approach

* Update dockerfile

* Update dockerfile

* Add workspace hack

* Update Dockerfile

* Update Dockerfile

* Update Dockerfile

* Change last step

* Cleanup pull in prep for review

* Force build images

* Add condition for latest tagging

* Use pinned version

* Try without name value

* Remove more names

* Shorten names

* Add kaniko comments

* Pin kaniko

* Pin crane and ecr helper

* Up one level

* Switch to pinned tag for rust image

* Force update for test

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@b04468bf-cdf4-41eb-9c94-aff4ca55e4bf.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@Rorys-Mac-Studio.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@4795e9ee-4f32-401f-85f3-f316263b62b8.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@2f8bc4e5-4ec2-4ea2-adb1-65d863c4a558.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@27565b2b-72d5-4742-9898-a26c9033e6f9.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@ecc96c26-c6c4-4664-be6e-34f7c3f89a3c.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@7caff3a5-bf03-4202-bd0e-f1a93c86bdae.fritz.box>

* Add missing step output, revert one deploy step (#2285)

* Add missing step output, revert one deploy step

* Conform to syntax

* Update approach

* Add missing value

* Add missing needs

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>

* Error for fatal not git repo (#2286)

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>

* Use main, not branch for ref check (#2288)

* Use main, not branch for ref check

* Add more debug

* Count main, not head

* Try new approach

* Conform to syntax

* Update approach

* Get full history

* Skip checkout

* Cleanup debug

* Remove more debug

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>

* Fix docker zombie process issue (#2289)

* Fix docker zombie process issue

* Init everywhere

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>

* Fix 1.63 clippy lints (#2282)

* split out timeline metrics, track layer map loading and size calculation

* reset rust cache for clippy run to avoid an ICE

additionally remove trailing whitespaces

* Rename pg_control_ffi.h to bindgen_deps.h, for clarity.

The pg_control_ffi.h name implies that it only includes stuff related to
pg_control.h. That's mostly true currently, but really the point of the
file is to include everything that we need to generate Rust definitions
from.

* Make local mypy behave like CI mypy (#2291)

* Fix flaky pageserver restarts in tests (#2261)

* Remove extra type aliases (#2280)

* Update cachepot endpoint (#2290)

* Update cachepot endpoint

* Update dockerfile & remove env

* Update image building process

* Cannot use metadata endpoint for this

* Update workflow

* Conform to kaniko syntax

* Update syntax

* Update approach

* Update dockerfiles

* Force update

* Update dockerfiles

* Update dockerfile

* Cleanup dockerfiles

* Update s3 test location

* Revert s3 experiment

* Add more debug

* Specify aws region

* Remove debug, add prefix

* Remove one more debug

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>

* workflows/benchmarking: increase timeout (#2294)

* Rework `init` in pageserver CLI  (#2272)

* Do not create initial tenant and timeline (adjust Python tests for that)
* Rework config handling during init, add --update-config to manage local config updates

* Fix: Always build images (#2296)

* Always build images

* Remove unused

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>

* Move auto-generated 'bindings' to a separate inner module.

Re-export only things that are used by other modules.

In the future, I'm imagining that we run bindgen twice, for Postgres
v14 and v15. The two sets of bindings would go into separate
'bindings_v14' and 'bindings_v15' modules.

Rearrange postgres_ffi modules.

Move function, to avoid Postgres version dependency in timelines.rs
Move function to generate a logical-message WAL record to postgres_ffi.

* fix cargo test

* Fix walreceiver and safekeeper bugs (#2295)

- There was an issue with zero commit_lsn `reason: LaggingWal { current_commit_lsn: 0/0, new_commit_lsn: 1/6FD90D38, threshold: 10485760 } }`. The problem was in `send_wal.rs`, where we initialized `end_pos = Lsn(0)` and in some cases sent it to the pageserver.
- IDENTIFY_SYSTEM previously returned `flush_lsn` as a physical end of WAL. Now it returns `flush_lsn` (as it was) to walproposer and `commit_lsn` to everyone else including pageserver.
- There was an issue with backoff where connection was cancelled right after initialization: `connected!` -> `safekeeper_handle_db: Connection cancelled` -> `Backoff: waiting 3 seconds`. The problem was in sleeping before establishing the connection. This is fixed by reworking retry logic.
- There was an issue with getting `NoKeepAlives` reason in a loop. The issue is probably the same as the previous.
- There was an issue with filtering safekeepers based on retry attempts, which could filter some safekeepers indefinetely. This is fixed by using retry cooldown duration instead of retry attempts.
- Some `send_wal.rs` connections failed with errors without context. This is fixed by adding a timeline to safekeepers errors.

New retry logic works like this:
- Every candidate has a `next_retry_at` timestamp and is not considered for connection until that moment
- When walreceiver connection is closed, we update `next_retry_at` using exponential backoff, increasing the cooldown on every disconnect.
- When `last_record_lsn` was advanced using the WAL from the safekeeper, we reset the retry cooldown and exponential backoff, allowing walreceiver to reconnect to the same safekeeper instantly.

* on safekeeper registration pass availability zone param (#2292)

Co-authored-by: Kirill Bulatov <kirill@neon.tech>
Co-authored-by: Rory de Zoete <33318916+zoete@users.noreply.github.com>
Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@b04468bf-cdf4-41eb-9c94-aff4ca55e4bf.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@Rorys-Mac-Studio.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@4795e9ee-4f32-401f-85f3-f316263b62b8.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@2f8bc4e5-4ec2-4ea2-adb1-65d863c4a558.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@27565b2b-72d5-4742-9898-a26c9033e6f9.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@ecc96c26-c6c4-4664-be6e-34f7c3f89a3c.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@7caff3a5-bf03-4202-bd0e-f1a93c86bdae.fritz.box>
Co-authored-by: Dmitry Rodionov <dmitry@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
Co-authored-by: bojanserafimov <bojan.serafimov7@gmail.com>
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
Co-authored-by: Anton Galitsyn <agalitsyn@users.noreply.github.com>
2022-08-18 15:32:33 +03:00
Arthur Petukhovsky
873347f977 Merge pull request #2275 from neondatabase/main
* github/workflows: Fix git dubious ownership (#2223)

* Move relation size cache from WalIngest to DatadirTimeline (#2094)

* Move relation sie cache to layered timeline

* Fix obtaining current LSN for relation size cache

* Resolve merge conflicts

* Resolve merge conflicts

* Reestore 'lsn' field in DatadirModification

* adjust DatadirModification lsn in ingest_record

* Fix formatting

* Pass lsn to get_relsize

* Fix merge conflict

* Update pageserver/src/pgdatadir_mapping.rs

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>

* Update pageserver/src/pgdatadir_mapping.rs

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>

* refactor: replace lazy-static with once-cell (#2195)

- Replacing all the occurrences of lazy-static with `once-cell::sync::Lazy`
- fixes #1147

Signed-off-by: Ankur Srivastava <best.ankur@gmail.com>

* Add more buckets to pageserver latency metrics (#2225)

* ignore record property warning to fix benchmarks

* increase statement timeout

* use event so it fires only if workload thread successfully finished

* remove debug log

* increase timeout to pass test with real s3

* avoid duplicate parameter, increase timeout

* Major migration script (#2073)

This script can be used to migrate a tenant across breaking storage versions, or (in the future) upgrading postgres versions. See the comment at the top for an overview.

Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>

* Fix etcd typos

* Fix links to safekeeper protocol docs. (#2188)

safekeeper/README_PROTO.md was moved to docs/safekeeper-protocol.md in
commit 0b14fdb078, as part of reorganizing the docs into 'mdbook' format.

Fixes issue #1475. Thanks to @banks for spotting the outdated references.

In addition to fixing the above issue, this patch also fixes other broken links as a result of 0b14fdb078. See https://github.com/neondatabase/neon/pull/2188#pullrequestreview-1055918480.

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
Co-authored-by: Thang Pham <thang@neon.tech>

* Update CONTRIBUTING.md

* Update CONTRIBUTING.md

* support node id and remote storage params in docker_entrypoint.sh

* Safe truncate (#2218)

* Move relation sie cache to layered timeline

* Fix obtaining current LSN for relation size cache

* Resolve merge conflicts

* Resolve merge conflicts

* Reestore 'lsn' field in DatadirModification

* adjust DatadirModification lsn in ingest_record

* Fix formatting

* Pass lsn to get_relsize

* Fix merge conflict

* Update pageserver/src/pgdatadir_mapping.rs

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>

* Update pageserver/src/pgdatadir_mapping.rs

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>

* Check if relation exists before trying to truncat it

refer #1932

* Add test reporducing FSM truncate problem

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>

* Fix exponential backoff values

* Update back `vendor/postgres` back; it was changed accidentally. (#2251)

Commit 4227cfc96e accidentally reverted vendor/postgres to an older
version. Update it back.

* Add pageserver checkpoint_timeout option.

To flush inmemory layer eventually when no new data arrives, which helps
safekeepers to suspend activity (stop pushing to the broker). Default 10m should
be ok.

* Share exponential backoff code and fix logic for delete task failure (#2252)

* Fix bug when import large (>1GB) relations (#2172)

Resolves #2097 

- use timeline modification's `lsn` and timeline's `last_record_lsn` to determine the corresponding LSN to query data in `DatadirModification::get`
- update `test_import_from_pageserver`. Split the test into 2 variants: `small` and `multisegment`. 
  + `small` is the old test
  + `multisegment` is to simulate #2097 by using a larger number of inserted rows to create multiple segment files of a relation. `multisegment` is configured to only run with a `release` build

* Fix timeline physical size flaky tests (#2244)

Resolves #2212.

- use `wait_for_last_flush_lsn` in `test_timeline_physical_size_*` tests

## Context
Need to wait for the pageserver to catch up with the compute's last flush LSN because during the timeline physical size API call, it's possible that there are running `LayerFlushThread` threads. These threads flush new layers into disk and hence update the physical size. This results in a mismatch between the physical size reported by the API and the actual physical size on disk.

### Note
The `LayerFlushThread` threads are processed **concurrently**, so it's possible that the above error still persists even with this patch. However, making the tests wait to finish processing all the WALs (not flushing) before calculating the physical size should help reduce the "flakiness" significantly

* postgres_ffi/waldecoder: validate more header fields

* postgres_ffi/waldecoder: remove unused startlsn

* postgres_ffi/waldecoder: introduce explicit `enum State`

Previously it was emulated with a combination of nullable fields.
This change should make the logic more readable.

* disable `test_import_from_pageserver_multisegment` (#2258)

This test failed consistently on `main` now. It's better to temporarily disable it to avoid blocking others' PRs while investigating the root cause for the test failure.

See: #2255, #2256

* get_binaries uses DOCKER_TAG taken from docker image build step (#2260)

* [proxy] Rework wire format of the password hack and some errors (#2236)

The new format has a few benefits: it's shorter, simpler and
human-readable as well. We don't use base64 anymore, since
url encoding got us covered.

We also show a better error in case we couldn't parse the
payload; the users should know it's all about passing the
correct project name.

* test_runner/pg_clients: collect docker logs (#2259)

* get_binaries script fix (#2263)

* get_binaries uses DOCKER_TAG taken from docker image build step

* remove docker tag discovery at all and fix get_binaries for version variable

* Better storage sync logs (#2268)

* Find end of WAL on safekeepers using WalStreamDecoder.

We could make it inside wal_storage.rs, but taking into account that
 - wal_storage.rs reading is async
 - we don't need s3 here
 - error handling is different; error during decoding is normal
I decided to put it separately.

Test
cargo test test_find_end_of_wal_last_crossing_segment
prepared earlier by @yeputons passes now.

Fixes https://github.com/neondatabase/neon/issues/544
      https://github.com/neondatabase/cloud/issues/2004
Supersedes https://github.com/neondatabase/neon/pull/2066

* Improve walreceiver logic (#2253)

This patch makes walreceiver logic more complicated, but it should work better in most cases. Added `test_wal_lagging` to test scenarios where alive safekeepers can lag behind other alive safekeepers.

- There was a bug which looks like `etcd_info.timeline.commit_lsn > Some(self.local_timeline.get_last_record_lsn())` filtered all safekeepers in some strange cases. I removed this filter, it should probably help with #2237
- Now walreceiver_connection reports status, including commit_lsn. This allows keeping safekeeper connection even when etcd is down.
- Safekeeper connection now fails if pageserver doesn't receive safekeeper messages for some time. Usually safekeeper sends messages at least once per second.
- `LaggingWal` check now uses `commit_lsn` directly from safekeeper. This fixes the issue with often reconnects, when compute generates WAL really fast.
- `NoWalTimeout` is rewritten to trigger only when we know about the new WAL and the connected safekeeper doesn't stream any WAL. This allows setting a small `lagging_wal_timeout` because it will trigger only when we observe that the connected safekeeper has stuck.

* increase timeout in wait_for_upload to avoid spurious failures when testing with real s3

* Bump vendor/postgres to include XLP_FIRST_IS_CONTRECORD fix. (#2274)

* Set up a workflow to run pgbench against captest (#2077)

Signed-off-by: Ankur Srivastava <best.ankur@gmail.com>
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
Co-authored-by: Konstantin Knizhnik <knizhnik@garret.ru>
Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>
Co-authored-by: Ankur Srivastava <ansrivas@users.noreply.github.com>
Co-authored-by: bojanserafimov <bojan.serafimov7@gmail.com>
Co-authored-by: Dmitry Rodionov <dmitry@neon.tech>
Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
Co-authored-by: Kirill Bulatov <kirill@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
Co-authored-by: Thang Pham <thang@neon.tech>
Co-authored-by: Stas Kelvich <stas.kelvich@gmail.com>
Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
Co-authored-by: Egor Suvorov <egor@neon.tech>
Co-authored-by: Andrey Taranik <andrey@cicd.team>
Co-authored-by: Dmitry Ivanov <ivadmi5@gmail.com>
2022-08-15 21:30:45 +03:00
Arthur Petukhovsky
e814ac16f9 Merge pull request #2219 from neondatabase/main
Release 2022-08-04
2022-08-04 20:06:34 +03:00
Heikki Linnakangas
ad3055d386 Merge pull request #2203 from neondatabase/release-uuid-ossp
Deploy new storage and compute version to production

Release 2022-08-02
2022-08-02 15:08:14 +03:00
Heikki Linnakangas
94e03eb452 Merge remote-tracking branch 'origin/main' into 'release'
Release 2022-08-01
2022-08-02 12:43:49 +03:00
Sergey Melnikov
380f26ef79 Merge pull request #2170 from neondatabase/main (Release 2022-07-28)
Release 2022-07-28
2022-07-28 14:16:52 +03:00
Arthur Petukhovsky
3c5b7f59d7 Merge pull request #2119 from neondatabase/main
Release 2022-07-19
2022-07-19 11:58:48 +03:00
Arthur Petukhovsky
fee89f80b5 Merge pull request #2115 from neondatabase/main-2022-07-18
Release 2022-07-18
2022-07-18 19:21:11 +03:00
Arthur Petukhovsky
41cce8eaf1 Merge remote-tracking branch 'origin/release' into main-2022-07-18 2022-07-18 18:21:20 +03:00
Alexey Kondratov
f88fe0218d Merge pull request #1842 from neondatabase/release-deploy-hotfix
[HOTFIX] Release deploy fix

This PR uses this branch neondatabase/postgres#171 and several required commits from the main to use only locally built compute-tools. This should allow us to rollout safekeepers sync issue fix on prod
2022-06-01 11:04:30 +03:00
Alexey Kondratov
cc856eca85 Install missing openssl packages in the Github Actions workflow 2022-05-31 21:31:31 +02:00
Alexey Kondratov
cf350c6002 Use :local compute-tools tag to build compute-node image 2022-05-31 21:31:16 +02:00
Arseny Sher
0ce6b6a0a3 Merge pull request #1836 from neondatabase/release-hotfix-basebackup-lsn-page-boundary
Bump vendor/postgres to hotfix basebackup LSN comparison.
2022-05-31 16:54:03 +04:00
Arseny Sher
73f247d537 Bump vendor/postgres to hotfix basebackup LSN comparison. 2022-05-31 16:00:50 +04:00
Andrey Taranik
960be82183 Merge pull request #1792 from neondatabase/main
Release 2202-05-25 (second)
2022-05-25 16:37:57 +03:00
Andrey Taranik
806e5a6c19 Merge pull request #1787 from neondatabase/main
Release 2022-05-25
2022-05-25 13:34:11 +03:00
Alexey Kondratov
8d5df07cce Merge pull request #1385 from zenithdb/main
Release main 2022-03-22
2022-03-22 05:04:34 -05:00
Andrey Taranik
df7a9d1407 release fix 2022-03-16 (#1375) 2022-03-17 00:43:28 +03:00
59 changed files with 907 additions and 1576 deletions

6
Cargo.lock generated
View File

@@ -1161,7 +1161,6 @@ dependencies = [
"flate2",
"futures",
"hyper",
"nix 0.26.2",
"notify",
"num_cpus",
"opentelemetry",
@@ -1172,7 +1171,6 @@ dependencies = [
"rust-ini",
"serde",
"serde_json",
"signal-hook",
"tar",
"tokio",
"tokio-postgres",
@@ -4405,14 +4403,12 @@ dependencies = [
"async-stream",
"aws-config",
"aws-sdk-s3",
"aws-smithy-async",
"bincode",
"bytes",
"chrono",
"clap",
"crc32c",
"either",
"futures",
"futures-util",
"hex",
"histogram",
@@ -4451,7 +4447,6 @@ dependencies = [
"clap",
"const_format",
"crc32c",
"fail",
"fs2",
"futures",
"git-version",
@@ -5883,7 +5878,6 @@ dependencies = [
"chrono",
"const_format",
"criterion",
"fail",
"futures",
"heapless",
"hex",

View File

@@ -13,7 +13,6 @@ clap.workspace = true
flate2.workspace = true
futures.workspace = true
hyper = { workspace = true, features = ["full"] }
nix.workspace = true
notify.workspace = true
num_cpus.workspace = true
opentelemetry.workspace = true
@@ -21,7 +20,6 @@ postgres.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
tar.workspace = true
reqwest = { workspace = true, features = ["json"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }

View File

@@ -40,22 +40,18 @@ use std::collections::HashMap;
use std::fs::File;
use std::path::Path;
use std::process::exit;
use std::sync::atomic::Ordering;
use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use nix::sys::signal::{kill, Signal};
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info};
use url::Url;
use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec, PG_PID, SYNC_SAFEKEEPERS_PID};
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::get_pg_version;
use compute_tools::http::api::launch_http_server;
@@ -71,13 +67,6 @@ const BUILD_TAG_DEFAULT: &str = "latest";
fn main() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
thread::spawn(move || {
for sig in signals.forever() {
handle_exit_signal(sig);
}
});
let build_tag = option_env!("BUILD_TAG")
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string();
@@ -357,7 +346,6 @@ fn main() -> Result<()> {
let ecode = pg
.wait()
.expect("failed to start waiting on Postgres process");
PG_PID.store(0, Ordering::SeqCst);
info!("Postgres exited with code {}, shutting down", ecode);
exit_code = ecode.code()
}
@@ -531,24 +519,6 @@ fn cli() -> clap::Command {
)
}
/// When compute_ctl is killed, send also termination signal to sync-safekeepers
/// to prevent leakage. TODO: it is better to convert compute_ctl to async and
/// wait for termination which would be easy then.
fn handle_exit_signal(sig: i32) {
info!("received {sig} termination signal");
let ss_pid = SYNC_SAFEKEEPERS_PID.load(Ordering::SeqCst);
if ss_pid != 0 {
let ss_pid = nix::unistd::Pid::from_raw(ss_pid as i32);
kill(ss_pid, Signal::SIGTERM).ok();
}
let pg_pid = PG_PID.load(Ordering::SeqCst);
if pg_pid != 0 {
let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32);
kill(pg_pid, Signal::SIGTERM).ok();
}
exit(1);
}
#[test]
fn verify_cli() {
cli().debug_assert()

View File

@@ -6,8 +6,6 @@ use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::{Condvar, Mutex, RwLock};
use std::thread;
use std::time::Instant;
@@ -36,9 +34,6 @@ use crate::spec::*;
use crate::sync_sk::{check_if_synced, ping_safekeeper};
use crate::{config, extension_server};
pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
pub static PG_PID: AtomicU32 = AtomicU32::new(0);
/// Compute node info shared across several `compute_ctl` threads.
pub struct ComputeNode {
// Url type maintains proper escaping
@@ -506,7 +501,6 @@ impl ComputeNode {
.stdout(Stdio::piped())
.spawn()
.expect("postgres --sync-safekeepers failed to start");
SYNC_SAFEKEEPERS_PID.store(sync_handle.id(), Ordering::SeqCst);
// `postgres --sync-safekeepers` will print all log output to stderr and
// final LSN to stdout. So we pipe only stdout, while stderr will be automatically
@@ -514,7 +508,6 @@ impl ComputeNode {
let sync_output = sync_handle
.wait_with_output()
.expect("postgres --sync-safekeepers failed");
SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst);
if !sync_output.status.success() {
anyhow::bail!(
@@ -669,7 +662,6 @@ impl ComputeNode {
})
.spawn()
.expect("cannot start postgres process");
PG_PID.store(pg.id(), Ordering::SeqCst);
wait_for_postgres(&mut pg, pgdata_path)?;

View File

@@ -46,8 +46,6 @@ use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result};
use compute_api::spec::RemoteExtSpec;
use nix::sys::signal::kill;
use nix::sys::signal::Signal;
use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId, TimelineId};
@@ -441,14 +439,11 @@ impl Endpoint {
Ok(())
}
fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
fn wait_for_compute_ctl_to_exit(&self) -> Result<()> {
// TODO use background_process::stop_process instead
let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
let pid = nix::unistd::Pid::from_raw(pid as i32);
if send_sigterm {
kill(pid, Signal::SIGTERM).ok();
}
crate::background_process::wait_until_stopped("compute_ctl", pid)?;
Ok(())
}
@@ -738,15 +733,10 @@ impl Endpoint {
&None,
)?;
// Also wait for the compute_ctl process to die. It might have some
// cleanup work to do after postgres stops, like syncing safekeepers,
// etc.
// Also wait for the compute_ctl process to die. It might have some cleanup
// work to do after postgres stops, like syncing safekeepers, etc.
//
// If destroying, send it SIGTERM before waiting. Sometimes we do *not*
// want this cleanup: tests intentionally do stop when majority of
// safekeepers is down, so sync-safekeepers would hang otherwise. This
// could be a separate flag though.
self.wait_for_compute_ctl_to_exit(destroy)?;
self.wait_for_compute_ctl_to_exit()?;
if destroy {
println!(
"Destroying postgres data directory '{}'",

View File

@@ -124,9 +124,6 @@ impl KeySpaceAccum {
if range.start == accum.end {
accum.end = range.end;
} else {
// TODO: to efficiently support small sharding stripe sizes, we should avoid starting
// a new range here if the skipped region was all keys that don't belong on this shard.
// (https://github.com/neondatabase/neon/issues/6247)
assert!(range.start > accum.end);
self.ranges.push(accum.clone());
*accum = range;

View File

@@ -557,6 +557,19 @@ pub enum DownloadRemoteLayersTaskState {
ShutDown,
}
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
/// Information for configuring a single fail point
#[derive(Debug, Serialize, Deserialize)]
pub struct FailpointConfig {
/// Name of the fail point
pub name: String,
/// List of actions to take, using the format described in `fail::cfg`
///
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
pub actions: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineGcRequest {
pub gc_horizon: Option<u64>,

View File

@@ -422,21 +422,6 @@ impl ShardIdentity {
}
}
/// Return true if the key should be discarded if found in this shard's
/// data store, e.g. during compaction after a split
pub fn is_key_disposable(&self, key: &Key) -> bool {
if key_is_shard0(key) {
// Q: Why can't we dispose of shard0 content if we're not shard 0?
// A: because the WAL ingestion logic currently ingests some shard 0
// content on all shards, even though it's only read on shard 0. If we
// dropped it, then subsequent WAL ingest to these keys would encounter
// an error.
false
} else {
!self.is_key_local(key)
}
}
pub fn shard_slug(&self) -> String {
if self.count > ShardCount(0) {
format!("-{:02x}{:02x}", self.number.0, self.count.0)

View File

@@ -35,12 +35,6 @@ pub enum QueryError {
/// We were instructed to shutdown while processing the query
#[error("Shutting down")]
Shutdown,
/// Query handler indicated that client should reconnect
#[error("Server requested reconnect")]
Reconnect,
/// Query named an entity that was not found
#[error("Not found: {0}")]
NotFound(std::borrow::Cow<'static, str>),
/// Authentication failure
#[error("Unauthorized: {0}")]
Unauthorized(std::borrow::Cow<'static, str>),
@@ -60,9 +54,9 @@ impl From<io::Error> for QueryError {
impl QueryError {
pub fn pg_error_code(&self) -> &'static [u8; 5] {
match self {
Self::Disconnected(_) | Self::SimulatedConnectionError | Self::Reconnect => b"08006", // connection failure
Self::Disconnected(_) | Self::SimulatedConnectionError => b"08006", // connection failure
Self::Shutdown => SQLSTATE_ADMIN_SHUTDOWN,
Self::Unauthorized(_) | Self::NotFound(_) => SQLSTATE_INTERNAL_ERROR,
Self::Unauthorized(_) => SQLSTATE_INTERNAL_ERROR,
Self::Other(_) => SQLSTATE_INTERNAL_ERROR, // internal error
}
}
@@ -431,11 +425,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
info!("Stopped due to shutdown");
Ok(())
}
Err(QueryError::Reconnect) => {
// Dropping out of this loop implicitly disconnects
info!("Stopped due to handler reconnect request");
Ok(())
}
Err(QueryError::Disconnected(e)) => {
info!("Disconnected ({e:#})");
// Disconnection is not an error: we just use it that way internally to drop
@@ -985,9 +974,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> AsyncWrite for CopyDataWriter<'a, I
pub fn short_error(e: &QueryError) -> String {
match e {
QueryError::Disconnected(connection_error) => connection_error.to_string(),
QueryError::Reconnect => "reconnect".to_string(),
QueryError::Shutdown => "shutdown".to_string(),
QueryError::NotFound(_) => "not found".to_string(),
QueryError::Unauthorized(_e) => "JWT authentication error".to_string(),
QueryError::SimulatedConnectionError => "simulated connection error".to_string(),
QueryError::Other(e) => format!("{e:#}"),
@@ -1009,15 +996,9 @@ fn log_query_error(query: &str, e: &QueryError) {
QueryError::SimulatedConnectionError => {
error!("query handler for query '{query}' failed due to a simulated connection error")
}
QueryError::Reconnect => {
info!("query handler for '{query}' requested client to reconnect")
}
QueryError::Shutdown => {
info!("query handler for '{query}' cancelled during tenant shutdown")
}
QueryError::NotFound(reason) => {
info!("query handler for '{query}' entity not found: {reason}")
}
QueryError::Unauthorized(e) => {
warn!("query handler for '{query}' failed with authentication error: {e}");
}

View File

@@ -4,12 +4,6 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
default = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
[dependencies]
arc-swap.workspace = true
sentry.workspace = true
@@ -22,7 +16,6 @@ chrono.workspace = true
heapless.workspace = true
hex = { workspace = true, features = ["serde"] }
hyper = { workspace = true, features = ["full"] }
fail.workspace = true
futures = { workspace = true}
jsonwebtoken.workspace = true
nix.workspace = true

View File

@@ -31,9 +31,6 @@ pub enum ApiError {
#[error("Shutting down")]
ShuttingDown,
#[error("Timeout")]
Timeout(Cow<'static, str>),
#[error(transparent)]
InternalServerError(anyhow::Error),
}
@@ -70,10 +67,6 @@ impl ApiError {
err.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
),
ApiError::Timeout(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::REQUEST_TIMEOUT,
),
ApiError::InternalServerError(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,

View File

@@ -83,8 +83,6 @@ pub mod timeout;
pub mod sync;
pub mod failpoint_support;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:

View File

@@ -425,7 +425,7 @@ mod tests {
}
fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
println!("wp_log[{}] {}", level, msg);
println!("walprop_log[{}] {}", level, msg);
}
fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {

View File

@@ -13,7 +13,6 @@ use bytes::{Buf, Bytes};
use pageserver::{
config::PageServerConf, repository::Key, walrecord::NeonWalRecord, walredo::PostgresRedoManager,
};
use pageserver_api::shard::TenantShardId;
use utils::{id::TenantId, lsn::Lsn};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
@@ -27,9 +26,9 @@ fn redo_scenarios(c: &mut Criterion) {
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
let conf = Box::leak(Box::new(conf));
let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
let tenant_id = TenantId::generate();
let manager = PostgresRedoManager::new(conf, tenant_shard_id);
let manager = PostgresRedoManager::new(conf, tenant_id);
let manager = Arc::new(manager);

View File

@@ -115,8 +115,15 @@ impl PagestreamClient {
pub async fn getpage(
&mut self,
req: PagestreamGetPageRequest,
key: RelTagBlockNo,
lsn: Lsn,
) -> anyhow::Result<PagestreamGetPageResponse> {
let req = PagestreamGetPageRequest {
latest: false,
rel: key.rel_tag,
blkno: key.block_no,
lsn,
};
let req = PagestreamFeMessage::GetPage(req);
let req: bytes::Bytes = req.serialize();
// let mut req = tokio_util::io::ReaderStream::new(&req);

View File

@@ -3,7 +3,7 @@ use futures::future::join_all;
use pageserver::pgdatadir_mapping::key_to_rel_block;
use pageserver::repository;
use pageserver_api::key::is_rel_block_key;
use pageserver_api::models::PagestreamGetPageRequest;
use pageserver_client::page_service::RelTagBlockNo;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
@@ -39,9 +39,6 @@ pub(crate) struct Args {
runtime: Option<humantime::Duration>,
#[clap(long)]
per_target_rate_limit: Option<usize>,
/// Probability for sending `latest=true` in the request (uniform distribution).
#[clap(long, default_value = "1")]
req_latest_probability: f64,
#[clap(long)]
limit_to_first_n_targets: Option<usize>,
targets: Option<Vec<TenantTimelineId>>,
@@ -203,26 +200,18 @@ async fn main_impl(
start_work_barrier.wait().await;
loop {
let (timeline, req) = {
let (range, key) = {
let mut rng = rand::thread_rng();
let r = &all_ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = repository::Key::from_i128(key);
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
(
r.timeline,
PagestreamGetPageRequest {
latest: rng.gen_bool(args.req_latest_probability),
lsn: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,
},
)
(r, RelTagBlockNo { rel_tag, block_no })
};
let sender = work_senders.get(&timeline).unwrap();
let sender = work_senders.get(&range.timeline).unwrap();
// TODO: what if this blocks?
sender.send(req).await.ok().unwrap();
sender.send((key, range.timeline_lsn)).await.ok().unwrap();
}
}),
Some(rps_limit) => Box::pin(async move {
@@ -251,21 +240,16 @@ async fn main_impl(
);
loop {
ticker.tick().await;
let req = {
let (range, key) = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = repository::Key::from_i128(key);
let (rel_tag, block_no) = key_to_rel_block(key)
.expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
latest: rng.gen_bool(args.req_latest_probability),
lsn: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,
}
(r, RelTagBlockNo { rel_tag, block_no })
};
sender.send(req).await.ok().unwrap();
sender.send((key, range.timeline_lsn)).await.ok().unwrap();
}
})
};
@@ -319,7 +303,7 @@ async fn client(
args: &'static Args,
timeline: TenantTimelineId,
start_work_barrier: Arc<Barrier>,
mut work: tokio::sync::mpsc::Receiver<PagestreamGetPageRequest>,
mut work: tokio::sync::mpsc::Receiver<(RelTagBlockNo, Lsn)>,
all_work_done_barrier: Arc<Barrier>,
live_stats: Arc<LiveStats>,
) {
@@ -333,10 +317,10 @@ async fn client(
.await
.unwrap();
while let Some(req) = work.recv().await {
while let Some((key, lsn)) = work.recv().await {
let start = Instant::now();
client
.getpage(req)
.getpage(key, lsn)
.await
.with_context(|| format!("getpage for {timeline}"))
.unwrap();

View File

@@ -23,7 +23,6 @@ use tracing::*;
use tokio_tar::{Builder, EntryType, Header};
use crate::context::RequestContext;
use crate::pgdatadir_mapping::Version;
use crate::tenant::Timeline;
use pageserver_api::reltag::{RelTag, SlruKind};
@@ -175,7 +174,7 @@ where
] {
for segno in self
.timeline
.list_slru_segments(kind, Version::Lsn(self.lsn), self.ctx)
.list_slru_segments(kind, self.lsn, self.ctx)
.await?
{
self.add_slru_segment(kind, segno).await?;
@@ -193,7 +192,7 @@ where
// Otherwise only include init forks of unlogged relations.
let rels = self
.timeline
.list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
.await?;
for &rel in rels.iter() {
// Send init fork as main fork to provide well formed empty
@@ -268,7 +267,7 @@ where
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
let nblocks = self
.timeline
.get_rel_size(src, Version::Lsn(self.lsn), false, self.ctx)
.get_rel_size(src, self.lsn, false, self.ctx)
.await?;
// If the relation is empty, create an empty file
@@ -289,7 +288,7 @@ where
for blknum in startblk..endblk {
let img = self
.timeline
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), false, self.ctx)
.get_rel_page_at_lsn(src, blknum, self.lsn, false, self.ctx)
.await?;
segment_data.extend_from_slice(&img[..]);
}
@@ -311,7 +310,7 @@ where
async fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> {
let nblocks = self
.timeline
.get_slru_segment_size(slru, segno, Version::Lsn(self.lsn), self.ctx)
.get_slru_segment_size(slru, segno, self.lsn, self.ctx)
.await?;
let mut slru_buf: Vec<u8> = Vec::with_capacity(nblocks as usize * BLCKSZ as usize);
@@ -353,7 +352,7 @@ where
let relmap_img = if has_relmap_file {
let img = self
.timeline
.get_relmap_file(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.get_relmap_file(spcnode, dbnode, self.lsn, self.ctx)
.await?;
ensure!(
@@ -400,7 +399,7 @@ where
if !has_relmap_file
&& self
.timeline
.list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
.await?
.is_empty()
{

View File

@@ -31,7 +31,6 @@ use pageserver::{
virtual_file,
};
use postgres_backend::AuthType;
use utils::failpoint_support;
use utils::logging::TracingErrorLayerEnablement;
use utils::signals::ShutdownSignals;
use utils::{
@@ -127,7 +126,7 @@ fn main() -> anyhow::Result<()> {
}
// Initialize up failpoints support
let scenario = failpoint_support::init();
let scenario = pageserver::failpoint_support::init();
// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors);

View File

@@ -76,8 +76,6 @@ pub mod defaults {
pub const DEFAULT_HEATMAP_UPLOAD_CONCURRENCY: usize = 8;
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
///
/// Default built-in configuration file.
///
@@ -90,7 +88,6 @@ pub mod defaults {
#wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
#wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'
#page_cache_size = {DEFAULT_PAGE_CACHE_SIZE}
#max_file_descriptors = {DEFAULT_MAX_FILE_DESCRIPTORS}
# initial superuser role name to use when creating a new tenant
@@ -111,8 +108,6 @@ pub mod defaults {
#background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}'
#ingest_batch_size = {DEFAULT_INGEST_BATCH_SIZE}
[tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
@@ -238,9 +233,6 @@ pub struct PageServerConf {
/// How many heatmap uploads may be done concurrency: lower values implicitly deprioritize
/// heatmap uploads vs. other remote storage operations.
pub heatmap_upload_concurrency: usize,
/// Maximum number of WAL records to be ingested and committed at the same time
pub ingest_batch_size: u64,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -322,8 +314,6 @@ struct PageServerConfigBuilder {
control_plane_emergency_mode: BuilderValue<bool>,
heatmap_upload_concurrency: BuilderValue<usize>,
ingest_batch_size: BuilderValue<u64>,
}
impl Default for PageServerConfigBuilder {
@@ -396,8 +386,6 @@ impl Default for PageServerConfigBuilder {
control_plane_emergency_mode: Set(false),
heatmap_upload_concurrency: Set(DEFAULT_HEATMAP_UPLOAD_CONCURRENCY),
ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE),
}
}
}
@@ -546,10 +534,6 @@ impl PageServerConfigBuilder {
self.heatmap_upload_concurrency = BuilderValue::Set(value)
}
pub fn ingest_batch_size(&mut self, ingest_batch_size: u64) {
self.ingest_batch_size = BuilderValue::Set(ingest_batch_size)
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let concurrent_tenant_warmup = self
.concurrent_tenant_warmup
@@ -648,12 +632,10 @@ impl PageServerConfigBuilder {
control_plane_emergency_mode: self
.control_plane_emergency_mode
.ok_or(anyhow!("missing control_plane_emergency_mode"))?,
heatmap_upload_concurrency: self
.heatmap_upload_concurrency
.ok_or(anyhow!("missing heatmap_upload_concurrency"))?,
ingest_batch_size: self
.ingest_batch_size
.ok_or(anyhow!("missing ingest_batch_size"))?,
})
}
}
@@ -896,7 +878,6 @@ impl PageServerConf {
"heatmap_upload_concurrency" => {
builder.heatmap_upload_concurrency(parse_toml_u64(key, item)? as usize)
},
"ingest_batch_size" => builder.ingest_batch_size(parse_toml_u64(key, item)?),
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -968,7 +949,6 @@ impl PageServerConf {
control_plane_api_token: None,
control_plane_emergency_mode: false,
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
}
}
}
@@ -1197,8 +1177,7 @@ background_task_maximum_delay = '334 s'
control_plane_api: None,
control_plane_api_token: None,
control_plane_emergency_mode: false,
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY
},
"Correct defaults should be used when no config values are provided"
);
@@ -1259,8 +1238,7 @@ background_task_maximum_delay = '334 s'
control_plane_api: None,
control_plane_api_token: None,
control_plane_emergency_mode: false,
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
ingest_batch_size: 100,
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -1,14 +1,3 @@
//! Failpoint support code shared between pageserver and safekeepers.
use crate::http::{
error::ApiError,
json::{json_request, json_response},
};
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::*;
/// use with fail::cfg("$name", "return(2000)")
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
@@ -36,7 +25,7 @@ pub use __failpoint_sleep_millis_async as sleep_millis_async;
// Helper function used by the macro. (A function has nicer scoping so we
// don't need to decorate everything with "::")
#[doc(hidden)]
pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
pub(crate) async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
let millis = duration_str.parse::<u64>().unwrap();
let d = std::time::Duration::from_millis(millis);
@@ -82,7 +71,7 @@ pub fn init() -> fail::FailScenario<'static> {
scenario
}
pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
pub(crate) fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
if actions == "exit" {
fail::cfg_callback(name, exit_failpoint)
} else {
@@ -95,45 +84,3 @@ fn exit_failpoint() {
tracing::info!("Exit requested by failpoint");
std::process::exit(1);
}
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
/// Information for configuring a single fail point
#[derive(Debug, Serialize, Deserialize)]
pub struct FailpointConfig {
/// Name of the fail point
pub name: String,
/// List of actions to take, using the format described in `fail::cfg`
///
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
pub actions: String,
}
/// Configure failpoints through http.
pub async fn failpoints_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
if !fail::has_failpoints() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Cannot manage failpoints because storage was compiled without failpoints support"
)));
}
let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
for fp in failpoints {
info!("cfg failpoint: {} {}", fp.name, fp.actions);
// We recognize one extra "action" that's not natively recognized
// by the failpoints crate: exit, to immediately kill the process
let cfg_result = apply_failpoint(&fp.name, &fp.actions);
if let Err(err_msg) = cfg_result {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Failed to configure failpoints: {err_msg}"
)));
}
}
json_response(StatusCode::OK, ())
}

View File

@@ -25,7 +25,6 @@ use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::JwtAuth;
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::request_span;
use utils::http::json::json_request_or_empty_body;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
@@ -67,6 +66,9 @@ use utils::{
lsn::Lsn,
};
// Imports only used for testing APIs
use pageserver_api::models::ConfigureFailpointsRequest;
// For APIs that require an Active tenant, how long should we block waiting for that state?
// This is not functionally necessary (clients will retry), but avoids generating a lot of
// failed API calls while tenants are activating.
@@ -152,7 +154,6 @@ impl From<PageReconstructError> for ApiError {
PageReconstructError::AncestorStopping(_) => {
ApiError::ResourceUnavailable(format!("{pre}").into())
}
PageReconstructError::AncestorLsnTimeout(e) => ApiError::Timeout(format!("{e}").into()),
PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
}
}
@@ -1292,6 +1293,34 @@ async fn handle_tenant_break(
json_response(StatusCode::OK, ())
}
async fn failpoints_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
if !fail::has_failpoints() {
return Err(ApiError::BadRequest(anyhow!(
"Cannot manage failpoints because pageserver was compiled without failpoints support"
)));
}
let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
for fp in failpoints {
info!("cfg failpoint: {} {}", fp.name, fp.actions);
// We recognize one extra "action" that's not natively recognized
// by the failpoints crate: exit, to immediately kill the process
let cfg_result = crate::failpoint_support::apply_failpoint(&fp.name, &fp.actions);
if let Err(err_msg) = cfg_result {
return Err(ApiError::BadRequest(anyhow!(
"Failed to configure failpoints: {err_msg}"
)));
}
}
json_response(StatusCode::OK, ())
}
// Run GC immediately on given timeline.
async fn timeline_gc_handler(
mut request: Request<Body>,

View File

@@ -21,7 +21,6 @@ use tracing::*;
use walkdir::WalkDir;
use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::*;
use crate::tenant::remote_timeline_client::INITDB_PATH;
use crate::tenant::Timeline;
@@ -313,16 +312,13 @@ async fn import_wal(
waldecoder.feed_bytes(&buf);
let mut nrecords = 0;
let mut modification = tline.begin_modification(last_lsn);
let mut modification = tline.begin_modification(endpoint);
let mut decoded = DecodedWALRecord::default();
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
.await?;
WAL_INGEST.records_committed.inc();
modification.commit(ctx).await?;
last_lsn = lsn;
nrecords += 1;
@@ -452,14 +448,13 @@ pub async fn import_wal_from_tar(
waldecoder.feed_bytes(&bytes[offset..]);
let mut modification = tline.begin_modification(last_lsn);
let mut modification = tline.begin_modification(end_lsn);
let mut decoded = DecodedWALRecord::default();
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
.await?;
modification.commit(ctx).await?;
last_lsn = lsn;
debug!("imported record at {} (end {})", lsn, end_lsn);

View File

@@ -25,6 +25,8 @@ pub mod walingest;
pub mod walrecord;
pub mod walredo;
pub mod failpoint_support;
use crate::task_mgr::TaskKind;
use camino::Utf8Path;
use deletion_queue::DeletionQueue;

View File

@@ -25,7 +25,6 @@ use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, Qu
use pq_proto::framed::ConnectionError;
use pq_proto::FeStartupPacket;
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::borrow::Cow;
use std::io;
use std::net::TcpListener;
use std::pin::pin;
@@ -54,7 +53,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::import_datadir::import_wal_from_tar;
use crate::metrics;
use crate::metrics::LIVE_CONNECTIONS_COUNT;
use crate::pgdatadir_mapping::{rel_block_to_key, Version};
use crate::pgdatadir_mapping::rel_block_to_key;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -62,9 +61,6 @@ use crate::tenant::mgr;
use crate::tenant::mgr::get_active_tenant_with_timeout;
use crate::tenant::mgr::GetActiveTenantError;
use crate::tenant::mgr::ShardSelector;
use crate::tenant::timeline::WaitLsnError;
use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
use crate::tenant::Timeline;
use crate::trace::Tracer;
@@ -287,64 +283,6 @@ struct PageServerHandler {
connection_ctx: RequestContext,
}
#[derive(thiserror::Error, Debug)]
enum PageStreamError {
/// We encountered an error that should prompt the client to reconnect:
/// in practice this means we drop the connection without sending a response.
#[error("Reconnect required: {0}")]
Reconnect(Cow<'static, str>),
/// We were instructed to shutdown while processing the query
#[error("Shutting down")]
Shutdown,
/// Something went wrong reading a page: this likely indicates a pageserver bug
#[error("Read error: {0}")]
Read(PageReconstructError),
/// Ran out of time waiting for an LSN
#[error("LSN timeout: {0}")]
LsnTimeout(WaitLsnError),
/// The entity required to serve the request (tenant or timeline) is not found,
/// or is not found in a suitable state to serve a request.
#[error("Not found: {0}")]
NotFound(std::borrow::Cow<'static, str>),
/// Request asked for something that doesn't make sense, like an invalid LSN
#[error("Bad request: {0}")]
BadRequest(std::borrow::Cow<'static, str>),
}
impl From<PageReconstructError> for PageStreamError {
fn from(value: PageReconstructError) -> Self {
match value {
PageReconstructError::Cancelled => Self::Shutdown,
e => Self::Read(e),
}
}
}
impl From<GetActiveTimelineError> for PageStreamError {
fn from(value: GetActiveTimelineError) -> Self {
match value {
GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => Self::Shutdown,
GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
}
}
}
impl From<WaitLsnError> for PageStreamError {
fn from(value: WaitLsnError) -> Self {
match value {
e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
WaitLsnError::Shutdown => Self::Shutdown,
WaitLsnError::BadState => Self::Reconnect("Timeline is not active".into()),
}
}
}
impl PageServerHandler {
pub fn new(
conf: &'static PageServerConf,
@@ -490,7 +428,7 @@ impl PageServerHandler {
// Check that the timeline exists
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(|e| QueryError::NotFound(format!("{e}").into()))?;
.map_err(|e| anyhow::anyhow!(e))?;
// Avoid starting new requests if the timeline has already started shutting down,
// and block timeline shutdown until this request is complete, or drops out due
@@ -582,44 +520,32 @@ impl PageServerHandler {
}
};
match response {
Err(PageStreamError::Shutdown) => {
if let Err(e) = &response {
// Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
// because wait_lsn etc will drop out
// is_stopping(): [`Timeline::flush_and_shutdown`] has entered
// is_canceled(): [`Timeline::shutdown`]` has entered
if timeline.cancel.is_cancelled() || timeline.is_stopping() {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropping connection due to shutdown"));
span.in_scope(|| info!("dropped response during shutdown: {e:#}"));
return Err(QueryError::Shutdown);
}
Err(PageStreamError::Reconnect(reason)) => {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
}
Err(e) if timeline.cancel.is_cancelled() || timeline.is_stopping() => {
// This branch accomodates code within request handlers that returns an anyhow::Error instead of a clean
// shutdown error, this may be buried inside a PageReconstructError::Other for example.
//
// Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
// because wait_lsn etc will drop out
// is_stopping(): [`Timeline::flush_and_shutdown`] has entered
// is_canceled(): [`Timeline::shutdown`]` has entered
span.in_scope(|| info!("dropped error response during shutdown: {e:#}"));
return Err(QueryError::Shutdown);
}
r => {
let response_msg = r.unwrap_or_else(|e| {
// print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
span.in_scope(|| error!("error reading relation or page version: {:#}", e));
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
}
}
let response = response.unwrap_or_else(|e| {
// print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
span.in_scope(|| error!("error reading relation or page version: {:#}", e));
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
}
Ok(())
}
@@ -766,7 +692,7 @@ impl PageServerHandler {
latest: bool,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
ctx: &RequestContext,
) -> Result<Lsn, PageStreamError> {
) -> anyhow::Result<Lsn> {
if latest {
// Latest page version was requested. If LSN is given, it is a hint
// to the page server that there have been no modifications to the
@@ -797,19 +723,15 @@ impl PageServerHandler {
}
} else {
if lsn == Lsn(0) {
return Err(PageStreamError::BadRequest(
"invalid LSN(0) in request".into(),
));
anyhow::bail!("invalid LSN(0) in request");
}
timeline.wait_lsn(lsn, ctx).await?;
}
if lsn < **latest_gc_cutoff_lsn {
return Err(PageStreamError::BadRequest(format!(
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
lsn, **latest_gc_cutoff_lsn
).into()));
}
anyhow::ensure!(
lsn >= **latest_gc_cutoff_lsn,
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
lsn, **latest_gc_cutoff_lsn
);
Ok(lsn)
}
@@ -818,14 +740,14 @@ impl PageServerHandler {
timeline: &Timeline,
req: &PagestreamExistsRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
) -> anyhow::Result<PagestreamBeMessage> {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let exists = timeline
.get_rel_exists(req.rel, Version::Lsn(lsn), req.latest, ctx)
.get_rel_exists(req.rel, lsn, req.latest, ctx)
.await?;
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
@@ -838,15 +760,13 @@ impl PageServerHandler {
timeline: &Timeline,
req: &PagestreamNblocksRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
) -> anyhow::Result<PagestreamBeMessage> {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let n_blocks = timeline
.get_rel_size(req.rel, Version::Lsn(lsn), req.latest, ctx)
.await?;
let n_blocks = timeline.get_rel_size(req.rel, lsn, req.latest, ctx).await?;
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
n_blocks,
@@ -858,20 +778,14 @@ impl PageServerHandler {
timeline: &Timeline,
req: &PagestreamDbSizeRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
) -> anyhow::Result<PagestreamBeMessage> {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let total_blocks = timeline
.get_db_size(
DEFAULTTABLESPACE_OID,
req.dbnode,
Version::Lsn(lsn),
req.latest,
ctx,
)
.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn, req.latest, ctx)
.await?;
let db_size = total_blocks as i64 * BLCKSZ as i64;
@@ -880,35 +794,30 @@ impl PageServerHandler {
}))
}
async fn do_handle_get_page_at_lsn_request(
&self,
timeline: &Timeline,
req: &PagestreamGetPageRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let page = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
.await?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
}))
}
async fn handle_get_page_at_lsn_request(
&self,
timeline: &Timeline,
req: &PagestreamGetPageRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
) -> anyhow::Result<PagestreamBeMessage> {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
/*
// Add a 1s delay to some requests. The delay helps the requests to
// hit the race condition from github issue #1047 more easily.
use rand::Rng;
if rand::thread_rng().gen::<u8>() < 5 {
std::thread::sleep(std::time::Duration::from_millis(1000));
}
*/
let key = rel_block_to_key(req.rel, req.blkno);
if timeline.get_shard_identity().is_key_local(&key) {
self.do_handle_get_page_at_lsn_request(timeline, req, ctx)
.await
let page = if timeline.get_shard_identity().is_key_local(&key) {
timeline
.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx)
.await?
} else {
// The Tenant shard we looked up at connection start does not hold this particular
// key: look for other shards in this tenant. This scenario occurs if a pageserver
@@ -927,30 +836,30 @@ impl PageServerHandler {
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
tracing::info!("Page request routed to wrong shard: my identity {:?}, should go to shard {}, key {}",
timeline.get_shard_identity(), timeline.get_shard_identity().get_shard_number(&key).0, key);
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return Err(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
));
// the requested page is not present on this node.
// TODO: this should be some kind of structured error that the client will understand,
// so that it can block until its config is updated: this error is expected in the case
// that the Tenant's shards' placements are being updated and the client hasn't been
// informed yet.
//
// https://github.com/neondatabase/neon/issues/6038
return Err(anyhow::anyhow!("Request routed to wrong shard"));
}
Err(e) => return Err(e.into()),
};
// Take a GateGuard for the duration of this request. If we were using our main Timeline object,
// the GateGuard was already held over the whole connection.
let _timeline_guard = timeline
.gate
.enter()
.map_err(|_| PageStreamError::Shutdown)?;
let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?;
timeline
.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx)
.await?
};
self.do_handle_get_page_at_lsn_request(&timeline, req, ctx)
.await
}
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
}))
}
#[allow(clippy::too_many_arguments)]
@@ -1091,7 +1000,9 @@ impl PageServerHandler {
)
.await
.map_err(GetActiveTimelineError::Tenant)?;
let timeline = tenant.get_timeline(timeline_id, true)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?;
Ok(timeline)
}
}
@@ -1513,15 +1424,14 @@ enum GetActiveTimelineError {
#[error(transparent)]
Tenant(GetActiveTenantError),
#[error(transparent)]
Timeline(#[from] GetTimelineError),
Timeline(anyhow::Error),
}
impl From<GetActiveTimelineError> for QueryError {
fn from(e: GetActiveTimelineError) -> Self {
match e {
GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
GetActiveTimelineError::Tenant(e) => e.into(),
GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
GetActiveTimelineError::Timeline(e) => QueryError::Other(e),
}
}
}

View File

@@ -11,7 +11,7 @@ use crate::context::RequestContext;
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::repository::*;
use crate::walrecord::NeonWalRecord;
use anyhow::{ensure, Context};
use anyhow::Context;
use bytes::{Buf, Bytes};
use pageserver_api::key::is_rel_block_key;
use pageserver_api::reltag::{RelTag, SlruKind};
@@ -147,7 +147,6 @@ impl Timeline {
{
DatadirModification {
tline: self,
pending_lsns: Vec::new(),
pending_updates: HashMap::new(),
pending_deletions: Vec::new(),
pending_nblocks: 0,
@@ -160,11 +159,11 @@ impl Timeline {
//------------------------------------------------------------------------------
/// Look up given page version.
pub(crate) async fn get_rel_page_at_lsn(
pub async fn get_rel_page_at_lsn(
&self,
tag: RelTag,
blknum: BlockNumber,
version: Version<'_>,
lsn: Lsn,
latest: bool,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
@@ -174,47 +173,44 @@ impl Timeline {
));
}
let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
let nblocks = self.get_rel_size(tag, lsn, latest, ctx).await?;
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag,
blknum,
version.get_lsn(),
nblocks
tag, blknum, lsn, nblocks
);
return Ok(ZERO_PAGE.clone());
}
let key = rel_block_to_key(tag, blknum);
version.get(self, key, ctx).await
self.get(key, lsn, ctx).await
}
// Get size of a database in blocks
pub(crate) async fn get_db_size(
pub async fn get_db_size(
&self,
spcnode: Oid,
dbnode: Oid,
version: Version<'_>,
lsn: Lsn,
latest: bool,
ctx: &RequestContext,
) -> Result<usize, PageReconstructError> {
let mut total_blocks = 0;
let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
let rels = self.list_rels(spcnode, dbnode, lsn, ctx).await?;
for rel in rels {
let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
let n_blocks = self.get_rel_size(rel, lsn, latest, ctx).await?;
total_blocks += n_blocks as usize;
}
Ok(total_blocks)
}
/// Get size of a relation file
pub(crate) async fn get_rel_size(
pub async fn get_rel_size(
&self,
tag: RelTag,
version: Version<'_>,
lsn: Lsn,
latest: bool,
ctx: &RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
@@ -224,12 +220,12 @@ impl Timeline {
));
}
if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
return Ok(nblocks);
}
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
&& !self.get_rel_exists(tag, version, latest, ctx).await?
&& !self.get_rel_exists(tag, lsn, latest, ctx).await?
{
// FIXME: Postgres sometimes calls smgrcreate() to create
// FSM, and smgrnblocks() on it immediately afterwards,
@@ -239,7 +235,7 @@ impl Timeline {
}
let key = rel_size_to_key(tag);
let mut buf = version.get(self, key, ctx).await?;
let mut buf = self.get(key, lsn, ctx).await?;
let nblocks = buf.get_u32_le();
if latest {
@@ -250,16 +246,16 @@ impl Timeline {
// latest=true, then it can not cause cache corruption, because with latest=true
// pageserver choose max(request_lsn, last_written_lsn) and so cached value will be
// associated with most recent value of LSN.
self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
self.update_cached_rel_size(tag, lsn, nblocks);
}
Ok(nblocks)
}
/// Does relation exist?
pub(crate) async fn get_rel_exists(
pub async fn get_rel_exists(
&self,
tag: RelTag,
version: Version<'_>,
lsn: Lsn,
_latest: bool,
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
@@ -270,12 +266,12 @@ impl Timeline {
}
// first try to lookup relation in cache
if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
if let Some(_nblocks) = self.get_cached_rel_size(&tag, lsn) {
return Ok(true);
}
// fetch directory listing
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
let buf = version.get(self, key, ctx).await?;
let buf = self.get(key, lsn, ctx).await?;
match RelDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => {
@@ -291,16 +287,16 @@ impl Timeline {
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub(crate) async fn list_rels(
pub async fn list_rels(
&self,
spcnode: Oid,
dbnode: Oid,
version: Version<'_>,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<HashSet<RelTag>, PageReconstructError> {
// fetch directory listing
let key = rel_dir_to_key(spcnode, dbnode);
let buf = version.get(self, key, ctx).await?;
let buf = self.get(key, lsn, ctx).await?;
match RelDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => {
@@ -319,7 +315,7 @@ impl Timeline {
}
/// Look up given SLRU page version.
pub(crate) async fn get_slru_page_at_lsn(
pub async fn get_slru_page_at_lsn(
&self,
kind: SlruKind,
segno: u32,
@@ -332,29 +328,29 @@ impl Timeline {
}
/// Get size of an SLRU segment
pub(crate) async fn get_slru_segment_size(
pub async fn get_slru_segment_size(
&self,
kind: SlruKind,
segno: u32,
version: Version<'_>,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
let key = slru_segment_size_to_key(kind, segno);
let mut buf = version.get(self, key, ctx).await?;
let mut buf = self.get(key, lsn, ctx).await?;
Ok(buf.get_u32_le())
}
/// Get size of an SLRU segment
pub(crate) async fn get_slru_segment_exists(
pub async fn get_slru_segment_exists(
&self,
kind: SlruKind,
segno: u32,
version: Version<'_>,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
// fetch directory listing
let key = slru_dir_to_key(kind);
let buf = version.get(self, key, ctx).await?;
let buf = self.get(key, lsn, ctx).await?;
match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => {
@@ -372,7 +368,7 @@ impl Timeline {
/// so it's not well defined which LSN you get if there were multiple commits
/// "in flight" at that point in time.
///
pub(crate) async fn find_lsn_for_timestamp(
pub async fn find_lsn_for_timestamp(
&self,
search_timestamp: TimestampTz,
cancel: &CancellationToken,
@@ -452,7 +448,7 @@ impl Timeline {
/// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
/// with a smaller/larger timestamp.
///
pub(crate) async fn is_latest_commit_timestamp_ge_than(
pub async fn is_latest_commit_timestamp_ge_than(
&self,
search_timestamp: TimestampTz,
probe_lsn: Lsn,
@@ -475,7 +471,7 @@ impl Timeline {
/// Obtain the possible timestamp range for the given lsn.
///
/// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
pub(crate) async fn get_timestamp_for_lsn(
pub async fn get_timestamp_for_lsn(
&self,
probe_lsn: Lsn,
ctx: &RequestContext,
@@ -505,11 +501,11 @@ impl Timeline {
mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
) -> Result<T, PageReconstructError> {
for segno in self
.list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
.list_slru_segments(SlruKind::Clog, probe_lsn, ctx)
.await?
{
let nblocks = self
.get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
.get_slru_segment_size(SlruKind::Clog, segno, probe_lsn, ctx)
.await?;
for blknum in (0..nblocks).rev() {
let clog_page = self
@@ -532,36 +528,36 @@ impl Timeline {
}
/// Get a list of SLRU segments
pub(crate) async fn list_slru_segments(
pub async fn list_slru_segments(
&self,
kind: SlruKind,
version: Version<'_>,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<HashSet<u32>, PageReconstructError> {
// fetch directory entry
let key = slru_dir_to_key(kind);
let buf = version.get(self, key, ctx).await?;
let buf = self.get(key, lsn, ctx).await?;
match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => Ok(dir.segments),
Err(e) => Err(PageReconstructError::from(e)),
}
}
pub(crate) async fn get_relmap_file(
pub async fn get_relmap_file(
&self,
spcnode: Oid,
dbnode: Oid,
version: Version<'_>,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
let key = relmap_file_key(spcnode, dbnode);
let buf = version.get(self, key, ctx).await?;
let buf = self.get(key, lsn, ctx).await?;
Ok(buf)
}
pub(crate) async fn list_dbdirs(
pub async fn list_dbdirs(
&self,
lsn: Lsn,
ctx: &RequestContext,
@@ -575,7 +571,7 @@ impl Timeline {
}
}
pub(crate) async fn get_twophase_file(
pub async fn get_twophase_file(
&self,
xid: TransactionId,
lsn: Lsn,
@@ -586,7 +582,7 @@ impl Timeline {
Ok(buf)
}
pub(crate) async fn list_twophase_files(
pub async fn list_twophase_files(
&self,
lsn: Lsn,
ctx: &RequestContext,
@@ -600,7 +596,7 @@ impl Timeline {
}
}
pub(crate) async fn get_control_file(
pub async fn get_control_file(
&self,
lsn: Lsn,
ctx: &RequestContext,
@@ -608,7 +604,7 @@ impl Timeline {
self.get(CONTROLFILE_KEY, lsn, ctx).await
}
pub(crate) async fn get_checkpoint(
pub async fn get_checkpoint(
&self,
lsn: Lsn,
ctx: &RequestContext,
@@ -616,7 +612,7 @@ impl Timeline {
self.get(CHECKPOINT_KEY, lsn, ctx).await
}
pub(crate) async fn list_aux_files(
pub async fn list_aux_files(
&self,
lsn: Lsn,
ctx: &RequestContext,
@@ -656,10 +652,7 @@ impl Timeline {
let mut total_size: u64 = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
for rel in self
.list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
.await?
{
for rel in self.list_rels(*spcnode, *dbnode, lsn, ctx).await? {
if self.cancel.is_cancelled() {
return Err(CalculateLogicalSizeError::Cancelled);
}
@@ -699,7 +692,7 @@ impl Timeline {
result.add_key(rel_dir_to_key(spcnode, dbnode));
let mut rels: Vec<RelTag> = self
.list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
.list_rels(spcnode, dbnode, lsn, ctx)
.await?
.into_iter()
.collect();
@@ -806,39 +799,18 @@ pub struct DatadirModification<'a> {
/// in the state in 'tline' yet.
pub tline: &'a Timeline,
/// Current LSN of the modification
lsn: Lsn,
/// Lsn assigned by begin_modification
pub lsn: Lsn,
// The modifications are not applied directly to the underlying key-value store.
// The put-functions add the modifications here, and they are flushed to the
// underlying key-value store by the 'finish' function.
pending_lsns: Vec<Lsn>,
pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
pending_deletions: Vec<(Range<Key>, Lsn)>,
pending_updates: HashMap<Key, Value>,
pending_deletions: Vec<Range<Key>>,
pending_nblocks: i64,
}
impl<'a> DatadirModification<'a> {
/// Get the current lsn
pub(crate) fn get_lsn(&self) -> Lsn {
self.lsn
}
/// Set the current lsn
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
ensure!(
lsn >= self.lsn,
"setting an older lsn {} than {} is not allowed",
lsn,
self.lsn
);
if lsn > self.lsn {
self.pending_lsns.push(self.lsn);
self.lsn = lsn;
}
Ok(())
}
/// Initialize a completely new repository.
///
/// This inserts the directory metadata entries that are assumed to
@@ -1012,9 +984,11 @@ impl<'a> DatadirModification<'a> {
dbnode: Oid,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let req_lsn = self.tline.get_last_record_lsn();
let total_blocks = self
.tline
.get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
.get_db_size(spcnode, dbnode, req_lsn, true, ctx)
.await?;
// Remove entry from dbdir
@@ -1103,11 +1077,8 @@ impl<'a> DatadirModification<'a> {
ctx: &RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
if self
.tline
.get_rel_exists(rel, Version::Modified(self), true, ctx)
.await?
{
let last_lsn = self.tline.get_last_record_lsn();
if self.tline.get_rel_exists(rel, last_lsn, true, ctx).await? {
let size_key = rel_size_to_key(rel);
// Fetch the old size first
let old_size = self.get(size_key, ctx).await?.get_u32_le();
@@ -1352,23 +1323,17 @@ impl<'a> DatadirModification<'a> {
let writer = self.tline.writer().await;
// Flush relation and SLRU data blocks, keep metadata.
let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
for (key, values) in self.pending_updates.drain() {
for (lsn, value) in values {
if is_rel_block_key(&key) || is_slru_block_key(key) {
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
writer.put(key, lsn, &value, ctx).await?;
} else {
retained_pending_updates
.entry(key)
.or_default()
.push((lsn, value));
}
let mut retained_pending_updates = HashMap::new();
for (key, value) in self.pending_updates.drain() {
if is_rel_block_key(&key) || is_slru_block_key(key) {
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
writer.put(key, self.lsn, &value, ctx).await?;
} else {
retained_pending_updates.insert(key, value);
}
}
self.pending_updates = retained_pending_updates;
self.pending_updates.extend(retained_pending_updates);
if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
@@ -1385,28 +1350,18 @@ impl<'a> DatadirModification<'a> {
///
pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
let writer = self.tline.writer().await;
let lsn = self.lsn;
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
if !self.pending_updates.is_empty() {
writer.put_batch(&self.pending_updates, ctx).await?;
self.pending_updates.clear();
for (key, value) in self.pending_updates.drain() {
writer.put(key, lsn, &value, ctx).await?;
}
for key_range in self.pending_deletions.drain(..) {
writer.delete(key_range, lsn).await?;
}
if !self.pending_deletions.is_empty() {
writer.delete_batch(&self.pending_deletions).await?;
self.pending_deletions.clear();
}
self.pending_lsns.push(self.lsn);
for pending_lsn in self.pending_lsns.drain(..) {
// Ideally, we should be able to call writer.finish_write() only once
// with the highest LSN. However, the last_record_lsn variable in the
// timeline keeps track of the latest LSN and the immediate previous LSN
// so we need to record every LSN to not leave a gap between them.
writer.finish_write(pending_lsn);
}
writer.finish_write(lsn);
if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
@@ -1415,86 +1370,44 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub(crate) fn len(&self) -> usize {
self.pending_updates.len() + self.pending_deletions.len()
pub(crate) fn is_empty(&self) -> bool {
self.pending_updates.is_empty() && self.pending_deletions.is_empty()
}
// Internal helper functions to batch the modifications
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
// Have we already updated the same key? Read the latest pending updated
// Have we already updated the same key? Read the pending updated
// version in that case.
//
// Note: we don't check pending_deletions. It is an error to request a
// value that has been removed, deletion only avoids leaking storage.
if let Some(values) = self.pending_updates.get(&key) {
if let Some((_, value)) = values.last() {
return if let Value::Image(img) = value {
Ok(img.clone())
} else {
// Currently, we never need to read back a WAL record that we
// inserted in the same "transaction". All the metadata updates
// work directly with Images, and we never need to read actual
// data pages. We could handle this if we had to, by calling
// the walredo manager, but let's keep it simple for now.
Err(PageReconstructError::from(anyhow::anyhow!(
"unexpected pending WAL record"
)))
};
if let Some(value) = self.pending_updates.get(&key) {
if let Value::Image(img) = value {
Ok(img.clone())
} else {
// Currently, we never need to read back a WAL record that we
// inserted in the same "transaction". All the metadata updates
// work directly with Images, and we never need to read actual
// data pages. We could handle this if we had to, by calling
// the walredo manager, but let's keep it simple for now.
Err(PageReconstructError::from(anyhow::anyhow!(
"unexpected pending WAL record"
)))
}
} else {
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
self.tline.get(key, lsn, ctx).await
}
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
self.tline.get(key, lsn, ctx).await
}
fn put(&mut self, key: Key, val: Value) {
let values = self.pending_updates.entry(key).or_default();
// Replace the previous value if it exists at the same lsn
if let Some((last_lsn, last_value)) = values.last_mut() {
if *last_lsn == self.lsn {
*last_value = val;
return;
}
}
values.push((self.lsn, val));
self.pending_updates.insert(key, val);
}
fn delete(&mut self, key_range: Range<Key>) {
trace!("DELETE {}-{}", key_range.start, key_range.end);
self.pending_deletions.push((key_range, self.lsn));
}
}
/// This struct facilitates accessing either a committed key from the timeline at a
/// specific LSN, or the latest uncommitted key from a pending modification.
/// During WAL ingestion, the records from multiple LSNs may be batched in the same
/// modification before being flushed to the timeline. Hence, the routines in WalIngest
/// need to look up the keys in the modification first before looking them up in the
/// timeline to not miss the latest updates.
#[derive(Clone, Copy)]
pub enum Version<'a> {
Lsn(Lsn),
Modified(&'a DatadirModification<'a>),
}
impl<'a> Version<'a> {
async fn get(
&self,
timeline: &Timeline,
key: Key,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
match self {
Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
Version::Modified(modification) => modification.get(key, ctx).await,
}
}
fn get_lsn(&self) -> Lsn {
match self {
Version::Lsn(lsn) => *lsn,
Version::Modified(modification) => modification.lsn,
}
self.pending_deletions.push(key_range);
}
}

View File

@@ -147,7 +147,7 @@ pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(||
// else, but that has not been needed in a long time.
std::env::var("TOKIO_WORKER_THREADS")
.map(|s| s.parse::<usize>().unwrap())
.unwrap_or_else(|_e| usize::max(2, num_cpus::get()))
.unwrap_or_else(|_e| usize::max(1, num_cpus::get()))
});
#[derive(Debug, Clone, Copy)]

View File

@@ -33,7 +33,6 @@ use tracing::*;
use utils::backoff;
use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use utils::failpoint_support;
use utils::fs_ext;
use utils::sync::gate::Gate;
use utils::sync::gate::GateGuard;
@@ -56,7 +55,6 @@ use self::timeline::uninit::TimelineUninitMark;
use self::timeline::uninit::UninitializedTimeline;
use self::timeline::EvictionTaskTenantState;
use self::timeline::TimelineResources;
use self::timeline::WaitLsnError;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
@@ -596,9 +594,10 @@ impl Tenant {
mode: SpawnMode,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
// TODO(sharding): make WalRedoManager shard-aware
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
tenant_shard_id,
tenant_shard_id.tenant_id,
)));
let TenantSharedResources {
@@ -891,7 +890,7 @@ impl Tenant {
) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id();
failpoint_support::sleep_millis_async!("before-attaching-tenant");
crate::failpoint_support::sleep_millis_async!("before-attaching-tenant");
let preload = match preload {
Some(p) => p,
@@ -1003,7 +1002,7 @@ impl Tenant {
// IndexPart is the source of truth.
self.clean_up_timelines(&existent_timelines)?;
failpoint_support::sleep_millis_async!("attach-before-activate");
crate::failpoint_support::sleep_millis_async!("attach-before-activate");
info!("Done");
@@ -1145,9 +1144,10 @@ impl Tenant {
tenant_shard_id: TenantShardId,
reason: String,
) -> Arc<Tenant> {
// TODO(sharding): make WalRedoManager shard-aware
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
tenant_shard_id,
tenant_shard_id.tenant_id,
)));
Arc::new(Tenant::new(
TenantState::Broken {
@@ -1759,15 +1759,7 @@ impl Tenant {
// decoding the new WAL might need to look up previous pages, relation
// sizes etc. and that would get confused if the previous page versions
// are not in the repository yet.
ancestor_timeline
.wait_lsn(*lsn, ctx)
.await
.map_err(|e| match e {
e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState) => {
CreateTimelineError::AncestorLsn(anyhow::anyhow!(e))
}
WaitLsnError::Shutdown => CreateTimelineError::ShuttingDown,
})?;
ancestor_timeline.wait_lsn(*lsn, ctx).await?;
}
self.branch_timeline(
@@ -2847,7 +2839,9 @@ impl Tenant {
}
};
failpoint_support::sleep_millis_async!("gc_iteration_internal_after_getting_gc_timelines");
crate::failpoint_support::sleep_millis_async!(
"gc_iteration_internal_after_getting_gc_timelines"
);
// If there is nothing to GC, we don't want any messages in the INFO log.
if !gc_timelines.is_empty() {

View File

@@ -46,8 +46,6 @@ pub mod defaults {
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]

View File

@@ -130,7 +130,7 @@ impl TenantsMap {
/// A page service client sends a TenantId, and to look up the correct Tenant we must
/// resolve this to a fully qualified TenantShardId.
fn resolve_attached_shard(
fn resolve_shard(
&self,
tenant_id: &TenantId,
selector: ShardSelector,
@@ -140,27 +140,25 @@ impl TenantsMap {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
// Ignore all slots that don't contain an attached tenant
let tenant = match &slot.1 {
TenantSlot::Attached(t) => t,
_ => continue,
};
match selector {
ShardSelector::First => return Some(*slot.0),
ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
return Some(*slot.0)
}
ShardSelector::Page(key) => {
// First slot we see for this tenant, calculate the expected shard number
// for the key: we will use this for checking if this and subsequent
// slots contain the key, rather than recalculating the hash each time.
if want_shard.is_none() {
want_shard = Some(tenant.shard_identity.get_shard_number(&key));
}
if let Some(tenant) = slot.1.get_attached() {
// First slot we see for this tenant, calculate the expected shard number
// for the key: we will use this for checking if this and subsequent
// slots contain the key, rather than recalculating the hash each time.
if want_shard.is_none() {
want_shard = Some(tenant.shard_identity.get_shard_number(&key));
}
if Some(tenant.shard_identity.number) == want_shard {
return Some(*slot.0);
if Some(tenant.shard_identity.number) == want_shard {
return Some(*slot.0);
}
} else {
continue;
}
}
_ => continue,
@@ -1259,11 +1257,9 @@ pub(crate) async fn get_active_tenant_with_timeout(
let locked = TENANTS.read().unwrap();
// Resolve TenantId to TenantShardId
let tenant_shard_id = locked
.resolve_attached_shard(&tenant_id, shard_selector)
.ok_or(GetActiveTenantError::NotFound(GetTenantError::NotFound(
tenant_id,
)))?;
let tenant_shard_id = locked.resolve_shard(&tenant_id, shard_selector).ok_or(
GetActiveTenantError::NotFound(GetTenantError::NotFound(tenant_id)),
)?;
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
.map_err(GetTenantError::MapState)?;

View File

@@ -818,25 +818,8 @@ impl RemoteTimelineClient {
fn schedule_deletion_of_unlinked0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
mut with_metadata: Vec<(LayerFileName, LayerFileMetadata)>,
with_metadata: Vec<(LayerFileName, LayerFileMetadata)>,
) {
// Filter out any layers which were not created by this tenant shard. These are
// layers that originate from some ancestor shard after a split, and may still
// be referenced by other shards. We are free to delete them locally and remove
// them from our index (and would have already done so when we reach this point
// in the code), but we may not delete them remotely.
with_metadata.retain(|(name, meta)| {
let retain = meta.shard.shard_number == self.tenant_shard_id.shard_number
&& meta.shard.shard_count == self.tenant_shard_id.shard_count;
if !retain {
tracing::debug!(
"Skipping deletion of ancestor-shard layer {name}, from shard {}",
meta.shard
);
}
retain
});
for (name, meta) in &with_metadata {
info!(
"scheduling deletion of layer {}{} (shard {})",

View File

@@ -23,7 +23,7 @@ use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
use std::ops::Range;
use tokio::sync::{RwLock, RwLockWriteGuard};
use tokio::sync::RwLock;
use super::{DeltaLayerWriter, ResidentLayer};
@@ -246,43 +246,16 @@ impl InMemoryLayer {
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
pub(crate) async fn put_value(
pub async fn put_value(
&self,
key: Key,
lsn: Lsn,
val: &Value,
ctx: &RequestContext,
) -> Result<()> {
let mut inner = self.inner.write().await;
self.assert_writable();
self.put_value_locked(&mut inner, key, lsn, val, ctx).await
}
pub(crate) async fn put_values(
&self,
values: &HashMap<Key, Vec<(Lsn, Value)>>,
ctx: &RequestContext,
) -> Result<()> {
let mut inner = self.inner.write().await;
self.assert_writable();
for (key, vals) in values {
for (lsn, val) in vals {
self.put_value_locked(&mut inner, *key, *lsn, val, ctx)
.await?;
}
}
Ok(())
}
async fn put_value_locked(
&self,
locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
key: Key,
lsn: Lsn,
val: &Value,
ctx: &RequestContext,
) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
let inner: &mut _ = &mut *self.inner.write().await;
self.assert_writable();
let off = {
// Avoid doing allocations for "small" values.
@@ -291,7 +264,7 @@ impl InMemoryLayer {
let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
buf.clear();
val.ser_into(&mut buf)?;
locked_inner
inner
.file
.write_blob(
&buf,
@@ -302,7 +275,7 @@ impl InMemoryLayer {
.await?
};
let vec_map = locked_inner.index.entry(key).or_default();
let vec_map = inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
@@ -312,11 +285,13 @@ impl InMemoryLayer {
Ok(())
}
pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
pub async fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
// TODO: Currently, we just leak the storage for any deleted keys
Ok(())
}
/// Make the layer non-writeable. Only call once.
/// Records the end_lsn for non-dropped layers.
/// `end_lsn` is exclusive
pub async fn freeze(&self, end_lsn: Lsn) {

View File

@@ -373,20 +373,15 @@ pub struct GcInfo {
}
/// An error happened in a get() operation.
#[derive(thiserror::Error, Debug)]
pub(crate) enum PageReconstructError {
#[derive(thiserror::Error)]
pub enum PageReconstructError {
#[error(transparent)]
Other(#[from] anyhow::Error),
#[error("Ancestor LSN wait error: {0}")]
AncestorLsnTimeout(#[from] WaitLsnError),
/// The operation was cancelled
#[error("Cancelled")]
Cancelled,
/// The ancestor of this is being stopped
#[error("ancestor timeline {0} is being stopped")]
AncestorStopping(TimelineId),
/// An error happened replaying WAL records
@@ -407,6 +402,32 @@ enum FlushLayerError {
Other(#[from] anyhow::Error),
}
impl std::fmt::Debug for PageReconstructError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
Self::Other(err) => err.fmt(f),
Self::Cancelled => write!(f, "cancelled"),
Self::AncestorStopping(timeline_id) => {
write!(f, "ancestor timeline {timeline_id} is being stopped")
}
Self::WalRedo(err) => err.fmt(f),
}
}
}
impl std::fmt::Display for PageReconstructError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
Self::Other(err) => err.fmt(f),
Self::Cancelled => write!(f, "cancelled"),
Self::AncestorStopping(timeline_id) => {
write!(f, "ancestor timeline {timeline_id} is being stopped")
}
Self::WalRedo(err) => err.fmt(f),
}
}
}
#[derive(Clone, Copy)]
pub enum LogicalSizeCalculationCause {
Initial,
@@ -431,21 +452,6 @@ impl std::fmt::Debug for Timeline {
}
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum WaitLsnError {
// Called on a timeline which is shutting down
#[error("Shutdown")]
Shutdown,
// Called on an timeline not in active state or shutting down
#[error("Bad state (not active)")]
BadState,
// Timeout expired while waiting for LSN to catch up with goal.
#[error("{0}")]
Timeout(String),
}
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -480,7 +486,7 @@ impl Timeline {
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub(crate) async fn get(
pub async fn get(
&self,
key: Key,
lsn: Lsn,
@@ -490,11 +496,6 @@ impl Timeline {
return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
}
// This check is debug-only because of the cost of hashing, and because it's a double-check: we
// already checked the key against the shard_identity when looking up the Timeline from
// page_service.
debug_assert!(!self.shard_identity.is_key_disposable(&key));
// XXX: structured stats collection for layer eviction here.
trace!(
"get page request for {}@{} from task kind {:?}",
@@ -628,28 +629,24 @@ impl Timeline {
/// You should call this before any of the other get_* or list_* functions. Calling
/// those functions with an LSN that has been processed yet is an error.
///
pub(crate) async fn wait_lsn(
pub async fn wait_lsn(
&self,
lsn: Lsn,
_ctx: &RequestContext, /* Prepare for use by cancellation */
) -> Result<(), WaitLsnError> {
if self.cancel.is_cancelled() {
return Err(WaitLsnError::Shutdown);
} else if !self.is_active() {
return Err(WaitLsnError::BadState);
}
) -> anyhow::Result<()> {
anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline");
// This should never be called from the WAL receiver, because that could lead
// to a deadlock.
debug_assert!(
anyhow::ensure!(
task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
"wait_lsn cannot be called in WAL receiver"
);
debug_assert!(
anyhow::ensure!(
task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
"wait_lsn cannot be called in WAL receiver"
);
debug_assert!(
anyhow::ensure!(
task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
"wait_lsn cannot be called in WAL receiver"
);
@@ -663,22 +660,18 @@ impl Timeline {
{
Ok(()) => Ok(()),
Err(e) => {
use utils::seqwait::SeqWaitError::*;
match e {
Shutdown => Err(WaitLsnError::Shutdown),
Timeout => {
// don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
drop(_timer);
let walreceiver_status = self.walreceiver_status();
Err(WaitLsnError::Timeout(format!(
// don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
drop(_timer);
let walreceiver_status = self.walreceiver_status();
Err(anyhow::Error::new(e).context({
format!(
"Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
lsn,
self.get_last_record_lsn(),
self.get_disk_consistent_lsn(),
walreceiver_status,
)))
}
}
)
}))
}
}
}
@@ -1466,7 +1459,6 @@ impl Timeline {
max_lsn_wal_lag,
auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
availability_zone: self.conf.availability_zone.clone(),
ingest_batch_size: self.conf.ingest_batch_size,
},
broker_client,
ctx,
@@ -2231,13 +2223,13 @@ impl Timeline {
return Err(layer_traversal_error(
if cfg!(test) {
format!(
"could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}\n{}",
key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
"could not find data for key {} at LSN {}, for request at LSN {}\n{}",
key, cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
)
} else {
format!(
"could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}",
key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn
"could not find data for key {} at LSN {}, for request at LSN {}",
key, cont_lsn, request_lsn
)
},
traversal_path,
@@ -2297,12 +2289,11 @@ impl Timeline {
ancestor
.wait_lsn(timeline.ancestor_lsn, ctx)
.await
.map_err(|e| match e {
e @ WaitLsnError::Timeout(_) => PageReconstructError::AncestorLsnTimeout(e),
WaitLsnError::Shutdown => PageReconstructError::Cancelled,
e @ WaitLsnError::BadState => {
PageReconstructError::Other(anyhow::anyhow!(e))
}
.with_context(|| {
format!(
"wait for lsn {} on ancestor timeline_id={}",
timeline.ancestor_lsn, ancestor.timeline_id
)
})?;
timeline_owned = ancestor;
@@ -2480,27 +2471,9 @@ impl Timeline {
Ok(())
}
async fn put_values(
&self,
values: &HashMap<Key, Vec<(Lsn, Value)>>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Pick the first LSN in the batch to get the layer to write to.
for lsns in values.values() {
if let Some((lsn, _)) = lsns.first() {
let layer = self.get_layer_for_write(*lsn).await?;
layer.put_values(values, ctx).await?;
break;
}
}
Ok(())
}
async fn put_tombstones(&self, tombstones: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
if let Some((_, lsn)) = tombstones.first() {
let layer = self.get_layer_for_write(*lsn).await?;
layer.put_tombstones(tombstones).await?;
}
async fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
let layer = self.get_layer_for_write(lsn).await?;
layer.put_tombstone(key_range, lsn).await?;
Ok(())
}
@@ -3062,15 +3035,6 @@ impl Timeline {
for range in &partition.ranges {
let mut key = range.start;
while key < range.end {
if self.shard_identity.is_key_disposable(&key) {
debug!(
"Dropping key {} during compaction (it belongs on shard {:?})",
key,
self.shard_identity.get_shard_number(&key)
);
key = key.next();
continue;
}
let img = match self.get(key, lsn, ctx).await {
Ok(img) => img,
Err(err) => {
@@ -3097,7 +3061,6 @@ impl Timeline {
}
}
};
image_layer_writer.put_image(key, &img).await?;
key = key.next();
}
@@ -3668,15 +3631,7 @@ impl Timeline {
)))
});
if !self.shard_identity.is_key_disposable(&key) {
writer.as_mut().unwrap().put_value(key, lsn, value).await?;
} else {
debug!(
"Dropping key {} during compaction (it belongs on shard {:?})",
key,
self.shard_identity.get_shard_number(&key)
);
}
writer.as_mut().unwrap().put_value(key, lsn, value).await?;
if !new_layers.is_empty() {
fail_point!("after-timeline-compacted-first-L1");
@@ -4231,7 +4186,7 @@ impl Timeline {
.context("Failed to reconstruct a page image:")
{
Ok(img) => img,
Err(e) => return Err(PageReconstructError::WalRedo(e)),
Err(e) => return Err(PageReconstructError::from(e)),
};
if img.len() == page_cache::PAGE_SZ {
@@ -4574,16 +4529,8 @@ impl<'a> TimelineWriter<'a> {
self.tl.put_value(key, lsn, value, ctx).await
}
pub(crate) async fn put_batch(
&self,
batch: &HashMap<Key, Vec<(Lsn, Value)>>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.tl.put_values(batch, ctx).await
}
pub(crate) async fn delete_batch(&self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
self.tl.put_tombstones(batch).await
pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
self.tl.put_tombstone(key_range, lsn).await
}
/// Track the end of the latest digested WAL record.
@@ -4594,11 +4541,11 @@ impl<'a> TimelineWriter<'a> {
/// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
/// the 'lsn' or anything older. The previous last record LSN is stored alongside
/// the latest and can be read.
pub(crate) fn finish_write(&self, new_lsn: Lsn) {
pub fn finish_write(&self, new_lsn: Lsn) {
self.tl.finish_write(new_lsn);
}
pub(crate) fn update_current_logical_size(&self, delta: i64) {
pub fn update_current_logical_size(&self, delta: i64) {
self.tl.update_current_logical_size(delta)
}
}

View File

@@ -58,7 +58,6 @@ pub struct WalReceiverConf {
pub max_lsn_wal_lag: NonZeroU64,
pub auth_token: Option<Arc<String>>,
pub availability_zone: Option<String>,
pub ingest_batch_size: u64,
}
pub struct WalReceiver {

View File

@@ -411,7 +411,6 @@ impl ConnectionManagerState {
let node_id = new_sk.safekeeper_id;
let connect_timeout = self.conf.wal_connect_timeout;
let ingest_batch_size = self.conf.ingest_batch_size;
let timeline = Arc::clone(&self.timeline);
let ctx = ctx.detached_child(
TaskKind::WalReceiverConnectionHandler,
@@ -431,7 +430,6 @@ impl ConnectionManagerState {
connect_timeout,
ctx,
node_id,
ingest_batch_size,
)
.await;
@@ -1347,7 +1345,6 @@ mod tests {
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
auth_token: None,
availability_zone: None,
ingest_batch_size: 1,
},
wal_connection: None,
wal_stream_candidates: HashMap::new(),

View File

@@ -26,7 +26,7 @@ use tracing::{debug, error, info, trace, warn, Instrument};
use super::TaskStateUpdate;
use crate::{
context::RequestContext,
metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS},
task_mgr,
task_mgr::TaskKind,
task_mgr::WALRECEIVER_RUNTIME,
@@ -106,7 +106,6 @@ impl From<WalDecodeError> for WalReceiverError {
/// Open a connection to the given safekeeper and receive WAL, sending back progress
/// messages as we go.
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_walreceiver_connection(
timeline: Arc<Timeline>,
wal_source_connconf: PgConnectionConfig,
@@ -115,7 +114,6 @@ pub(super) async fn handle_walreceiver_connection(
connect_timeout: Duration,
ctx: RequestContext,
node: NodeId,
ingest_batch_size: u64,
) -> Result<(), WalReceiverError> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -307,9 +305,7 @@ pub(super) async fn handle_walreceiver_connection(
{
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification(startlsn);
let mut uncommitted_records = 0;
let mut filtered_records = 0;
let mut modification = timeline.begin_modification(endlsn);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
@@ -318,40 +314,14 @@ pub(super) async fn handle_walreceiver_connection(
return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
}
// Ingest the records without immediately committing them.
let ingested = walingest
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.await
.with_context(|| format!("could not ingest record at {lsn}"))?;
if !ingested {
tracing::debug!("ingest: filtered out record @ LSN {lsn}");
WAL_INGEST.records_filtered.inc();
filtered_records += 1;
}
fail_point!("walreceiver-after-ingest");
last_rec_lsn = lsn;
// Commit every ingest_batch_size records. Even if we filtered out
// all records, we still need to call commit to advance the LSN.
uncommitted_records += 1;
if uncommitted_records >= ingest_batch_size {
WAL_INGEST
.records_committed
.inc_by(uncommitted_records - filtered_records);
modification.commit(&ctx).await?;
uncommitted_records = 0;
filtered_records = 0;
}
}
// Commit the remaining records.
if uncommitted_records > 0 {
WAL_INGEST
.records_committed
.inc_by(uncommitted_records - filtered_records);
modification.commit(&ctx).await?;
}
}

View File

@@ -29,7 +29,6 @@ use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
use anyhow::{bail, Context, Result};
use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use utils::failpoint_support;
use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
@@ -48,18 +47,20 @@ use postgres_ffi::TransactionId;
use postgres_ffi::BLCKSZ;
use utils::lsn::Lsn;
pub struct WalIngest {
pub struct WalIngest<'a> {
shard: ShardIdentity,
timeline: &'a Timeline,
checkpoint: CheckPoint,
checkpoint_modified: bool,
}
impl WalIngest {
impl<'a> WalIngest<'a> {
pub async fn new(
timeline: &Timeline,
timeline: &'a Timeline,
startpoint: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<WalIngest> {
ctx: &'_ RequestContext,
) -> anyhow::Result<WalIngest<'a>> {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
@@ -68,6 +69,7 @@ impl WalIngest {
Ok(WalIngest {
shard: *timeline.get_shard_identity(),
timeline,
checkpoint,
checkpoint_modified: false,
})
@@ -81,8 +83,6 @@ impl WalIngest {
/// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
/// relations/pages that the record affects.
///
/// This function returns `true` if the record was ingested, and `false` if it was filtered out
///
pub async fn ingest_record(
&mut self,
recdata: Bytes,
@@ -90,13 +90,11 @@ impl WalIngest {
modification: &mut DatadirModification<'_>,
decoded: &mut DecodedWALRecord,
ctx: &RequestContext,
) -> anyhow::Result<bool> {
) -> anyhow::Result<()> {
WAL_INGEST.records_received.inc();
let pg_version = modification.tline.pg_version;
let prev_len = modification.len();
modification.set_lsn(lsn)?;
decode_wal_record(recdata, decoded, pg_version)?;
modification.lsn = lsn;
decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
@@ -133,9 +131,9 @@ impl WalIngest {
}
pg_constants::RM_DBASE_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
debug!(%info, %pg_version, "handle RM_DBASE_ID");
debug!(%info, pg_version=%self.timeline.pg_version, "handle RM_DBASE_ID");
if pg_version == 14 {
if self.timeline.pg_version == 14 {
if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
let createdb = XlCreateDatabase::decode(&mut buf);
debug!("XLOG_DBASE_CREATE v14");
@@ -151,7 +149,7 @@ impl WalIngest {
.await?;
}
}
} else if pg_version == 15 {
} else if self.timeline.pg_version == 15 {
if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
@@ -171,7 +169,7 @@ impl WalIngest {
.await?;
}
}
} else if pg_version == 16 {
} else if self.timeline.pg_version == 16 {
if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
@@ -346,7 +344,9 @@ impl WalIngest {
// particular point in the WAL. For more fine-grained control,
// we could peek into the message and only pause if it contains
// a particular string, for example, but this is enough for now.
failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
crate::failpoint_support::sleep_millis_async!(
"wal-ingest-logical-message-sleep"
);
} else if let Some(path) = prefix.strip_prefix("neon-file:") {
modification.put_file(path, message, ctx).await?;
}
@@ -400,11 +400,19 @@ impl WalIngest {
self.checkpoint_modified = false;
}
// Note that at this point this record is only cached in the modification
// until commit() is called to flush the data into the repository and update
// the latest LSN.
if modification.is_empty() {
tracing::debug!("ingest: filtered out record @ LSN {lsn}");
WAL_INGEST.records_filtered.inc();
modification.tline.finish_write(lsn);
} else {
WAL_INGEST.records_committed.inc();
modification.commit(ctx).await?;
}
Ok(modification.len() > prev_len)
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN.
Ok(())
}
/// Do not store this block, but observe it for the purposes of updating our relation size state.
@@ -451,7 +459,7 @@ impl WalIngest {
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, self.timeline.pg_version)?
// do not materialize null pages because them most likely be soon replaced with real data
&& blk.bimg_len != 0
{
@@ -504,7 +512,7 @@ impl WalIngest {
let mut old_heap_blkno: Option<u32> = None;
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
match modification.tline.pg_version {
match self.timeline.pg_version {
14 => {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -718,95 +726,75 @@ impl WalIngest {
relnode: decoded.blocks[0].rnode_relnode,
};
self.clear_visibility_map_bits(
modification,
vm_rel,
new_heap_blkno,
old_heap_blkno,
flags,
ctx).await?;
}
Ok(())
}
let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
/// Write NeonWalRecord::ClearVisibilityMapFlags records for clearing the VM bits
/// corresponding to new_heap_blkno and old_heap_blkno.
async fn clear_visibility_map_bits(
&mut self,
modification: &mut DatadirModification<'_>,
vm_rel: RelTag,
new_heap_blkno: Option<u32>,
old_heap_blkno: Option<u32>,
flags: u8,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Determine the VM pages containing the old and the new heap block.
//
// Sometimes, Postgres seems to create heap WAL records with the
// ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
// not set. In fact, it's possible that the VM page does not exist at all.
// In that case, we don't want to store a record to clear the VM bit;
// replaying it would fail to find the previous image of the page, because
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
// record if it doesn't.
let vm_size = get_relsize(modification, vm_rel, ctx).await?;
let heap_to_vm_blk = |heap_blkno| {
let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
if vm_blk >= vm_size {
None
} else {
Some(vm_blk)
// Sometimes, Postgres seems to create heap WAL records with the
// ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
// not set. In fact, it's possible that the VM page does not exist at all.
// In that case, we don't want to store a record to clear the VM bit;
// replaying it would fail to find the previous image of the page, because
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
// record if it doesn't.
let vm_size = self.get_relsize(vm_rel, modification.lsn, ctx).await?;
if let Some(blknum) = new_vm_blk {
if blknum >= vm_size {
new_vm_blk = None;
}
}
if let Some(blknum) = old_vm_blk {
if blknum >= vm_size {
old_vm_blk = None;
}
}
};
let new_vm_blk = new_heap_blkno.map(heap_to_vm_blk).flatten();
let old_vm_blk = old_heap_blkno.map(heap_to_vm_blk).flatten();
if new_vm_blk.is_some() || old_vm_blk.is_some() {
if new_vm_blk == old_vm_blk {
// An UPDATE record that needs to clear the bits for both old and the
// new page, both of which reside on the same VM page.
self.put_rel_wal_record(
modification,
vm_rel,
new_vm_blk.unwrap(),
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno,
flags,
},
ctx,
)
.await?;
} else {
// Clear VM bits for one heap page, or for two pages that reside on
// different VM pages.
if let Some(new_vm_blk) = new_vm_blk {
if new_vm_blk.is_some() || old_vm_blk.is_some() {
if new_vm_blk == old_vm_blk {
// An UPDATE record that needs to clear the bits for both old and the
// new page, both of which reside on the same VM page.
self.put_rel_wal_record(
modification,
vm_rel,
new_vm_blk,
new_vm_blk.unwrap(),
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno: None,
flags,
},
ctx,
)
.await?;
}
if let Some(old_vm_blk) = old_vm_blk {
self.put_rel_wal_record(
modification,
vm_rel,
old_vm_blk,
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno: None,
old_heap_blkno,
flags,
},
ctx,
)
.await?;
} else {
// Clear VM bits for one heap page, or for two pages that reside on
// different VM pages.
if let Some(new_vm_blk) = new_vm_blk {
self.put_rel_wal_record(
modification,
vm_rel,
new_vm_blk,
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno: None,
flags,
},
ctx,
)
.await?;
}
if let Some(old_vm_blk) = old_vm_blk {
self.put_rel_wal_record(
modification,
vm_rel,
old_vm_blk,
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno: None,
old_heap_blkno,
flags,
},
ctx,
)
.await?;
}
}
}
}
@@ -829,11 +817,10 @@ impl WalIngest {
let mut new_heap_blkno: Option<u32> = None;
let mut old_heap_blkno: Option<u32> = None;
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
let pg_version = modification.tline.pg_version;
assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
match pg_version {
match self.timeline.pg_version {
16 => {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -896,7 +883,7 @@ impl WalIngest {
}
_ => bail!(
"Neon RMGR has no known compatibility with PostgreSQL version {}",
pg_version
self.timeline.pg_version
),
}
@@ -919,7 +906,7 @@ impl WalIngest {
// replaying it would fail to find the previous image of the page, because
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
// record if it doesn't.
let vm_size = get_relsize(modification, vm_rel, ctx).await?;
let vm_size = self.get_relsize(vm_rel, modification.lsn, ctx).await?;
if let Some(blknum) = new_vm_blk {
if blknum >= vm_size {
new_vm_blk = None;
@@ -997,14 +984,16 @@ impl WalIngest {
let src_db_id = rec.src_db_id;
let src_tablespace_id = rec.src_tablespace_id;
// Creating a database is implemented by copying the template (aka. source) database.
// To copy all the relations, we need to ask for the state as of the same LSN, but we
// cannot pass 'lsn' to the Timeline.get_* functions, or they will block waiting for
// the last valid LSN to advance up to it. So we use the previous record's LSN in the
// get calls instead.
let req_lsn = modification.tline.get_last_record_lsn();
let rels = modification
.tline
.list_rels(
src_tablespace_id,
src_db_id,
Version::Modified(modification),
ctx,
)
.list_rels(src_tablespace_id, src_db_id, req_lsn, ctx)
.await?;
debug!("ingest_xlog_dbase_create: {} rels", rels.len());
@@ -1012,12 +1001,7 @@ impl WalIngest {
// Copy relfilemap
let filemap = modification
.tline
.get_relmap_file(
src_tablespace_id,
src_db_id,
Version::Modified(modification),
ctx,
)
.get_relmap_file(src_tablespace_id, src_db_id, req_lsn, ctx)
.await?;
modification
.put_relmap_file(tablespace_id, db_id, filemap, ctx)
@@ -1031,7 +1015,7 @@ impl WalIngest {
let nblocks = modification
.tline
.get_rel_size(src_rel, Version::Modified(modification), true, ctx)
.get_rel_size(src_rel, req_lsn, true, ctx)
.await?;
let dst_rel = RelTag {
spcnode: tablespace_id,
@@ -1049,13 +1033,7 @@ impl WalIngest {
let content = modification
.tline
.get_rel_page_at_lsn(
src_rel,
blknum,
Version::Modified(modification),
true,
ctx,
)
.get_rel_page_at_lsn(src_rel, blknum, req_lsn, true, ctx)
.await?;
modification.put_rel_page_image(dst_rel, blknum, content)?;
num_blocks_copied += 1;
@@ -1126,7 +1104,7 @@ impl WalIngest {
modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
fsm_physical_page_no += 1;
}
let nblocks = get_relsize(modification, rel, ctx).await?;
let nblocks = self.get_relsize(rel, modification.lsn, ctx).await?;
if nblocks > fsm_physical_page_no {
// check if something to do: FSM is larger than truncate position
self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
@@ -1148,7 +1126,7 @@ impl WalIngest {
modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
vm_page_no += 1;
}
let nblocks = get_relsize(modification, rel, ctx).await?;
let nblocks = self.get_relsize(rel, modification.lsn, ctx).await?;
if nblocks > vm_page_no {
// check if something to do: VM is larger than truncate position
self.put_rel_truncation(modification, rel, vm_page_no, ctx)
@@ -1221,9 +1199,10 @@ impl WalIngest {
dbnode: xnode.dbnode,
relnode: xnode.relnode,
};
let last_lsn = self.timeline.get_last_record_lsn();
if modification
.tline
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
.get_rel_exists(rel, last_lsn, true, ctx)
.await?
{
self.put_rel_drop(modification, rel, ctx).await?;
@@ -1277,9 +1256,10 @@ impl WalIngest {
// will block waiting for the last valid LSN to advance up to
// it. So we use the previous record's LSN in the get calls
// instead.
let req_lsn = modification.tline.get_last_record_lsn();
for segno in modification
.tline
.list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
.list_slru_segments(SlruKind::Clog, req_lsn, ctx)
.await?
{
let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
@@ -1491,6 +1471,20 @@ impl WalIngest {
Ok(())
}
async fn get_relsize(
&mut self,
rel: RelTag,
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<BlockNumber> {
let nblocks = if !self.timeline.get_rel_exists(rel, lsn, true, ctx).await? {
0
} else {
self.timeline.get_rel_size(rel, lsn, true, ctx).await?
};
Ok(nblocks)
}
async fn handle_rel_extend(
&mut self,
modification: &mut DatadirModification<'_>,
@@ -1502,6 +1496,7 @@ impl WalIngest {
// Check if the relation exists. We implicitly create relations on first
// record.
// TODO: would be nice if to be more explicit about it
let last_lsn = modification.lsn;
// Get current size and put rel creation if rel doesn't exist
//
@@ -1509,14 +1504,11 @@ impl WalIngest {
// check the cache too. This is because eagerly checking the cache results in
// less work overall and 10% better performance. It's more work on cache miss
// but cache miss is rare.
let old_nblocks = if let Some(nblocks) = modification
.tline
.get_cached_rel_size(&rel, modification.get_lsn())
{
let old_nblocks = if let Some(nblocks) = self.timeline.get_cached_rel_size(&rel, last_lsn) {
nblocks
} else if !modification
.tline
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
} else if !self
.timeline
.get_rel_exists(rel, last_lsn, true, ctx)
.await?
{
// create it with 0 size initially, the logic below will extend it
@@ -1526,10 +1518,7 @@ impl WalIngest {
.context("Relation Error")?;
0
} else {
modification
.tline
.get_rel_size(rel, Version::Modified(modification), true, ctx)
.await?
self.timeline.get_rel_size(rel, last_lsn, true, ctx).await?
};
if new_nblocks > old_nblocks {
@@ -1582,9 +1571,10 @@ impl WalIngest {
// Check if the relation exists. We implicitly create relations on first
// record.
// TODO: would be nice if to be more explicit about it
let old_nblocks = if !modification
.tline
.get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
let last_lsn = self.timeline.get_last_record_lsn();
let old_nblocks = if !self
.timeline
.get_slru_segment_exists(kind, segno, last_lsn, ctx)
.await?
{
// create it with 0 size initially, the logic below will extend it
@@ -1593,9 +1583,8 @@ impl WalIngest {
.await?;
0
} else {
modification
.tline
.get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
self.timeline
.get_slru_segment_size(kind, segno, last_lsn, ctx)
.await?
};
@@ -1618,26 +1607,6 @@ impl WalIngest {
}
}
async fn get_relsize(
modification: &DatadirModification<'_>,
rel: RelTag,
ctx: &RequestContext,
) -> anyhow::Result<BlockNumber> {
let nblocks = if !modification
.tline
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
.await?
{
0
} else {
modification
.tline
.get_rel_size(rel, Version::Modified(modification), true, ctx)
.await?
};
Ok(nblocks)
}
#[allow(clippy::bool_assert_comparison)]
#[cfg(test)]
mod tests {
@@ -1664,7 +1633,10 @@ mod tests {
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
async fn init_walingest_test<'a>(
tline: &'a Timeline,
ctx: &RequestContext,
) -> Result<WalIngest<'a>> {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
@@ -1709,29 +1681,29 @@ mod tests {
// The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_exists(TESTREL_A, Lsn(0x10), false, &ctx)
.await?,
false
);
assert!(tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_size(TESTREL_A, Lsn(0x10), false, &ctx)
.await
.is_err());
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
.await?,
1
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
.await?,
3
);
@@ -1739,46 +1711,46 @@ mod tests {
// Check page contents at each LSN
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x20), false, &ctx)
.await?,
TEST_IMG("foo blk 0 at 2")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x30), false, &ctx)
.await?,
TEST_IMG("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x40), false, &ctx)
.await?,
TEST_IMG("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false, &ctx)
.await?,
TEST_IMG("foo blk 1 at 4")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x50), false, &ctx)
.await?,
TEST_IMG("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false, &ctx)
.await?,
TEST_IMG("foo blk 1 at 4")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false, &ctx)
.await?,
TEST_IMG("foo blk 2 at 5")
);
@@ -1794,19 +1766,19 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_size(TESTREL_A, Lsn(0x60), false, &ctx)
.await?,
2
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x60), false, &ctx)
.await?,
TEST_IMG("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false, &ctx)
.await?,
TEST_IMG("foo blk 1 at 4")
);
@@ -1814,13 +1786,13 @@ mod tests {
// should still see the truncated block with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
.await?,
3
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false, &ctx)
.await?,
TEST_IMG("foo blk 2 at 5")
);
@@ -1833,7 +1805,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), false, &ctx)
.get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
.await?,
0
);
@@ -1846,19 +1818,19 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), false, &ctx)
.get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
.await?,
2
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70), false, &ctx)
.await?,
ZERO_PAGE
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x70), false, &ctx)
.await?,
TEST_IMG("foo blk 1")
);
@@ -1871,21 +1843,21 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
.await?,
1501
);
for blk in 2..1500 {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blk, Lsn(0x80), false, &ctx)
.await?,
ZERO_PAGE
);
}
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1500, Lsn(0x80), false, &ctx)
.await?,
TEST_IMG("foo blk 1500")
);
@@ -1912,13 +1884,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
.await?,
1
);
@@ -1931,7 +1903,7 @@ mod tests {
// Check that rel is not visible anymore
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), false, &ctx)
.get_rel_exists(TESTREL_A, Lsn(0x30), false, &ctx)
.await?,
false
);
@@ -1949,13 +1921,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_exists(TESTREL_A, Lsn(0x40), false, &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_size(TESTREL_A, Lsn(0x40), false, &ctx)
.await?,
1
);
@@ -1988,24 +1960,24 @@ mod tests {
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_exists(TESTREL_A, Lsn(0x10), false, &ctx)
.await?,
false
);
assert!(tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_size(TESTREL_A, Lsn(0x10), false, &ctx)
.await
.is_err());
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
.await?,
relsize
);
@@ -2016,7 +1988,7 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blkno, lsn, false, &ctx)
.await?,
TEST_IMG(&data)
);
@@ -2033,7 +2005,7 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_size(TESTREL_A, Lsn(0x60), false, &ctx)
.await?,
1
);
@@ -2043,7 +2015,7 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x60), false, &ctx)
.await?,
TEST_IMG(&data)
);
@@ -2052,7 +2024,7 @@ mod tests {
// should still see all blocks with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
.await?,
relsize
);
@@ -2061,7 +2033,7 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x50), false, &ctx)
.await?,
TEST_IMG(&data)
);
@@ -2081,13 +2053,13 @@ mod tests {
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_exists(TESTREL_A, Lsn(0x80), false, &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
.await?,
relsize
);
@@ -2097,7 +2069,7 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x80), false, &ctx)
.await?,
TEST_IMG(&data)
);
@@ -2130,9 +2102,7 @@ mod tests {
assert_current_logical_size(&tline, Lsn(lsn));
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.await?,
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE + 1
);
@@ -2144,9 +2114,7 @@ mod tests {
.await?;
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.await?,
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE
);
assert_current_logical_size(&tline, Lsn(lsn));
@@ -2159,9 +2127,7 @@ mod tests {
.await?;
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.await?,
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE - 1
);
assert_current_logical_size(&tline, Lsn(lsn));
@@ -2177,9 +2143,7 @@ mod tests {
.await?;
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.await?,
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
size as BlockNumber
);
@@ -2216,7 +2180,7 @@ mod tests {
let wal_segment_path = format!("{path}/000000010000000000000001.zst");
let source_initdb_path = format!("{path}/{INITDB_PATH}");
let startpoint = Lsn::from_hex("14AEC08").unwrap();
let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
let endpoint = Lsn::from_hex("1FFFF98").unwrap();
let harness = TenantHarness::create("test_ingest_real_wal").unwrap();
let (tenant, ctx) = harness.load().await;
@@ -2258,7 +2222,7 @@ mod tests {
let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
.await
.unwrap();
let mut modification = tline.begin_modification(startpoint);
let mut modification = tline.begin_modification(endpoint);
let mut decoded = DecodedWALRecord::default();
println!("decoding {} bytes", bytes.len() - xlogoff);
@@ -2272,7 +2236,6 @@ mod tests {
.await
.unwrap();
}
modification.commit(&ctx).await.unwrap();
}
let duration = started_at.elapsed();

View File

@@ -22,7 +22,6 @@ use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use pageserver_api::shard::TenantShardId;
use serde::Serialize;
use std::collections::VecDeque;
use std::io;
@@ -36,11 +35,14 @@ use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::time::Duration;
use std::time::Instant;
use tracing::*;
use utils::{bin_ser::BeSer, lsn::Lsn, nonblock::set_nonblock};
use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
#[cfg(feature = "testing")]
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(feature = "testing")]
use pageserver_api::shard::TenantShardId;
use crate::config::PageServerConf;
use crate::metrics::{
WalRedoKillCause, WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_COUNTERS,
@@ -90,7 +92,7 @@ struct ProcessOutput {
/// records.
///
pub struct PostgresRedoManager {
tenant_shard_id: TenantShardId,
tenant_id: TenantId,
conf: &'static PageServerConf,
last_redo_at: std::sync::Mutex<Option<Instant>>,
redo_process: RwLock<Option<Arc<WalRedoProcess>>>,
@@ -184,13 +186,10 @@ impl PostgresRedoManager {
///
/// Create a new PostgresRedoManager.
///
pub fn new(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
) -> PostgresRedoManager {
pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
tenant_shard_id,
tenant_id,
conf,
last_redo_at: std::sync::Mutex::default(),
redo_process: RwLock::new(None),
@@ -245,12 +244,8 @@ impl PostgresRedoManager {
let timer =
WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.start_timer();
let proc = Arc::new(
WalRedoProcess::launch(
self.conf,
self.tenant_shard_id,
pg_version,
)
.context("launch walredo process")?,
WalRedoProcess::launch(self.conf, self.tenant_id, pg_version)
.context("launch walredo process")?,
);
timer.observe_duration();
*proc_guard = Some(Arc::clone(&proc));
@@ -414,11 +409,7 @@ impl PostgresRedoManager {
"ClearVisibilityMapFlags record on unexpected rel {}",
rel
);
// Helper function to clear the VM bit corresponding to 'heap_blkno'.
// (The logic is similar to the guts of the visibilitymap_clear() function
// in PostgreSQL, after it has locked the right VM page.)
let visibilitymap_clear = |heap_blkno| {
if let Some(heap_blkno) = *new_heap_blkno {
// Calculate the VM block and offset that corresponds to the heap block.
let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
@@ -431,12 +422,19 @@ impl PostgresRedoManager {
let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
map[map_byte as usize] &= !(flags << map_offset);
};
if let Some(heap_blkno) = *new_heap_blkno {
visibilitymap_clear(heap_blkno);
}
// Repeat for 'old_heap_blkno', if any
if let Some(heap_blkno) = *old_heap_blkno {
visibilitymap_clear(heap_blkno);
let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
assert!(map_block == blknum);
let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
map[map_byte as usize] &= !(flags << map_offset);
}
}
// Non-relational WAL records are handled here, with custom code that has the
@@ -640,7 +638,7 @@ impl<C: CommandExt> CloseFileDescriptors for C {
struct WalRedoProcess {
#[allow(dead_code)]
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
tenant_id: TenantId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
@@ -654,10 +652,10 @@ impl WalRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), pg_version=pg_version))]
#[instrument(skip_all,fields(tenant_id=%tenant_id, pg_version=pg_version))]
fn launch(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
tenant_id: TenantId,
pg_version: u32,
) -> anyhow::Result<Self> {
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
@@ -682,7 +680,7 @@ impl WalRedoProcess {
// as close-on-exec by default, but that's not enough, since we use
// libraries that directly call libc open without setting that flag.
.close_fds()
.spawn_no_leak_child(tenant_shard_id)
.spawn_no_leak_child(tenant_id)
.context("spawn process")?;
WAL_REDO_PROCESS_COUNTERS.started.inc();
let mut child = scopeguard::guard(child, |child| {
@@ -743,12 +741,12 @@ impl WalRedoProcess {
error!(error=?e, "failed to read from walredo stderr");
}
}
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_id, %pg_version))
);
Ok(Self {
conf,
tenant_shard_id,
tenant_id,
child: Some(child),
stdin: Mutex::new(ProcessInput {
stdin,
@@ -774,7 +772,7 @@ impl WalRedoProcess {
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.id()))]
fn apply_wal_records(
&self,
tag: BufferTag,
@@ -968,7 +966,11 @@ impl WalRedoProcess {
// these files will be collected to an allure report
let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
// TODO(sharding): update this call when WalRedoProcess gets a TenantShardId.
let path = self
.conf
.tenant_path(&TenantShardId::unsharded(self.tenant_id))
.join(&filename);
let res = std::fs::OpenOptions::new()
.write(true)
@@ -1002,7 +1004,7 @@ impl Drop for WalRedoProcess {
/// Wrapper type around `std::process::Child` which guarantees that the child
/// will be killed and waited-for by this process before being dropped.
struct NoLeakChild {
tenant_id: TenantShardId,
tenant_id: TenantId,
child: Option<Child>,
}
@@ -1021,7 +1023,7 @@ impl DerefMut for NoLeakChild {
}
impl NoLeakChild {
fn spawn(tenant_id: TenantShardId, command: &mut Command) -> io::Result<Self> {
fn spawn(tenant_id: TenantId, command: &mut Command) -> io::Result<Self> {
let child = command.spawn()?;
Ok(NoLeakChild {
tenant_id,
@@ -1076,7 +1078,7 @@ impl Drop for NoLeakChild {
Some(child) => child,
None => return,
};
let tenant_shard_id = self.tenant_id;
let tenant_id = self.tenant_id;
// Offload the kill+wait of the child process into the background.
// If someone stops the runtime, we'll leak the child process.
// We can ignore that case because we only stop the runtime on pageserver exit.
@@ -1084,11 +1086,7 @@ impl Drop for NoLeakChild {
tokio::task::spawn_blocking(move || {
// Intentionally don't inherit the tracing context from whoever is dropping us.
// This thread here is going to outlive of our dropper.
let span = tracing::info_span!(
"walredo",
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug()
);
let span = tracing::info_span!("walredo", %tenant_id);
let _entered = span.enter();
Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop);
})
@@ -1098,11 +1096,11 @@ impl Drop for NoLeakChild {
}
trait NoLeakChildCommandExt {
fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild>;
fn spawn_no_leak_child(&mut self, tenant_id: TenantId) -> io::Result<NoLeakChild>;
}
impl NoLeakChildCommandExt for Command {
fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild> {
fn spawn_no_leak_child(&mut self, tenant_id: TenantId) -> io::Result<NoLeakChild> {
NoLeakChild::spawn(tenant_id, self)
}
}
@@ -1157,7 +1155,6 @@ mod tests {
use crate::repository::Key;
use crate::{config::PageServerConf, walrecord::NeonWalRecord};
use bytes::Bytes;
use pageserver_api::shard::TenantShardId;
use std::str::FromStr;
use utils::{id::TenantId, lsn::Lsn};
@@ -1267,9 +1264,9 @@ mod tests {
let repo_dir = camino_tempfile::tempdir()?;
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
let conf = Box::leak(Box::new(conf));
let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
let tenant_id = TenantId::generate();
let manager = PostgresRedoManager::new(conf, tenant_shard_id);
let manager = PostgresRedoManager::new(conf, tenant_id);
Ok(RedoHarness {
_repo_dir: repo_dir,

View File

@@ -35,8 +35,7 @@
#define PageStoreTrace DEBUG5
#define MIN_RECONNECT_INTERVAL_USEC 1000
#define MAX_RECONNECT_INTERVAL_USEC 1000000
#define RECONNECT_INTERVAL_USEC 1000000
bool connected = false;
PGconn *pageserver_conn = NULL;
@@ -134,11 +133,6 @@ pageserver_connect(int elevel)
const char *values[3];
int n;
static TimestampTz last_connect_time = 0;
static uint64_t delay_us = MIN_RECONNECT_INTERVAL_USEC;
TimestampTz now;
uint64_t us_since_last_connect;
Assert(!connected);
if (CheckConnstringUpdated())
@@ -146,22 +140,6 @@ pageserver_connect(int elevel)
ReloadConnstring();
}
now = GetCurrentTimestamp();
us_since_last_connect = now - last_connect_time;
if (us_since_last_connect < delay_us)
{
pg_usleep(delay_us - us_since_last_connect);
delay_us *= 2;
if (delay_us > MAX_RECONNECT_INTERVAL_USEC)
delay_us = MAX_RECONNECT_INTERVAL_USEC;
last_connect_time = GetCurrentTimestamp();
}
else
{
delay_us = MIN_RECONNECT_INTERVAL_USEC;
last_connect_time = now;
}
/*
* Connect using the connection string we got from the
* neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment
@@ -355,6 +333,7 @@ pageserver_send(NeonRequest *request)
{
HandleMainLoopInterrupts();
n_reconnect_attempts += 1;
pg_usleep(RECONNECT_INTERVAL_USEC);
}
n_reconnect_attempts = 0;
}

View File

@@ -99,7 +99,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
port = strchr(host, ':');
if (port == NULL)
{
wp_log(FATAL, "port is not specified");
walprop_log(FATAL, "port is not specified");
}
*port++ = '\0';
sep = strchr(port, ',');
@@ -107,7 +107,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
*sep++ = '\0';
if (wp->n_safekeepers + 1 >= MAX_SAFEKEEPERS)
{
wp_log(FATAL, "too many safekeepers");
walprop_log(FATAL, "Too many safekeepers");
}
wp->safekeeper[wp->n_safekeepers].host = host;
wp->safekeeper[wp->n_safekeepers].port = port;
@@ -123,7 +123,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
"host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'",
sk->host, sk->port, wp->config->neon_timeline, wp->config->neon_tenant);
if (written > MAXCONNINFO || written < 0)
wp_log(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port);
walprop_log(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port);
}
initStringInfo(&wp->safekeeper[wp->n_safekeepers].outbuf);
@@ -133,7 +133,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
}
if (wp->n_safekeepers < 1)
{
wp_log(FATAL, "safekeepers addresses are not specified");
walprop_log(FATAL, "Safekeepers addresses are not specified");
}
wp->quorum = wp->n_safekeepers / 2 + 1;
@@ -144,15 +144,15 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
wp->api.strong_random(wp, &wp->greetRequest.proposerId, sizeof(wp->greetRequest.proposerId));
wp->greetRequest.systemId = wp->config->systemId;
if (!wp->config->neon_timeline)
wp_log(FATAL, "neon.timeline_id is not provided");
walprop_log(FATAL, "neon.timeline_id is not provided");
if (*wp->config->neon_timeline != '\0' &&
!HexDecodeString(wp->greetRequest.timeline_id, wp->config->neon_timeline, 16))
wp_log(FATAL, "could not parse neon.timeline_id, %s", wp->config->neon_timeline);
walprop_log(FATAL, "Could not parse neon.timeline_id, %s", wp->config->neon_timeline);
if (!wp->config->neon_tenant)
wp_log(FATAL, "neon.tenant_id is not provided");
walprop_log(FATAL, "neon.tenant_id is not provided");
if (*wp->config->neon_tenant != '\0' &&
!HexDecodeString(wp->greetRequest.tenant_id, wp->config->neon_tenant, 16))
wp_log(FATAL, "could not parse neon.tenant_id, %s", wp->config->neon_tenant);
walprop_log(FATAL, "Could not parse neon.tenant_id, %s", wp->config->neon_tenant);
wp->greetRequest.timeline = wp->config->pgTimeline;
wp->greetRequest.walSegSize = wp->config->wal_segment_size;
@@ -274,8 +274,8 @@ WalProposerPoll(WalProposer *wp)
if (TimestampDifferenceExceeds(sk->latestMsgReceivedAt, now,
wp->config->safekeeper_connection_timeout))
{
wp_log(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that",
sk->host, sk->port, FormatSafekeeperState(sk), wp->config->safekeeper_connection_timeout);
walprop_log(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that",
sk->host, sk->port, FormatSafekeeperState(sk), wp->config->safekeeper_connection_timeout);
ShutdownConnection(sk);
}
}
@@ -356,8 +356,8 @@ ResetConnection(Safekeeper *sk)
*
* https://www.postgresql.org/docs/devel/libpq-connect.html#LIBPQ-PQCONNECTSTARTPARAMS
*/
wp_log(WARNING, "immediate failure to connect with node '%s:%s':\n\terror: %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
walprop_log(WARNING, "Immediate failure to connect with node '%s:%s':\n\terror: %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
/*
* Even though the connection failed, we still need to clean up the
@@ -380,7 +380,7 @@ ResetConnection(Safekeeper *sk)
* (see libpqrcv_connect, defined in
* src/backend/replication/libpqwalreceiver/libpqwalreceiver.c)
*/
wp_log(LOG, "connecting with node %s:%s", sk->host, sk->port);
walprop_log(LOG, "connecting with node %s:%s", sk->host, sk->port);
sk->state = SS_CONNECTING_WRITE;
sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp);
@@ -434,7 +434,7 @@ ReconnectSafekeepers(WalProposer *wp)
static void
AdvancePollState(Safekeeper *sk, uint32 events)
{
#ifdef WALPROPOSER_LIB /* wp_log needs wp in lib build */
#ifdef WALPROPOSER_LIB /* walprop_log needs wp in lib build */
WalProposer *wp = sk->wp;
#endif
@@ -452,8 +452,8 @@ AdvancePollState(Safekeeper *sk, uint32 events)
* ResetConnection
*/
case SS_OFFLINE:
wp_log(FATAL, "unexpected safekeeper %s:%s state advancement: is offline",
sk->host, sk->port);
walprop_log(FATAL, "Unexpected safekeeper %s:%s state advancement: is offline",
sk->host, sk->port);
break; /* actually unreachable, but prevents
* -Wimplicit-fallthrough */
@@ -488,8 +488,8 @@ AdvancePollState(Safekeeper *sk, uint32 events)
* requests.
*/
case SS_VOTING:
wp_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
sk->port, FormatSafekeeperState(sk));
walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
sk->port, FormatSafekeeperState(sk));
ResetConnection(sk);
return;
@@ -517,8 +517,8 @@ AdvancePollState(Safekeeper *sk, uint32 events)
* Idle state for waiting votes from quorum.
*/
case SS_IDLE:
wp_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
sk->port, FormatSafekeeperState(sk));
walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
sk->port, FormatSafekeeperState(sk));
ResetConnection(sk);
return;
@@ -543,8 +543,8 @@ HandleConnectionEvent(Safekeeper *sk)
switch (result)
{
case WP_CONN_POLLING_OK:
wp_log(LOG, "connected with node %s:%s", sk->host,
sk->port);
walprop_log(LOG, "connected with node %s:%s", sk->host,
sk->port);
sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp);
/*
@@ -567,8 +567,8 @@ HandleConnectionEvent(Safekeeper *sk)
break;
case WP_CONN_POLLING_FAILED:
wp_log(WARNING, "failed to connect to node '%s:%s': %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
walprop_log(WARNING, "failed to connect to node '%s:%s': %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
/*
* If connecting failed, we don't want to restart the connection
@@ -604,8 +604,8 @@ SendStartWALPush(Safekeeper *sk)
if (!wp->api.conn_send_query(sk, "START_WAL_PUSH"))
{
wp_log(WARNING, "failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
walprop_log(WARNING, "Failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return;
}
@@ -641,8 +641,8 @@ RecvStartWALPushResult(Safekeeper *sk)
break;
case WP_EXEC_FAILED:
wp_log(WARNING, "failed to send query to safekeeper %s:%s: %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
walprop_log(WARNING, "Failed to send query to safekeeper %s:%s: %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return;
@@ -652,8 +652,8 @@ RecvStartWALPushResult(Safekeeper *sk)
* wrong"
*/
case WP_EXEC_UNEXPECTED_SUCCESS:
wp_log(WARNING, "received bad response from safekeeper %s:%s query execution",
sk->host, sk->port);
walprop_log(WARNING, "Received bad response from safekeeper %s:%s query execution",
sk->host, sk->port);
ShutdownConnection(sk);
return;
}
@@ -688,7 +688,7 @@ RecvAcceptorGreeting(Safekeeper *sk)
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->greetResponse))
return;
wp_log(LOG, "received AcceptorGreeting from safekeeper %s:%s", sk->host, sk->port);
walprop_log(LOG, "received AcceptorGreeting from safekeeper %s:%s", sk->host, sk->port);
/* Protocol is all good, move to voting. */
sk->state = SS_VOTING;
@@ -708,7 +708,7 @@ RecvAcceptorGreeting(Safekeeper *sk)
if (wp->n_connected == wp->quorum)
{
wp->propTerm++;
wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm);
walprop_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm);
wp->voteRequest = (VoteRequest)
{
@@ -721,9 +721,9 @@ RecvAcceptorGreeting(Safekeeper *sk)
else if (sk->greetResponse.term > wp->propTerm)
{
/* Another compute with higher term is running. */
wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "",
sk->host, sk->port,
sk->greetResponse.term, wp->propTerm);
walprop_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "",
sk->host, sk->port,
sk->greetResponse.term, wp->propTerm);
}
/*
@@ -763,7 +763,7 @@ SendVoteRequest(Safekeeper *sk)
WalProposer *wp = sk->wp;
/* We have quorum for voting, send our vote request */
wp_log(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, wp->voteRequest.term);
walprop_log(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, wp->voteRequest.term);
/* On failure, logging & resetting is handled */
if (!BlockingWrite(sk, &wp->voteRequest, sizeof(wp->voteRequest), SS_WAIT_VERDICT))
return;
@@ -780,12 +780,12 @@ RecvVoteResponse(Safekeeper *sk)
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->voteResponse))
return;
wp_log(LOG,
"got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X",
sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory),
LSN_FORMAT_ARGS(sk->voteResponse.flushLsn),
LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn),
LSN_FORMAT_ARGS(sk->voteResponse.timelineStartLsn));
walprop_log(LOG,
"got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X",
sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory),
LSN_FORMAT_ARGS(sk->voteResponse.flushLsn),
LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn),
LSN_FORMAT_ARGS(sk->voteResponse.timelineStartLsn));
/*
* In case of acceptor rejecting our vote, bail out, but only if either it
@@ -795,9 +795,9 @@ RecvVoteResponse(Safekeeper *sk)
if ((!sk->voteResponse.voteGiven) &&
(sk->voteResponse.term > wp->propTerm || wp->n_votes < wp->quorum))
{
wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "",
sk->host, sk->port,
sk->voteResponse.term, wp->propTerm);
walprop_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "",
sk->host, sk->port,
sk->voteResponse.term, wp->propTerm);
}
Assert(sk->voteResponse.term == wp->propTerm);
@@ -841,7 +841,7 @@ HandleElectedProposer(WalProposer *wp)
*/
if (!wp->api.recovery_download(wp, &wp->safekeeper[wp->donor]))
{
wp_log(FATAL, "failed to download WAL for logical replicaiton");
walprop_log(FATAL, "failed to download WAL for logical replicaiton");
}
if (wp->truncateLsn == wp->propEpochStartLsn && wp->config->syncSafekeepers)
@@ -948,10 +948,10 @@ DetermineEpochStartLsn(WalProposer *wp)
if (wp->timelineStartLsn != InvalidXLogRecPtr &&
wp->timelineStartLsn != wp->safekeeper[i].voteResponse.timelineStartLsn)
{
wp_log(WARNING,
"inconsistent timelineStartLsn: current %X/%X, received %X/%X",
LSN_FORMAT_ARGS(wp->timelineStartLsn),
LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn));
walprop_log(WARNING,
"inconsistent timelineStartLsn: current %X/%X, received %X/%X",
LSN_FORMAT_ARGS(wp->timelineStartLsn),
LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn));
}
wp->timelineStartLsn = wp->safekeeper[i].voteResponse.timelineStartLsn;
}
@@ -969,7 +969,7 @@ DetermineEpochStartLsn(WalProposer *wp)
{
wp->timelineStartLsn = wp->api.get_redo_start_lsn(wp);
}
wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn));
walprop_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn));
}
/*
@@ -996,12 +996,12 @@ DetermineEpochStartLsn(WalProposer *wp)
wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].term = wp->propTerm;
wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propEpochStartLsn;
wp_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X",
wp->quorum,
wp->propTerm,
LSN_FORMAT_ARGS(wp->propEpochStartLsn),
wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port,
LSN_FORMAT_ARGS(wp->truncateLsn));
walprop_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X",
wp->quorum,
wp->propTerm,
LSN_FORMAT_ARGS(wp->propEpochStartLsn),
wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port,
LSN_FORMAT_ARGS(wp->truncateLsn));
/*
* Ensure the basebackup we are running (at RedoStartLsn) matches LSN
@@ -1034,10 +1034,10 @@ DetermineEpochStartLsn(WalProposer *wp)
* scenario.
*/
disable_core_dump();
wp_log(PANIC,
"collected propEpochStartLsn %X/%X, but basebackup LSN %X/%X",
LSN_FORMAT_ARGS(wp->propEpochStartLsn),
LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp)));
walprop_log(PANIC,
"collected propEpochStartLsn %X/%X, but basebackup LSN %X/%X",
LSN_FORMAT_ARGS(wp->propEpochStartLsn),
LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp)));
}
}
walprop_shared->mineLastElectedTerm = wp->propTerm;
@@ -1091,10 +1091,34 @@ SendProposerElected(Safekeeper *sk)
{
/* safekeeper is empty or no common point, start from the beginning */
sk->startStreamingAt = wp->propTermHistory.entries[0].lsn;
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, timelineStartLsn=%X/%X, termHistory.n_entries=%u" ,
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), LSN_FORMAT_ARGS(wp->timelineStartLsn), wp->propTermHistory.n_entries);
/* wp->timelineStartLsn == InvalidXLogRecPtr can be only when timeline is created manually (test_s3_wal_replay) */
Assert(sk->startStreamingAt == wp->timelineStartLsn || wp->timelineStartLsn == InvalidXLogRecPtr);
if (sk->startStreamingAt < wp->truncateLsn)
{
/*
* There's a gap between the WAL starting point and a truncateLsn,
* which can't appear in a normal working cluster. That gap means
* that all safekeepers reported that they have persisted WAL up
* to the truncateLsn before, but now current safekeeper tells
* otherwise.
*
* Also we have a special condition here, which is empty
* safekeeper with no history. In combination with a gap, that can
* happen when we introduce a new safekeeper to the cluster. This
* is a rare case, which is triggered manually for now, and should
* be treated with care.
*/
/*
* truncateLsn will not change without ack from current
* safekeeper, and it's aligned to the WAL record, so we can
* safely start streaming from this point.
*/
sk->startStreamingAt = wp->truncateLsn;
walprop_log(WARNING, "empty safekeeper joined cluster as %s:%s, historyStart=%X/%X, sk->startStreamingAt=%X/%X",
sk->host, sk->port, LSN_FORMAT_ARGS(wp->propTermHistory.entries[0].lsn),
LSN_FORMAT_ARGS(sk->startStreamingAt));
}
}
else
{
@@ -1117,7 +1141,7 @@ SendProposerElected(Safekeeper *sk)
}
}
Assert(sk->startStreamingAt <= wp->availableLsn);
Assert(sk->startStreamingAt >= wp->truncateLsn && sk->startStreamingAt <= wp->availableLsn);
msg.tag = 'e';
msg.term = wp->propTerm;
@@ -1126,9 +1150,9 @@ SendProposerElected(Safekeeper *sk)
msg.timelineStartLsn = wp->timelineStartLsn;
lastCommonTerm = i >= 0 ? wp->propTermHistory.entries[i].term : 0;
wp_log(LOG,
"sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X",
sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn));
walprop_log(LOG,
"sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X",
sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn));
resetStringInfo(&sk->outbuf);
pq_sendint64_le(&sk->outbuf, msg.tag);
@@ -1237,8 +1261,8 @@ HandleActiveState(Safekeeper *sk, uint32 events)
/* expected never to happen, c.f. walprop_pg_active_state_update_event_set */
if (events & WL_SOCKET_CLOSED)
{
wp_log(WARNING, "connection to %s:%s in active state failed, got WL_SOCKET_CLOSED on neon_walreader socket",
sk->host, sk->port);
walprop_log(WARNING, "connection to %s:%s in active state failed, got WL_SOCKET_CLOSED on neon_walreader socket",
sk->host, sk->port);
ShutdownConnection(sk);
return;
}
@@ -1299,12 +1323,12 @@ SendAppendRequests(Safekeeper *sk)
req = &sk->appendRequest;
PrepareAppendRequest(sk->wp, &sk->appendRequest, sk->streamingAt, endLsn);
wp_log(DEBUG5, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
req->endLsn - req->beginLsn,
LSN_FORMAT_ARGS(req->beginLsn),
LSN_FORMAT_ARGS(req->endLsn),
LSN_FORMAT_ARGS(req->commitLsn),
LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port);
walprop_log(DEBUG5, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
req->endLsn - req->beginLsn,
LSN_FORMAT_ARGS(req->beginLsn),
LSN_FORMAT_ARGS(req->endLsn),
LSN_FORMAT_ARGS(req->commitLsn),
LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port);
resetStringInfo(&sk->outbuf);
@@ -1331,8 +1355,8 @@ SendAppendRequests(Safekeeper *sk)
case NEON_WALREAD_WOULDBLOCK:
return true;
case NEON_WALREAD_ERROR:
wp_log(WARNING, "WAL reading for node %s:%s failed: %s",
sk->host, sk->port, errmsg);
walprop_log(WARNING, "WAL reading for node %s:%s failed: %s",
sk->host, sk->port, errmsg);
ShutdownConnection(sk);
return false;
default:
@@ -1364,9 +1388,9 @@ SendAppendRequests(Safekeeper *sk)
return true;
case PG_ASYNC_WRITE_FAIL:
wp_log(WARNING, "failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
walprop_log(WARNING, "failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
default:
@@ -1405,11 +1429,11 @@ RecvAppendResponses(Safekeeper *sk)
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->appendResponse))
break;
wp_log(DEBUG2, "received message term=" INT64_FORMAT " flushLsn=%X/%X commitLsn=%X/%X from %s:%s",
sk->appendResponse.term,
LSN_FORMAT_ARGS(sk->appendResponse.flushLsn),
LSN_FORMAT_ARGS(sk->appendResponse.commitLsn),
sk->host, sk->port);
walprop_log(DEBUG2, "received message term=" INT64_FORMAT " flushLsn=%X/%X commitLsn=%X/%X from %s:%s",
sk->appendResponse.term,
LSN_FORMAT_ARGS(sk->appendResponse.flushLsn),
LSN_FORMAT_ARGS(sk->appendResponse.commitLsn),
sk->host, sk->port);
if (sk->appendResponse.term > wp->propTerm)
{
@@ -1419,9 +1443,9 @@ RecvAppendResponses(Safekeeper *sk)
* core as this is kinda expected scenario.
*/
disable_core_dump();
wp_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT "",
sk->host, sk->port,
sk->appendResponse.term, wp->propTerm);
walprop_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT "",
sk->host, sk->port,
sk->appendResponse.term, wp->propTerm);
}
readAnything = true;
@@ -1465,32 +1489,32 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->currentClusterSize = pq_getmsgint64(reply_message);
wp_log(DEBUG2, "ParsePageserverFeedbackMessage: current_timeline_size %lu",
rf->currentClusterSize);
walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: current_timeline_size %lu",
rf->currentClusterSize);
}
else if ((strcmp(key, "ps_writelsn") == 0) || (strcmp(key, "last_received_lsn") == 0))
{
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->last_received_lsn = pq_getmsgint64(reply_message);
wp_log(DEBUG2, "ParsePageserverFeedbackMessage: last_received_lsn %X/%X",
LSN_FORMAT_ARGS(rf->last_received_lsn));
walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: last_received_lsn %X/%X",
LSN_FORMAT_ARGS(rf->last_received_lsn));
}
else if ((strcmp(key, "ps_flushlsn") == 0) || (strcmp(key, "disk_consistent_lsn") == 0))
{
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->disk_consistent_lsn = pq_getmsgint64(reply_message);
wp_log(DEBUG2, "ParsePageserverFeedbackMessage: disk_consistent_lsn %X/%X",
LSN_FORMAT_ARGS(rf->disk_consistent_lsn));
walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: disk_consistent_lsn %X/%X",
LSN_FORMAT_ARGS(rf->disk_consistent_lsn));
}
else if ((strcmp(key, "ps_applylsn") == 0) || (strcmp(key, "remote_consistent_lsn") == 0))
{
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->remote_consistent_lsn = pq_getmsgint64(reply_message);
wp_log(DEBUG2, "ParsePageserverFeedbackMessage: remote_consistent_lsn %X/%X",
LSN_FORMAT_ARGS(rf->remote_consistent_lsn));
walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: remote_consistent_lsn %X/%X",
LSN_FORMAT_ARGS(rf->remote_consistent_lsn));
}
else if ((strcmp(key, "ps_replytime") == 0) || (strcmp(key, "replytime") == 0))
{
@@ -1502,8 +1526,8 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
/* Copy because timestamptz_to_str returns a static buffer */
replyTimeStr = pstrdup(timestamptz_to_str(rf->replytime));
wp_log(DEBUG2, "ParsePageserverFeedbackMessage: replytime %lu reply_time: %s",
rf->replytime, replyTimeStr);
walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: replytime %lu reply_time: %s",
rf->replytime, replyTimeStr);
pfree(replyTimeStr);
}
@@ -1517,7 +1541,7 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
* Skip unknown keys to support backward compatibile protocol
* changes
*/
wp_log(LOG, "ParsePageserverFeedbackMessage: unknown key: %s len %d", key, len);
walprop_log(LOG, "ParsePageserverFeedbackMessage: unknown key: %s len %d", key, len);
pq_getmsgbytes(reply_message, len);
};
}
@@ -1582,7 +1606,7 @@ GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
if (wp->n_votes < wp->quorum)
{
wp_log(WARNING, "GetDonor called before elections are won");
walprop_log(WARNING, "GetDonor called before elections are won");
return NULL;
}
@@ -1710,9 +1734,9 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
return false;
case PG_ASYNC_READ_FAIL:
wp_log(WARNING, "failed to read from node %s:%s in %s state: %s", sk->host,
sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
walprop_log(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host,
sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
}
@@ -1750,8 +1774,8 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
tag = pq_getmsgint64_le(&s);
if (tag != anymsg->tag)
{
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
sk->port, FormatSafekeeperState(sk));
walprop_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
sk->port, FormatSafekeeperState(sk));
ResetConnection(sk);
return false;
}
@@ -1827,9 +1851,9 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes
if (!wp->api.conn_blocking_write(sk, msg, msg_size))
{
wp_log(WARNING, "failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
}
@@ -1880,9 +1904,9 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta
wp->api.update_event_set(sk, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
return false;
case PG_ASYNC_WRITE_FAIL:
wp_log(WARNING, "failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
default:
@@ -1919,9 +1943,9 @@ AsyncFlush(Safekeeper *sk)
/* Nothing to do; try again when the socket's ready */
return false;
case -1:
wp_log(WARNING, "failed to flush write to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
walprop_log(WARNING, "Failed to flush write to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
ResetConnection(sk);
return false;
default:
@@ -1950,11 +1974,11 @@ CompareLsn(const void *a, const void *b)
*
* The strings are intended to be used as a prefix to "state", e.g.:
*
* wp_log(LOG, "currently in %s state", FormatSafekeeperState(sk));
* walprop_log(LOG, "currently in %s state", FormatSafekeeperState(sk));
*
* If this sort of phrasing doesn't fit the message, instead use something like:
*
* wp_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk));
* walprop_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk));
*/
static char *
FormatSafekeeperState(Safekeeper *sk)
@@ -2035,8 +2059,8 @@ AssertEventsOkForState(uint32 events, Safekeeper *sk)
* To give a descriptive message in the case of failure, we use elog
* and then an assertion that's guaranteed to fail.
*/
wp_log(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
FormatEvents(wp, events), sk->host, sk->port, FormatSafekeeperState(sk));
walprop_log(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
FormatEvents(wp, events), sk->host, sk->port, FormatSafekeeperState(sk));
Assert(events_ok_for_state);
}
}
@@ -2175,8 +2199,8 @@ FormatEvents(WalProposer *wp, uint32 events)
if (events & (~all_flags))
{
wp_log(WARNING, "event formatting found unexpected component %d",
events & (~all_flags));
walprop_log(WARNING, "Event formatting found unexpected component %d",
events & (~all_flags));
return_str[6] = '*';
return_str[7] = '\0';
}

View File

@@ -707,23 +707,11 @@ extern Safekeeper *GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn);
#define WPEVENT 1337 /* special log level for walproposer internal
* events */
#define WP_LOG_PREFIX "[WP] "
/*
* wp_log is used in pure wp code (walproposer.c), allowing API callback to
* catch logging.
*/
#ifdef WALPROPOSER_LIB
extern void WalProposerLibLog(WalProposer *wp, int elevel, char *fmt,...);
#define wp_log(elevel, fmt, ...) WalProposerLibLog(wp, elevel, fmt, ## __VA_ARGS__)
#define walprop_log(elevel, ...) WalProposerLibLog(wp, elevel, __VA_ARGS__)
#else
#define wp_log(elevel, fmt, ...) elog(elevel, WP_LOG_PREFIX fmt, ## __VA_ARGS__)
#define walprop_log(elevel, ...) elog(elevel, __VA_ARGS__)
#endif
/*
* And wpg_log is used all other (postgres specific) walproposer code, just
* adding prefix.
*/
#define wpg_log(elevel, fmt, ...) elog(elevel, WP_LOG_PREFIX fmt, ## __VA_ARGS__)
#endif /* __NEON_WALPROPOSER_H__ */

View File

@@ -424,8 +424,8 @@ walprop_pg_start_streaming(WalProposer *wp, XLogRecPtr startpos)
{
StartReplicationCmd cmd;
wpg_log(LOG, "WAL proposer starts streaming at %X/%X",
LSN_FORMAT_ARGS(startpos));
elog(LOG, "WAL proposer starts streaming at %X/%X",
LSN_FORMAT_ARGS(startpos));
cmd.slotname = WAL_PROPOSER_SLOT_NAME;
cmd.timeline = wp->greetRequest.timeline;
cmd.startpoint = startpos;
@@ -549,7 +549,7 @@ walprop_pg_load_libpqwalreceiver(void)
{
load_file("libpqwalreceiver", false);
if (WalReceiverFunctions == NULL)
wpg_log(ERROR, "libpqwalreceiver didn't initialize correctly");
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
}
/* Helper function */
@@ -630,7 +630,7 @@ libpqwp_connect_start(char *conninfo)
* PGconn structure"
*/
if (!pg_conn)
wpg_log(FATAL, "failed to allocate new PGconn object");
elog(FATAL, "failed to allocate new PGconn object");
/*
* And in theory this allocation can fail as well, but it's incredibly
@@ -680,7 +680,7 @@ walprop_connect_poll(Safekeeper *sk)
* unused. We'll expect it's never returned.
*/
case PGRES_POLLING_ACTIVE:
wpg_log(FATAL, "unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll");
elog(FATAL, "Unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll");
/*
* This return is never actually reached, but it's here to make
@@ -745,7 +745,7 @@ libpqwp_get_query_result(WalProposerConn *conn)
*/
if (!result)
{
wpg_log(WARNING, "[libpqwalproposer] Unexpected successful end of command results");
elog(WARNING, "[libpqwalproposer] Unexpected successful end of command results");
return WP_EXEC_UNEXPECTED_SUCCESS;
}
@@ -793,7 +793,7 @@ libpqwp_get_query_result(WalProposerConn *conn)
}
if (unexpected_success)
wpg_log(WARNING, "[libpqwalproposer] Unexpected successful %s", unexpected_success);
elog(WARNING, "[libpqwalproposer] Unexpected successful %s", unexpected_success);
return return_val;
}
@@ -872,7 +872,7 @@ libpqwp_async_read(WalProposerConn *conn, char **buf, int *amount)
ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn));
if (status != PGRES_FATAL_ERROR)
wpg_log(FATAL, "unexpected result status %d after failed PQgetCopyData", status);
elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status);
/*
* If there was actually an error, it'll be properly reported
@@ -937,7 +937,7 @@ walprop_async_write(Safekeeper *sk, void const *buf, size_t size)
case -1:
return PG_ASYNC_WRITE_FAIL;
default:
wpg_log(FATAL, "invalid return %d from PQputCopyData", result);
elog(FATAL, "invalid return %d from PQputCopyData", result);
}
/*
@@ -958,7 +958,7 @@ walprop_async_write(Safekeeper *sk, void const *buf, size_t size)
case -1:
return PG_ASYNC_WRITE_FAIL;
default:
wpg_log(FATAL, "invalid return %d from PQflush", result);
elog(FATAL, "invalid return %d from PQflush", result);
}
}
@@ -1237,6 +1237,19 @@ WalProposerRecovery(WalProposer *wp, Safekeeper *sk)
return true; /* recovery not needed */
endpos = wp->propEpochStartLsn;
/*
* If we need to download more than a max_slot_wal_keep_size, cap to it to
* avoid risk of exploding pg_wal. Logical replication won't work until
* recreated, but at least compute would start; this also follows
* max_slot_wal_keep_size semantics.
*/
download_range_mb = (endpos - startpos) / 1024 / 1024;
if (max_slot_wal_keep_size_mb > 0 && download_range_mb >= max_slot_wal_keep_size_mb)
{
startpos = endpos - max_slot_wal_keep_size_mb * 1024 * 1024;
walprop_log(WARNING, "capped WAL download for logical replication to %X/%X as max_slot_wal_keep_size=%dMB",
LSN_FORMAT_ARGS(startpos), max_slot_wal_keep_size_mb);
}
timeline = wp->greetRequest.timeline;
if (!neon_auth_token)
@@ -1249,7 +1262,7 @@ WalProposerRecovery(WalProposer *wp, Safekeeper *sk)
written = snprintf((char *) conninfo, MAXCONNINFO, "password=%s %s", neon_auth_token, sk->conninfo);
if (written > MAXCONNINFO || written < 0)
wpg_log(FATAL, "could not append password to the safekeeper connection string");
elog(FATAL, "could not append password to the safekeeper connection string");
}
#if PG_MAJORVERSION_NUM < 16
@@ -1266,11 +1279,11 @@ WalProposerRecovery(WalProposer *wp, Safekeeper *sk)
err)));
return false;
}
wpg_log(LOG,
"start recovery for logical replication from %s:%s starting from %X/%08X till %X/%08X timeline "
"%d",
sk->host, sk->port, (uint32) (startpos >> 32),
(uint32) startpos, (uint32) (endpos >> 32), (uint32) endpos, timeline);
elog(LOG,
"start recovery for logical replication from %s:%s starting from %X/%08X till %X/%08X timeline "
"%d",
sk->host, sk->port, (uint32) (startpos >> 32),
(uint32) startpos, (uint32) (endpos >> 32), (uint32) endpos, timeline);
options.logical = false;
options.startpoint = startpos;
@@ -1468,11 +1481,11 @@ walprop_pg_wal_reader_allocate(Safekeeper *sk)
{
char log_prefix[64];
snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port);
snprintf(log_prefix, sizeof(log_prefix), "sk %s:%s nwr: ", sk->host, sk->port);
Assert(!sk->xlogreader);
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, sk->wp, log_prefix);
if (sk->xlogreader == NULL)
wpg_log(FATAL, "failed to allocate xlog reader");
elog(FATAL, "Failed to allocate xlog reader");
}
static NeonWALReadResult
@@ -1536,7 +1549,7 @@ static void
walprop_pg_init_event_set(WalProposer *wp)
{
if (waitEvents)
wpg_log(FATAL, "double-initialization of event set");
elog(FATAL, "double-initialization of event set");
/* for each sk, we have socket plus potentially socket for neon walreader */
waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + 2 * wp->n_safekeepers);
@@ -1568,7 +1581,7 @@ add_nwr_event_set(Safekeeper *sk, uint32 events)
Assert(sk->nwrEventPos == -1);
sk->nwrEventPos = AddWaitEventToSet(waitEvents, events, NeonWALReaderSocket(sk->xlogreader), NULL, sk);
sk->nwrConnEstablished = NeonWALReaderIsRemConnEstablished(sk->xlogreader);
wpg_log(DEBUG5, "sk %s:%s: added nwr socket events %d", sk->host, sk->port, events);
elog(DEBUG5, "sk %s:%s: added nwr socket events %d", sk->host, sk->port, events);
}
static void
@@ -1667,8 +1680,8 @@ rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk)
{
WalProposer *wp = to_remove->wp;
wpg_log(DEBUG5, "sk %s:%s: removing event, is_sk %d",
to_remove->host, to_remove->port, is_sk);
elog(DEBUG5, "sk %s:%s: removing event, is_sk %d",
to_remove->host, to_remove->port, is_sk);
/*
* Shortpath for exiting if have nothing to do. We never call this
@@ -1822,13 +1835,13 @@ GetLatestNeonFeedback(PageserverFeedback *rf, WalProposer *wp)
rf->remote_consistent_lsn = wp->safekeeper[latest_safekeeper].appendResponse.rf.remote_consistent_lsn;
rf->replytime = wp->safekeeper[latest_safekeeper].appendResponse.rf.replytime;
wpg_log(DEBUG2, "GetLatestNeonFeedback: currentClusterSize %lu,"
" last_received_lsn %X/%X, disk_consistent_lsn %X/%X, remote_consistent_lsn %X/%X, replytime %lu",
rf->currentClusterSize,
LSN_FORMAT_ARGS(rf->last_received_lsn),
LSN_FORMAT_ARGS(rf->disk_consistent_lsn),
LSN_FORMAT_ARGS(rf->remote_consistent_lsn),
rf->replytime);
elog(DEBUG2, "GetLatestNeonFeedback: currentClusterSize %lu,"
" last_received_lsn %X/%X, disk_consistent_lsn %X/%X, remote_consistent_lsn %X/%X, replytime %lu",
rf->currentClusterSize,
LSN_FORMAT_ARGS(rf->last_received_lsn),
LSN_FORMAT_ARGS(rf->disk_consistent_lsn),
LSN_FORMAT_ARGS(rf->remote_consistent_lsn),
rf->replytime);
}
/*
@@ -1974,7 +1987,7 @@ GetLogRepRestartLSN(WalProposer *wp)
{
uint64 download_range_mb;
wpg_log(LOG, "logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn));
elog(LOG, "logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn));
/*
* If we need to download more than a max_slot_wal_keep_size,
@@ -1986,8 +1999,8 @@ GetLogRepRestartLSN(WalProposer *wp)
download_range_mb = (wp->propEpochStartLsn - lrRestartLsn) / MB;
if (max_slot_wal_keep_size_mb > 0 && download_range_mb >= max_slot_wal_keep_size_mb)
{
wpg_log(WARNING, "not downloading WAL for logical replication since %X/%X as max_slot_wal_keep_size=%dMB",
LSN_FORMAT_ARGS(lrRestartLsn), max_slot_wal_keep_size_mb);
walprop_log(WARNING, "not downloading WAL for logical replication since %X/%X as max_slot_wal_keep_size=%dMB",
LSN_FORMAT_ARGS(lrRestartLsn), max_slot_wal_keep_size_mb);
return InvalidXLogRecPtr;
}

49
poetry.lock generated
View File

@@ -339,19 +339,19 @@ uvloop = ["uvloop (>=0.15.2)"]
[[package]]
name = "boto3"
version = "1.34.11"
version = "1.26.16"
description = "The AWS SDK for Python"
optional = false
python-versions = ">= 3.8"
python-versions = ">= 3.7"
files = [
{file = "boto3-1.34.11-py3-none-any.whl", hash = "sha256:1af021e0c6e3040e8de66d403e963566476235bb70f9a8e3f6784813ac2d8026"},
{file = "boto3-1.34.11.tar.gz", hash = "sha256:31c130a40ec0631059b77d7e87f67ad03ff1685a5b37638ac0c4687026a3259d"},
{file = "boto3-1.26.16-py3-none-any.whl", hash = "sha256:4f493a2aed71cee93e626de4f67ce58dd82c0473480a0fc45b131715cd8f4f30"},
{file = "boto3-1.26.16.tar.gz", hash = "sha256:31c0adf71e4bd19a5428580bb229d7ea3b5795eecaa0847a85385df00c026116"},
]
[package.dependencies]
botocore = ">=1.34.11,<1.35.0"
botocore = ">=1.29.16,<1.30.0"
jmespath = ">=0.7.1,<2.0.0"
s3transfer = ">=0.10.0,<0.11.0"
s3transfer = ">=0.6.0,<0.7.0"
[package.extras]
crt = ["botocore[crt] (>=1.21.0,<2.0a0)"]
@@ -702,25 +702,22 @@ xray = ["mypy-boto3-xray (>=1.26.0,<1.27.0)"]
[[package]]
name = "botocore"
version = "1.34.11"
version = "1.29.16"
description = "Low-level, data-driven core of boto 3."
optional = false
python-versions = ">= 3.8"
python-versions = ">= 3.7"
files = [
{file = "botocore-1.34.11-py3-none-any.whl", hash = "sha256:1ff1398b6ea670e1c01ac67a33af3da854f8e700d3528289c04f319c330d8250"},
{file = "botocore-1.34.11.tar.gz", hash = "sha256:51905c3d623c60df5dc5794387de7caf886d350180a01a3dfa762e903edb45a9"},
{file = "botocore-1.29.16-py3-none-any.whl", hash = "sha256:271b599e6cfe214405ed50d41cd967add1d5d469383dd81ff583bc818b47f59b"},
{file = "botocore-1.29.16.tar.gz", hash = "sha256:8cfcc10f2f1751608c3cec694f2d6b5e16ebcd50d0a104f9914d5616227c62e9"},
]
[package.dependencies]
jmespath = ">=0.7.1,<2.0.0"
python-dateutil = ">=2.1,<3.0.0"
urllib3 = [
{version = ">=1.25.4,<1.27", markers = "python_version < \"3.10\""},
{version = ">=1.25.4,<2.1", markers = "python_version >= \"3.10\""},
]
urllib3 = ">=1.25.4,<1.27"
[package.extras]
crt = ["awscrt (==0.19.19)"]
crt = ["awscrt (==0.14.0)"]
[[package]]
name = "botocore-stubs"
@@ -1892,13 +1889,13 @@ files = [
[[package]]
name = "pytest"
version = "7.4.4"
version = "7.3.1"
description = "pytest: simple powerful testing with Python"
optional = false
python-versions = ">=3.7"
files = [
{file = "pytest-7.4.4-py3-none-any.whl", hash = "sha256:b090cdf5ed60bf4c45261be03239c2c1c22df034fbffe691abe93cd80cea01d8"},
{file = "pytest-7.4.4.tar.gz", hash = "sha256:2cf0005922c6ace4a3e2ec8b4080eb0d9753fdc93107415332f50ce9e7994280"},
{file = "pytest-7.3.1-py3-none-any.whl", hash = "sha256:3799fa815351fea3a5e96ac7e503a96fa51cc9942c3753cda7651b93c1cfa362"},
{file = "pytest-7.3.1.tar.gz", hash = "sha256:434afafd78b1d78ed0addf160ad2b77a30d35d4bdf8af234fe621919d9ed15e3"},
]
[package.dependencies]
@@ -1910,7 +1907,7 @@ pluggy = ">=0.12,<2.0"
tomli = {version = ">=1.0.0", markers = "python_version < \"3.11\""}
[package.extras]
testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "xmlschema"]
[[package]]
name = "pytest-asyncio"
@@ -2233,20 +2230,20 @@ files = [
[[package]]
name = "s3transfer"
version = "0.10.0"
version = "0.6.0"
description = "An Amazon S3 Transfer Manager"
optional = false
python-versions = ">= 3.8"
python-versions = ">= 3.7"
files = [
{file = "s3transfer-0.10.0-py3-none-any.whl", hash = "sha256:3cdb40f5cfa6966e812209d0994f2a4709b561c88e90cf00c2696d2df4e56b2e"},
{file = "s3transfer-0.10.0.tar.gz", hash = "sha256:d0c8bbf672d5eebbe4e57945e23b972d963f07d82f661cabf678a5c88831595b"},
{file = "s3transfer-0.6.0-py3-none-any.whl", hash = "sha256:06176b74f3a15f61f1b4f25a1fc29a4429040b7647133a463da8fa5bd28d5ecd"},
{file = "s3transfer-0.6.0.tar.gz", hash = "sha256:2ed07d3866f523cc561bf4a00fc5535827981b117dd7876f036b0c1aca42c947"},
]
[package.dependencies]
botocore = ">=1.33.2,<2.0a.0"
botocore = ">=1.12.36,<2.0a.0"
[package.extras]
crt = ["botocore[crt] (>=1.33.2,<2.0a.0)"]
crt = ["botocore[crt] (>=1.20.29,<2.0a.0)"]
[[package]]
name = "sarif-om"
@@ -2743,4 +2740,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "8de8b05a9b35a6f76da7d7e3652ddbb521f1eca53fce7b933f537080a9d6eada"
content-hash = "c4e38082d246636903e15c02fbf8364c6afc1fd35d36a81c49f596ba68fc739b"

View File

@@ -6,7 +6,7 @@ authors = []
[tool.poetry.dependencies]
python = "^3.9"
pytest = "^7.4.4"
pytest = "^7.3.1"
psycopg2-binary = "^2.9.6"
typing-extensions = "^4.6.1"
PyJWT = {version = "^2.1.0", extras = ["crypto"]}
@@ -17,7 +17,7 @@ aiopg = "^1.4.0"
Jinja2 = "^3.0.2"
types-requests = "^2.31.0.0"
types-psycopg2 = "^2.9.21.10"
boto3 = "^1.34.11"
boto3 = "^1.26.16"
boto3-stubs = {extras = ["s3"], version = "^1.26.16"}
moto = {extras = ["server"], version = "^4.1.2"}
backoff = "^2.2.1"

View File

@@ -6,7 +6,6 @@ license.workspace = true
[dependencies]
aws-sdk-s3.workspace = true
aws-smithy-async.workspace = true
either.workspace = true
tokio-rustls.workspace = true
anyhow.workspace = true
@@ -40,5 +39,3 @@ tracing-subscriber.workspace = true
clap.workspace = true
tracing-appender = "0.2"
histogram = "0.7"
futures.workspace = true

View File

@@ -16,12 +16,10 @@ use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_config::imds::credentials::ImdsCredentialsProvider;
use aws_config::meta::credentials::CredentialsProviderChain;
use aws_config::profile::ProfileFileCredentialsProvider;
use aws_config::retry::RetryConfig;
use aws_config::sso::SsoCredentialsProvider;
use aws_config::BehaviorVersion;
use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
use aws_sdk_s3::config::Region;
use aws_sdk_s3::{Client, Config};
use aws_smithy_async::rt::sleep::TokioSleep;
use clap::ValueEnum;
use pageserver::tenant::TENANTS_SEGMENT_NAME;
@@ -285,13 +283,9 @@ pub fn init_s3_client(account_id: Option<String>, bucket_region: Region) -> Clie
)
};
let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
let mut builder = Config::builder()
.behavior_version(BehaviorVersion::v2023_11_09())
.region(bucket_region)
.retry_config(RetryConfig::adaptive().with_max_attempts(3))
.sleep_impl(SharedAsyncSleep::from(sleep_impl))
.credentials_provider(credentials_provider);
if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {

View File

@@ -1,4 +1,3 @@
use pageserver_api::shard::TenantShardId;
use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use s3_scrubber::scan_metadata::scan_metadata;
use s3_scrubber::{init_logging, BucketConfig, ConsoleConfig, NodeKind, TraversingDepth};
@@ -35,8 +34,6 @@ enum Command {
ScanMetadata {
#[arg(short, long, default_value_t = false)]
json: bool,
#[arg(long = "tenant-id", num_args = 0..)]
tenant_ids: Vec<TenantShardId>,
},
}
@@ -60,37 +57,35 @@ async fn main() -> anyhow::Result<()> {
));
match cli.command {
Command::ScanMetadata { json, tenant_ids } => {
match scan_metadata(bucket_config.clone(), tenant_ids).await {
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)
Command::ScanMetadata { json } => match scan_metadata(bucket_config.clone()).await {
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)
}
Ok(summary) => {
if json {
println!("{}", serde_json::to_string(&summary).unwrap())
} else {
println!("{}", summary.summary_string());
}
Ok(summary) => {
if json {
println!("{}", serde_json::to_string(&summary).unwrap())
} else {
println!("{}", summary.summary_string());
}
if summary.is_fatal() {
Err(anyhow::anyhow!("Fatal scrub errors detected"))
} else if summary.is_empty() {
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
// scrubber they were likely expecting to scan something, and if we see no timelines
// at all then it's likely due to some configuration issues like a bad prefix
Err(anyhow::anyhow!(
"No timelines found in bucket {} prefix {}",
bucket_config.bucket,
bucket_config
.prefix_in_bucket
.unwrap_or("<none>".to_string())
))
} else {
Ok(())
}
if summary.is_fatal() {
Err(anyhow::anyhow!("Fatal scrub errors detected"))
} else if summary.is_empty() {
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
// scrubber they were likely expecting to scan something, and if we see no timelines
// at all then it's likely due to some configuration issues like a bad prefix
Err(anyhow::anyhow!(
"No timelines found in bucket {} prefix {}",
bucket_config.bucket,
bucket_config
.prefix_in_bucket
.unwrap_or("<none>".to_string())
))
} else {
Ok(())
}
}
}
},
Command::FindGarbage {
node_kind,
depth,

View File

@@ -187,17 +187,10 @@ Timeline layer count: {6}
}
/// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
pub async fn scan_metadata(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
) -> anyhow::Result<MetadataSummary> {
pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<MetadataSummary> {
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?;
let tenants = if tenant_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&s3_client, &target))
} else {
futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
};
let tenants = stream_tenants(&s3_client, &target);
// How many tenants to process in parallel. We need to be mindful of pageservers
// accessing the same per tenant prefixes, so use a lower setting than pageservers.

View File

@@ -4,12 +4,6 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
default = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
[dependencies]
async-stream.workspace = true
anyhow.workspace = true
@@ -22,7 +16,6 @@ chrono.workspace = true
clap = { workspace = true, features = ["derive"] }
const_format.workspace = true
crc32c.workspace = true
fail.workspace = true
fs2.workspace = true
git-version.workspace = true
hex.workspace = true

View File

@@ -54,19 +54,6 @@ const ID_FILE_NAME: &str = "safekeeper.id";
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
const FEATURES: &[&str] = &[
#[cfg(feature = "testing")]
"testing",
];
fn version() -> String {
format!(
"{GIT_VERSION} failpoints: {}, features: {:?}",
fail::has_failpoints(),
FEATURES,
)
}
const ABOUT: &str = r#"
A fleet of safekeepers is responsible for reliably storing WAL received from
compute, passing it through consensus (mitigating potential computes brain
@@ -180,9 +167,7 @@ async fn main() -> anyhow::Result<()> {
// getting 'argument cannot be used multiple times' error. This seems to be
// impossible with pure Derive API, so convert struct to Command, modify it,
// parse arguments, and then fill the struct back.
let cmd = <Args as clap::CommandFactory>::command()
.args_override_self(true)
.version(version());
let cmd = <Args as clap::CommandFactory>::command().args_override_self(true);
let mut matches = cmd.get_matches();
let mut args = <Args as clap::FromArgMatches>::from_arg_matches_mut(&mut matches)?;

View File

@@ -12,8 +12,6 @@ use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_util::sync::CancellationToken;
use utils::failpoint_support::failpoints_handler;
use std::io::Write as _;
use tokio::sync::mpsc;
@@ -446,12 +444,6 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
.data(Arc::new(conf))
.data(auth)
.get("/v1/status", |r| request_span(r, status_handler))
.put("/v1/failpoints", |r| {
request_span(r, move |r| async {
let cancel = CancellationToken::new();
failpoints_handler(r, cancel).await
})
})
// Will be used in the future instead of implicit timeline creation
.post("/v1/tenant/timeline", |r| {
request_span(r, timeline_create_handler)

View File

@@ -17,7 +17,6 @@ use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use utils::failpoint_support;
use utils::id::TenantTimelineId;
use utils::lsn::AtomicLsn;
use utils::pageserver_feedback::PageserverFeedback;
@@ -392,8 +391,15 @@ impl SafekeeperPostgresHandler {
// application_name: give only committed WAL (used by pageserver) or all
// existing WAL (up to flush_lsn, used by walproposer or peer recovery).
// The second case is always driven by a consensus leader which term
// must be supplied.
let end_watch = if term.is_some() {
// must generally be also supplied. However we're sloppy to do this in
// walproposer recovery which will be removed soon. So TODO is to make
// it not Option'al then.
//
// Fetching WAL without term in recovery creates a small risk of this
// WAL getting concurrently garbaged if another compute rises which
// collects majority and starts fixing log on this safekeeper itself.
// That's ok as (old) proposer will never be able to commit such WAL.
let end_watch = if self.is_walproposer_recovery() {
EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
} else {
EndWatch::Commit(tli.get_commit_lsn_watch_rx())
@@ -529,19 +535,12 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
);
// try to send as much as available, capped by MAX_SEND_SIZE
let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
// if we went behind available WAL, back off
if chunk_end_pos >= self.end_pos {
chunk_end_pos = self.end_pos;
} else {
// If sending not up to end pos, round down to page boundary to
// avoid breaking WAL record not at page boundary, as protocol
// demands. See walsender.c (XLogSendPhysical).
chunk_end_pos = chunk_end_pos
.checked_sub(chunk_end_pos.block_offset())
.unwrap();
}
let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
let mut send_size = self
.end_pos
.checked_sub(self.start_pos)
.context("reading wal without waiting for it first")?
.0 as usize;
send_size = min(send_size, self.send_buf.len());
let send_buf = &mut self.send_buf[..send_size];
let send_size: usize;
{
@@ -552,8 +551,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
} else {
None
};
// Read WAL into buffer. send_size can be additionally capped to
// segment boundary here.
// read wal into buffer
send_size = self.wal_reader.read(send_buf).await?
};
let send_buf = &send_buf[..send_size];
@@ -568,11 +566,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
}))
.await?;
if let Some(appname) = &self.appname {
if appname == "replica" {
failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
}
}
trace!(
"sent {} bytes of WAL {}-{}",
send_size,

View File

@@ -565,9 +565,6 @@ impl WalReader {
})
}
/// Read WAL at current position into provided buf, returns number of bytes
/// read. It can be smaller than buf size only if segment boundary is
/// reached.
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
// If this timeline is new, we may not have a full segment yet, so
// we pad the first bytes of the timeline's first WAL segment with 0s

View File

@@ -347,9 +347,7 @@ class PgProtocol:
"""
return self.safe_psql_many([query], **kwargs)[0]
def safe_psql_many(
self, queries: List[str], log_query=True, **kwargs: Any
) -> List[List[Tuple[Any, ...]]]:
def safe_psql_many(self, queries: List[str], **kwargs: Any) -> List[List[Tuple[Any, ...]]]:
"""
Execute queries against the node and return all rows.
This method passes all extra params to connstr.
@@ -358,8 +356,7 @@ class PgProtocol:
with closing(self.connect(**kwargs)) as conn:
with conn.cursor() as cur:
for query in queries:
if log_query:
log.info(f"Executing query: {query}")
log.info(f"Executing query: {query}")
cur.execute(query)
if cur.description is None:
@@ -368,11 +365,11 @@ class PgProtocol:
result.append(cur.fetchall())
return result
def safe_psql_scalar(self, query, log_query=True) -> Any:
def safe_psql_scalar(self, query) -> Any:
"""
Execute query returning single row with single column.
"""
return self.safe_psql(query, log_query=log_query)[0][0]
return self.safe_psql(query)[0][0]
@dataclass
@@ -893,8 +890,8 @@ class NeonEnv:
"""Get list of safekeeper endpoints suitable for safekeepers GUC"""
return ",".join(f"localhost:{wa.port.pg}" for wa in self.safekeepers)
def get_binary_version(self, binary_name: str) -> str:
bin_pageserver = str(self.neon_binpath / binary_name)
def get_pageserver_version(self) -> str:
bin_pageserver = str(self.neon_binpath / "pageserver")
res = subprocess.run(
[bin_pageserver, "--version"],
check=True,
@@ -1659,7 +1656,7 @@ class NeonPageserver(PgProtocol):
self.running = False
self.service_port = port
self.config_override = config_override
self.version = env.get_binary_version("pageserver")
self.version = env.get_pageserver_version()
# After a test finishes, we will scrape the log to see if there are any
# unexpected error messages. If your test expects an error, add it to
@@ -2927,10 +2924,7 @@ class Safekeeper:
return res
def http_client(self, auth_token: Optional[str] = None) -> SafekeeperHttpClient:
is_testing_enabled = '"testing"' in self.env.get_binary_version("safekeeper")
return SafekeeperHttpClient(
port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled
)
return SafekeeperHttpClient(port=self.port.http, auth_token=auth_token)
def data_dir(self) -> str:
return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")
@@ -2981,11 +2975,10 @@ class SafekeeperMetrics:
class SafekeeperHttpClient(requests.Session):
HTTPError = requests.HTTPError
def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled=False):
def __init__(self, port: int, auth_token: Optional[str] = None):
super().__init__()
self.port = port
self.auth_token = auth_token
self.is_testing_enabled = is_testing_enabled
if auth_token is not None:
self.headers["Authorization"] = f"Bearer {auth_token}"
@@ -2993,30 +2986,6 @@ class SafekeeperHttpClient(requests.Session):
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def is_testing_enabled_or_skip(self):
if not self.is_testing_enabled:
pytest.skip("safekeeper was built without 'testing' feature")
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
self.is_testing_enabled_or_skip()
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings
log.info(f"Requesting config failpoints: {repr(pairs)}")
res = self.put(
f"http://localhost:{self.port}/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
)
log.info(f"Got failpoints request response code {res.status_code}")
res.raise_for_status()
res_json = res.json()
assert res_json is None
return res_json
def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
params = params or {}
res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)

View File

@@ -714,7 +714,7 @@ class PageserverHttpClient(requests.Session):
)
self.verbose_error(res)
assert res.status_code in (200, 304)
assert res.status_code == 200
def evict_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId):
info = self.layer_map_info(tenant_id, timeline_id)

View File

@@ -1,59 +1,19 @@
import os
import re
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv
def wait_caughtup(primary: Endpoint, secondary: Endpoint):
primary_lsn = primary.safe_psql_scalar(
"SELECT pg_current_wal_insert_lsn()::text", log_query=False
)
while True:
secondary_lsn = secondary.safe_psql_scalar(
"SELECT pg_last_wal_replay_lsn()", log_query=False
)
caught_up = secondary_lsn >= primary_lsn
log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
if caught_up:
return
time.sleep(1)
# Check for corrupted WAL messages which might otherwise go unnoticed if
# reconnection fixes this.
def scan_standby_log_for_errors(secondary):
log_path = secondary.endpoint_path() / "compute.log"
with log_path.open("r") as f:
markers = re.compile(
r"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr"
)
for line in f:
if markers.search(line):
log.info(f"bad error in standby log: {line}")
raise AssertionError()
from fixtures.neon_fixtures import NeonEnv
def test_hot_standby(neon_simple_env: NeonEnv):
env = neon_simple_env
# We've had a bug caused by WAL records split across multiple XLogData
# messages resulting in corrupted WAL complains on standby. It reproduced
# only when sending from safekeeper is slow enough to grab full
# MAX_SEND_SIZE messages. So insert sleep through failpoints, but only in
# one conf to decrease test time.
slow_down_send = "[debug-pg16]" in os.environ.get("PYTEST_CURRENT_TEST", "")
if slow_down_send:
sk_http = env.safekeepers[0].http_client()
sk_http.configure_failpoints([("sk-send-wal-replica-sleep", "return(100)")])
with env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
) as primary:
time.sleep(1)
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
primary_lsn = None
caught_up = False
queries = [
"SHOW neon.timeline_id",
"SHOW neon.tenant_id",
@@ -66,6 +26,23 @@ def test_hot_standby(neon_simple_env: NeonEnv):
with p_con.cursor() as p_cur:
p_cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i")
# Explicit commit to make sure other connections (and replicas) can
# see the changes of this commit.
p_con.commit()
with p_con.cursor() as p_cur:
p_cur.execute("SELECT pg_current_wal_insert_lsn()::text")
res = p_cur.fetchone()
assert res is not None
(lsn,) = res
primary_lsn = lsn
# Explicit commit to make sure other connections (and replicas) can
# see the changes of this commit.
# Note that this may generate more WAL if the transaction has changed
# things, but we don't care about that.
p_con.commit()
for query in queries:
with p_con.cursor() as p_cur:
p_cur.execute(query)
@@ -74,28 +51,30 @@ def test_hot_standby(neon_simple_env: NeonEnv):
response = res
responses[query] = response
# insert more data to make safekeeper send MAX_SEND_SIZE messages
if slow_down_send:
primary.safe_psql("create table t(key int, value text)")
primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'")
wait_caughtup(primary, secondary)
with secondary.connect() as s_con:
with s_con.cursor() as s_cur:
s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
res = s_cur.fetchone()
assert res is not None
while not caught_up:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
res = secondary_cursor.fetchone()
assert res is not None
(secondary_lsn,) = res
# There may be more changes on the primary after we got our LSN
# due to e.g. autovacuum, but that shouldn't impact the content
# of the tables, so we check whether we've replayed up to at
# least after the commit of the `test` table.
caught_up = secondary_lsn >= primary_lsn
# Explicit commit to flush any transient transaction-level state.
s_con.commit()
for query in queries:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute(query)
response = secondary_cursor.fetchone()
assert response is not None
assert response == responses[query]
scan_standby_log_for_errors(secondary)
# clean up
if slow_down_send:
sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off"))

View File

@@ -38,9 +38,6 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(
[".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"]
)
ps_http = env.pageserver.http_client()

View File

@@ -566,7 +566,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
f"Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb"
)
endpoint.stop()
endpoint.stop_and_destroy()
timeline_delete_wait_completed(ps_http, tenant_id, timeline_id)
# Also delete and manually create timeline on safekeepers -- this tests

View File

@@ -475,46 +475,6 @@ def test_unavailability(neon_env_builder: NeonEnvBuilder):
asyncio.run(run_unavailability(env, endpoint))
async def run_recovery_uncommitted(env: NeonEnv):
(sk1, sk2, _) = env.safekeepers
env.neon_cli.create_branch("test_recovery_uncommitted")
ep = env.endpoints.create_start("test_recovery_uncommitted")
ep.safe_psql("create table t(key int, value text)")
ep.safe_psql("insert into t select generate_series(1, 100), 'payload'")
# insert with only one safekeeper up to create tail of flushed but not committed WAL
sk1.stop()
sk2.stop()
conn = await ep.connect_async()
# query should hang, so execute in separate task
bg_query = asyncio.create_task(
conn.execute("insert into t select generate_series(1, 2000), 'payload'")
)
sleep_sec = 2
await asyncio.sleep(sleep_sec)
# it must still be not finished
assert not bg_query.done()
# note: destoy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers.
ep.stop_and_destroy()
# Start one of sks to make quorum online plus compute and ensure they can
# sync.
sk2.start()
ep = env.endpoints.create_start(
"test_recovery_uncommitted",
)
ep.safe_psql("insert into t select generate_series(1, 2000), 'payload'")
# Test pulling uncommitted WAL (up to flush_lsn) during recovery.
def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
asyncio.run(run_recovery_uncommitted(env))
@dataclass
class RaceConditionTest:
iteration: int

View File

@@ -167,21 +167,22 @@ build: |
&& apt-get update \
&& apt-get install -y \
build-essential \
git \
curl \
libevent-dev \
libtool \
libssl-dev \
patchutils \
pkg-config
# Note, we use pgbouncer from neondatabase/pgbouncer fork, which could contain extra commits.
# Use `dist_man_MANS=` to skip manpage generation (which requires python3/pandoc)
ENV PGBOUNCER_TAG pgbouncer_1_21_0-neon-1
ENV PGBOUNCER_VERSION 1.21.0
ENV PGBOUNCER_GITPATH 1_21_0
RUN set -e \
&& git clone --recurse-submodules --depth 1 --branch ${PGBOUNCER_TAG} https://github.com/neondatabase/pgbouncer.git pgbouncer \
&& cd pgbouncer \
&& ./autogen.sh \
&& curl -sfSL https://github.com/pgbouncer/pgbouncer/releases/download/pgbouncer_${PGBOUNCER_GITPATH}/pgbouncer-${PGBOUNCER_VERSION}.tar.gz -o pgbouncer-${PGBOUNCER_VERSION}.tar.gz \
&& tar xzvf pgbouncer-${PGBOUNCER_VERSION}.tar.gz \
&& cd pgbouncer-${PGBOUNCER_VERSION} \
&& curl https://github.com/pgbouncer/pgbouncer/commit/a7b3c0a5f4caa9dbe92743d04cf1e28c4c05806c.patch | filterdiff --include a/src/server.c | patch -p1 \
&& LDFLAGS=-static ./configure --prefix=/usr/local/pgbouncer --without-openssl \
&& make -j $(nproc) dist_man_MANS= \
&& make install dist_man_MANS=
&& make -j $(nproc) \
&& make install
merge: |
# tweak nofile limits
RUN set -e \