Compare commits

..

42 Commits

Author SHA1 Message Date
Konstantin Knizhnik
9fb17dba04 Add neon.max_vacuum_defer_cleanup_age to restrict bloat of master in case of using hot_standby_feedback 2024-07-05 14:21:25 +03:00
Arpad Müller
adde0ecfe0 Flatten compression algorithm setting (#8265)
This flattens the compression algorithm setting, removing the
`Option<_>` wrapping layer and making handling of the setting easier.

It also adds a specific setting for *disabled* compression with the
continued ability to read copmressed data, giving us the option to
more easily back out of a compression rollout, should the need arise,
which was one of the limitations of #8238.

Implements my suggestion from
https://github.com/neondatabase/neon/pull/8238#issuecomment-2206181594 ,
inspired by Christian's review in
https://github.com/neondatabase/neon/pull/8238#pullrequestreview-2156460268 .

Part of #5431
2024-07-04 16:59:19 +00:00
Yuchen Liang
19accfee4e feat(pageserver): integrate lsn lease into synthetic size (#8220)
Part of #7497, closes #8071. (accidentally closed #8208, reopened here)

## Problem

After the changes in #8084, we need synthetic size to also account for
leased LSNs so that users do not get free retention by running a small
ephemeral endpoint for a long time.

## Summary of changes

This PR integrates LSN leases into the synthetic size calculation. We
model leases as read-only branches started at the leased LSN (except it
does not have a timeline id).

Other changes:
- Add new unit tests testing whether a lease behaves like a read-only
branch.
- Change `/size_debug` response to include lease point in the SVG
visualization.
- Fix `/lsn_lease` HTTP API to do proper parsing for POST.



Signed-off-by: Yuchen Liang <yuchen@neon.tech>
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
2024-07-04 15:09:05 +00:00
Arpad Müller
e579bc0819 Add find-large-objects subcommand to scrubber (#8257)
Adds a find-large-objects subcommand to the scrubber to allow listing
layer objects larger than a specific size.

To be used like:

```
AWS_PROFILE=dev REGION=us-east-2 BUCKET=neon-dev-storage-us-east-2 cargo run -p storage_scrubber -- find-large-objects --min-size 250000000 --ignore-deltas
```

Part of #5431
2024-07-04 15:07:16 +00:00
John Spray
c9e6dd45d3 pageserver: downgrade stale generation messages to INFO (#8256)
## Problem

When generations were new, these messages were an important way of
noticing if something unexpected was going on. We found some real issues
when investigating tests that unexpectedly tripped them.

At time has gone on, this code is now pretty battle-tested, and as we do
more live migrations etc, it's fairly normal to see the occasional
message from a node with a stale generation.

At this point the cognitive load on developers to selectively allow-list
these logs outweighs the benefit of having them at warn severity.

Closes: https://github.com/neondatabase/neon/issues/8080

## Summary of changes

- Downgrade "Dropped remote consistent LSN updates" and "Dropping stale
deletions" messages to INFO
- Remove all the allow-list entries for these logs.
2024-07-04 15:05:41 +01:00
Alexander Bayandin
bf9fc77061 CI(pg-clients): unify workflow with build-and-test (#8160)
## Problem

`pg-clients` workflow looks different from the main `build-and-test`
workflow for historical reasons (it was my very first task at Neon, and 
back then I wasn't really familiar with the rest of the CI pipelines).
This PR unifies `pg-clients` workflow with `build-and-test`

## Summary of changes
- Rename `pg_clients.yml` to `pg-clients.yml`
- Run the workflow on changes in relevant files
- Create Allure report for tests
- Send slack notifications to `#on-call-qa-staging-stream` channel
(instead of `#on-call-staging-stream`)
- Update Client libraries once we're here
2024-07-04 14:58:01 +01:00
Arpad Müller
a004d27fca Use bool param for round_trip_test_compressed (#8252)
As per @koivunej 's request in
https://github.com/neondatabase/neon/pull/8238#discussion_r1663892091 ,
use a runtime param instead of monomorphizing the function based on the value.

Part of https://github.com/neondatabase/neon/issues/5431
2024-07-04 15:04:08 +02:00
Vlad Lazar
a46253766b pageserver: increase rate limit duration for layer visit log (#8263)
## Problem
I'd like to keep this in the tree since it might be useful in prod as
well. It's a bit too noisy as is and missing the lsn.

## Summary of changes
Add an lsn field and and increase the rate limit duration.
2024-07-04 13:22:33 +01:00
Alexander Bayandin
5b69b32dc5 CI(build-and-test): add conclusion job (#8246)
## Problem

Currently, if you need to rename a job and the job is listed in [branch
protection
rules](https://github.com/neondatabase/neon/settings/branch_protection_rules),
the PR won't be allowed to merge.

## Summary of changes
- Add `conclusion` job that fails if any of its dependencies don't
finish successfully
2024-07-04 09:20:01 +01:00
Conrad Ludgate
e03c3c9893 proxy: cache certain non-retriable console errors for a short time (#8201)
## Problem

If there's a quota error, it makes sense to cache it for a short window
of time. Many clients do not handle database connection errors
gracefully, so just spam retry 🤡

## Summary of changes

Updates the node_info cache to support storing console errors. Store
console errors if they cannot be retried (using our own heuristic.
should only trigger for quota exceeded errors).
2024-07-04 09:03:03 +01:00
Vlad Lazar
bbb2fa7cdd tests: perform graceful rolling restarts in storcon scale test (#8173)
## Problem
Scale test doesn't exercise drain & fill.

## Summary of changes
Make scale test exercise drain & fill
2024-07-04 06:04:19 +01:00
John Spray
778787d8e9 pageserver: add supplementary branch usage stats (#8131)
## Problem

The metrics we have today aren't convenient for planning around the
impact of timeline archival on costs.

Closes: https://github.com/neondatabase/neon/issues/8108

## Summary of changes

- Add metric `pageserver_archive_size`, which indicates the logical
bytes of data which we would expect to write into an archived branch.
- Add metric `pageserver_pitr_history_size`, which indicates the
distance between last_record_lsn and the PITR cutoff.

These metrics are somewhat temporary: when we implement #8088 and
associated consumption metric changes, these will reach a final form.
For now, an "archived" branch is just any branch outside of its parent's
PITR window: later, archival will become an explicit state (which will
_usually_ correspond to falling outside the parent's PITR window).

The overall volume of timeline metrics is something to watch, but we are
removing many more in https://github.com/neondatabase/neon/pull/8245
than this PR is adding.
2024-07-03 22:29:43 +01:00
Alex Chi Z
90b51dcf16 fix(pageserver): ensure test creates valid layer map (#8191)
I'd like to add some constraints to the layer map we generate in tests.

(1) is the layer map that the current compaction algorithm will produce.
There is a property that for all delta layer, all delta layer overlaps
with it on the LSN axis will have the same LSN range.
(2) is the layer map that cannot be produced with the legacy compaction
algorithm.
(3) is the layer map that will be produced by the future
tiered-compaction algorithm. The current validator does not allow that
but we can modify the algorithm to allow it in the future.

## Summary of changes

Add a validator to check if the layer map is valid and refactor the test
cases to include delta layer start/end LSN.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
2024-07-03 18:46:58 +00:00
Christian Schwarz
a85aa03d18 page_service: stop exposing get_last_record_rlsn (#8244)
Compute doesn't use it, let's eliminate it.

Ref to Slack thread:
https://neondb.slack.com/archives/C033RQ5SPDH/p1719920261995529
2024-07-03 20:05:01 +02:00
Japin Li
cdaed4d79c Fix outdated comment (#8149)
Commit 97b48c23f changes the log wait timeout from 1 second to 100
milliseconds but forgets to update the comment.
2024-07-03 13:55:36 -04:00
John Spray
ea0b22a9b0 pageserver: reduce ops tracked at per-timeline detail (#8245)
## Problem

We record detailed histograms for all page_service op types, which
mostly aren't very interesting, but make our prometheus scrapes huge.

Closes: #8223 

## Summary of changes

- Only track GetPageAtLsn histograms on a per-timeline granularity. For
all other operation types, rely on existing node-wide histograms.
2024-07-03 17:27:34 +01:00
Peter Bendel
392a58bdce add pagebench test cases for periodic pagebench on dedicated hardware (#8233)
we want to run some specific pagebench test cases on dedicated hardware
to get reproducible results

run1: 1 client per tenant => characterize throughput with n tenants.
-  500 tenants
- scale 13 (200 MB database)
- 1 hour duration
- ca 380 GB layer snapshot files

run2.singleclient: 1 client per tenant => characterize latencies
run2.manyclient: N clients per tenant => characterize throughput
scalability within one tenant.
- 1 tenant with 1 client for latencies
- 1 tenant with 64 clients because typically for a high number of
connections we recommend the connection pooler
which by default uses 64 connections (for scalability)
- scale 136 (2048 MB database)
- 20 minutes each
2024-07-03 16:22:33 +00:00
Arpad Müller
e0891ec8c8 Only support compressed reads if the compression setting is present (#8238)
PR #8106 was created with the assumption that no blob is larger than
`256 MiB`. Due to #7852 we have checking for *writes* of blobs larger
than that limit, but we didn't have checking for *reads* of such large
blobs: in theory, we could be reading these blobs every day but we just
don't happen to write the blobs for some reason.

Therefore, we now add a warning for *reads* of such large blobs as well.

To make deploying compression less dangerous, we therefore only assume a
blob is compressed if the compression setting is present in the config.
This also means that we can't back out of compression once we enabled
it.

Part of https://github.com/neondatabase/neon/issues/5431
2024-07-03 18:02:10 +02:00
John Spray
97f7188a07 pageserver: don't try to flush if shutdown during attach (#8235)
## Problem

test_location_conf_churn fails on log errors when it tries to shutdown a
pageserver immediately after starting a tenant attach, like this:
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-8224/9761000525/index.html#/testresult/15fb6beca5c7327c

```
shutdown:shutdown{tenant_id=35f5c55eb34e7e5e12288c5d8ab8b909 shard_id=0000}:timeline_shutdown{timeline_id=30936747043353a98661735ad09cbbfe shutdown_mode=FreezeAndFlush}: failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited\n')
```

This is happening because Tenant::shutdown fires its cancellation token
early if the tenant is not fully attached by the time shutdown is
called, so the flush loop is shutdown by the time we try and flush.

## Summary of changes

- In the early-cancellation case, also set the shutdown mode to Hard to
skip trying to do a flush that will fail.
2024-07-03 13:13:06 +00:00
Alexander Bayandin
aae3876318 CI: update docker/* actions to latest versions (#7694)
## Problem

GitHub Actions complain that we use actions that depend on deprecated
Node 16:

```
Node.js 16 actions are deprecated. Please update the following actions to use Node.js 20: docker/setup-buildx-action@v2
```

But also, the latest `docker/setup-buildx-action` fails with the following
error:
```
/nvme/actions-runner/_work/_actions/docker/setup-buildx-action/v3/webpack:/docker-setup-buildx/node_modules/@actions/cache/lib/cache.js:175
            throw new Error(`Path Validation Error: Path(s) specified in the action for caching do(es) not exist, hence no cache is being saved.`);
^
Error: Path Validation Error: Path(s) specified in the action for caching do(es) not exist, hence no cache is being saved.
    at Object.rejected (/nvme/actions-runner/_work/_actions/docker/setup-buildx-action/v3/webpack:/docker-setup-buildx/node_modules/@actions/cache/lib/cache.js:175:1)
    at Generator.next (<anonymous>)
    at fulfilled (/nvme/actions-runner/_work/_actions/docker/setup-buildx-action/v3/webpack:/docker-setup-buildx/node_modules/@actions/cache/lib/cache.js:29:1)
```

We can work this around by setting `cache-binary: false` for `uses:
docker/setup-buildx-action@v3`

## Summary of changes
- Update `docker/setup-buildx-action` from `v2` to `v3`, set
`cache-binary: false`
- Update `docker/login-action` from `v2` to `v3`
- Update `docker/build-push-action` from `v4`/`v5` to `v6`
2024-07-03 12:19:13 +01:00
Heikki Linnakangas
dae55badf3 Simplify test_wal_page_boundary_start test (#8214)
All the code to ensure the WAL record lands at a page boundary was
unnecessary for reproducing the original problem. In fact, it's a pretty
basic test that checks that outbound replication (= neon as publisher)
still works after restarting the endpoint. It just used to be very
broken before commit 5ceccdc7de, which also added this test.

To verify that:

1. Check out commit f3af5f4660 (because the next commit, 7dd58e1449,
fixed the same bug in a different way, making it infeasible to revert
the bug fix in an easy way)
2. Revert the bug fix from commit 5ceccdc7de with this:

```
diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c
index 7debb6325..9f03bbd99 100644
--- a/pgxn/neon/walproposer_pg.c
+++ b/pgxn/neon/walproposer_pg.c
@@ -1437,8 +1437,10 @@ XLogWalPropWrite(WalProposer *wp, char *buf, Size nbytes, XLogRecPtr recptr)
 	 *
 	 * https://github.com/neondatabase/neon/issues/5749
 	 */
+#if 0
 	if (!wp->config->syncSafekeepers)
 		XLogUpdateWalBuffers(buf, recptr, nbytes);
+#endif

 	while (nbytes > 0)
 	{
```

3. Run the test_wal_page_boundary_start regression test. It fails, as
expected

4. Apply this commit to the test, and run it again. It still fails, with
the same error mentioned in issue #5749:

```
PG:2024-06-30 20:49:08.805 GMT [1248196] STATEMENT:  START_REPLICATION SLOT "sub1" LOGICAL 0/0 (proto_version '4', origin 'any', publication_names '"pub1"')
PG:2024-06-30 21:37:52.567 GMT [1467972] LOG:  starting logical decoding for slot "sub1"
PG:2024-06-30 21:37:52.567 GMT [1467972] DETAIL:  Streaming transactions committing after 0/1532330, reading WAL from 0/1531C78.
PG:2024-06-30 21:37:52.567 GMT [1467972] STATEMENT:  START_REPLICATION SLOT "sub1" LOGICAL 0/0 (proto_version '4', origin 'any', publication_names '"pub1"')
PG:2024-06-30 21:37:52.567 GMT [1467972] LOG:  logical decoding found consistent point at 0/1531C78
PG:2024-06-30 21:37:52.567 GMT [1467972] DETAIL:  There are no running transactions.
PG:2024-06-30 21:37:52.567 GMT [1467972] STATEMENT:  START_REPLICATION SLOT "sub1" LOGICAL 0/0 (proto_version '4', origin 'any', publication_names '"pub1"')
PG:2024-06-30 21:37:52.568 GMT [1467972] ERROR:  could not find record while sending logically-decoded data: invalid contrecord length 312 (expected 6) at 0/1533FD8
```
2024-07-03 13:22:53 +03:00
Alex Chi Z
4273309962 docker: add storage_scrubber into the docker image (#8239)
## Problem

We will run this tool in the k8s cluster. To make it accessible from
k8s, we need to package it into the docker image.

part of https://github.com/neondatabase/cloud/issues/14024
2024-07-03 09:48:56 +01:00
Konstantin Knizhnik
4a0c2aebe0 Add test for proper handling of connection failure to avoid 'cannot wait on socket event without a socket' error (#8231)
## Problem

See https://github.com/neondatabase/cloud/issues/14289
and PR #8210 

## Summary of changes

Add test for problems fixed in #8210

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-07-02 21:45:42 +03:00
Alex Chi Z
891cb5a9a8 fix(pageserver): comments about metadata key range (#8236)
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-07-02 16:54:32 +00:00
John Spray
f5832329ac tense of errors (#8234)
I forgot a commit when merging
https://github.com/neondatabase/neon/pull/8177
2024-07-02 17:17:22 +01:00
Alexander Bayandin
6216df7765 CI(benchmarking): move psql queries to actions/run-python-test-set (#8230)
## Problem

Some of the Nightly benchmarks fail with the error
```
+ /tmp/neon/pg_install/v14/bin/pgbench --version
/tmp/neon/pg_install/v14/bin/pgbench: error while loading shared libraries: libpq.so.5: cannot open shared object file: No such file or directory
```
Originally, we added the `pgbench --version` call to check that
`pgbench` is installed and to fail earlier if it's not.
The failure happens because we don't have `LD_LIBRARY_PATH` set for
every job, and it also affects `psql` command.
We can move it to `actions/run-python-test-set` so as not to duplicate
code (as it already have `LD_LIBRARY_PATH` set).

## Summary of changes
- Remove `pgbench --version` call
- Move `psql` commands to common `actions/run-python-test-set`
2024-07-02 15:21:23 +00:00
Christian Schwarz
5de896e7d8 L0 flush: opt-in mechanism to bypass PageCache reads and writes (#8190)
part of https://github.com/neondatabase/neon/issues/7418

# Motivation

(reproducing #7418)

When we do an `InMemoryLayer::write_to_disk`, there is a tremendous
amount of random read I/O, as deltas from the ephemeral file (written in
LSN order) are written out to the delta layer in key order.

In benchmarks (https://github.com/neondatabase/neon/pull/7409) we can
see that this delta layer writing phase is substantially more expensive
than the initial ingest of data, and that within the delta layer write a
significant amount of the CPU time is spent traversing the page cache.

# High-Level Changes

Add a new mode for L0 flush that works as follows:

* Read the full ephemeral file into memory -- layers are much smaller
than total memory, so this is afforable
* Do all the random reads directly from this in memory buffer instead of
using blob IO/page cache/disk reads.
* Add a semaphore to limit how many timelines may concurrently do this
(limit peak memory).
* Make the semaphore configurable via PS config.

# Implementation Details

The new `BlobReaderRef::Slice` is a temporary hack until we can ditch
`blob_io` for `InMemoryLayer` => Plan for this is laid out in
https://github.com/neondatabase/neon/issues/8183

# Correctness

The correctness of this change is quite obvious to me: we do what we did
before (`blob_io`) but read from memory instead of going to disk.

The highest bug potential is in doing owned-buffers IO. I refactored the
API a bit in preliminary PR
https://github.com/neondatabase/neon/pull/8186 to make it less
error-prone, but still, careful review is requested.

# Performance

I manually measured single-client ingest performance from `pgbench -i
...`.

Full report:
https://neondatabase.notion.site/2024-06-28-benchmarking-l0-flush-performance-e98cff3807f94cb38f2054d8c818fe84?pvs=4

tl;dr:

* no speed improvements during ingest,  but
* significantly lower pressure on PS PageCache (eviction rate drops to
1/3)
  * (that's why I'm working on this)
* noticable but modestly lower CPU time

This is good enough for merging this PR because the changes require
opt-in.

We'll do more testing in staging & pre-prod.

# Stability / Monitoring

**memory consumption**: there's no _hard_ limit on max `InMemoryLayer`
size (aka "checkpoint distance") , hence there's no hard limit on the
memory allocation we do for flushing. In practice, we a) [log a
warning](23827c6b0d/pageserver/src/tenant/timeline.rs (L5741-L5743))
when we flush oversized layers, so we'd know which tenant is to blame
and b) if we were to put a hard limit in place, we would have to decide
what to do if there is an InMemoryLayer that exceeds the limit.
It seems like a better option to guarantee a max size for frozen layer,
dependent on `checkpoint_distance`. Then limit concurrency based on
that.

**metrics**: we do have the
[flush_time_histo](23827c6b0d/pageserver/src/tenant/timeline.rs (L3725-L3726)),
but that includes the wait time for the semaphore. We could add a
separate metric for the time spent after acquiring the semaphore, so one
can infer the wait time. Seems unnecessary at this point, though.
2024-07-02 16:29:09 +02:00
Arpad Müller
25eefdeb1f Add support for reading and writing compressed blobs (#8106)
Add support for reading and writing zstd-compressed blobs for use in
image layer generation, but maybe one day useful also for delta layers.
The reading of them is unconditional while the writing is controlled by
the `image_compression` config variable allowing for experiments.

For the on-disk format, we re-use some of the bitpatterns we currently
keep reserved for blobs larger than 256 MiB. This assumes that we have
never ever written any such large blobs to image layers.

After the preparation in #7852, we now are unable to read blobs with a
size larger than 256 MiB (or write them).

A non-goal of this PR is to come up with good heuristics of when to
compress a bitpattern. This is left for future work.

Parts of the PR were inspired by #7091.

cc  #7879

Part of #5431
2024-07-02 14:14:12 +00:00
Vlad Lazar
28929d9cfa pageserver: rate limit log for loads of layers visited (#8228)
## Problem
At high percentiles we see more than 800 layers being visited by the
read path. We need the tenant/timeline to investigate.

## Summary of changes
Add a rate limited log line when the average number of layers visited
per key is in the last specified histogram bucket.
I plan to use this to identify tenants in us-east-2 staging that exhibit
this behaviour. Will revert before next week's release.
2024-07-02 14:14:10 +01:00
Christian Schwarz
9b4b4bbf6f fix: noisy logging when download gets cancelled during shutdown (#8224)
Before this PR, during timeline shutdown, we'd occasionally see
log lines like this one:

```
2024-06-26T18:28:11.063402Z  INFO initial_size_calculation{tenant_id=$TENANT,shard_id=0000 timeline_id=$TIMELINE}:logical_size_calculation_task:get_or_maybe_download{layer=000000000000000000000000000000000000-000000067F0001A3950001C1630100000000__0000000D88265898}: layer file download failed, and caller has been cancelled: Cancelled, shutting down
Stack backtrace:
   0: <core::result::Result<T,F> as core::ops::try_trait::FromResidual<core::result::Result<core::convert::Infallible,E>>>::from_residual
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/core/src/result.rs:1964:27
      pageserver::tenant::remote_timeline_client::RemoteTimelineClient::download_layer_file::{{closure}}
             at /home/nonroot/pageserver/src/tenant/remote_timeline_client.rs:531:13
      pageserver::tenant::storage_layer::layer::LayerInner::download_and_init::{{closure}}
             at /home/nonroot/pageserver/src/tenant/storage_layer/layer.rs:1136:14
      pageserver::tenant::storage_layer::layer::LayerInner::download_init_and_wait::{{closure}}::{{closure}}
             at /home/nonroot/pageserver/src/tenant/storage_layer/layer.rs:1082:74
```

We can eliminate the anyhow backtrace with no loss of information
because the conversion to anyhow::Error happens in exactly one place.

refs #7427
2024-07-02 13:13:27 +00:00
John Spray
1a0f545c16 pageserver: simpler, stricter config error handling (#8177)
## Problem

Tenant attachment has error paths for failures to write local
configuration, but these types of local storage I/O errors should be
considered fatal for the process. Related thread on an earlier PR that
touched this code:
https://github.com/neondatabase/neon/pull/7947#discussion_r1655134114

## Summary of changes

- Make errors writing tenant config fatal (abort process)
- When reading tenant config, make all I/O errors except ENOENT fatal
- Replace use of bare anyhow errors with `LoadConfigError`
2024-07-02 12:45:04 +00:00
Christian Schwarz
7dcdbaa25e remote_storage config: move handling of empty inline table {} to callers (#8193)
Before this PR, `RemoteStorageConfig::from_toml` would support
deserializing an
empty `{}` TOML inline table to a `None`, otherwise try `Some()`.

We can instead let
* in proxy: let clap derive handle the Option
* in PS & SK: assume that if the field is specified, it must be a valid
  RemtoeStorageConfig

(This PR started with a much simpler goal of factoring out the
`deserialize_item` function because I need that in another PR).
2024-07-02 12:53:08 +02:00
Konstantin Knizhnik
0497b99f3a Check status of connection after PQconnectStartParams (#8210)
## Problem

See https://github.com/neondatabase/cloud/issues/14289

## Summary of changes

Check connection status after calling PQconnectStartParams

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-07-02 06:56:10 +03:00
Vlad Lazar
9882ac8e06 docs: Graceful storage controller cluster restarts RFC (#7704)
RFC for "Graceful Restarts of Storage Controller Managed Clusters". 
Related https://github.com/neondatabase/neon/issues/7387
2024-07-01 18:44:28 +01:00
Heikki Linnakangas
0789160ffa tests: Make neon_xlogflush() flush all WAL, if you omit the LSN arg (#8215)
This makes it much more convenient to use in the common case that you
want to flush all the WAL. (Passing pg_current_wal_insert_lsn() as the
argument doesn't work for the same reasons as explained in the comments:
we need to be back off to the beginning of a page if the previous record
ended at page boundary.)

I plan to use this to fix the issue that Arseny Sher called out at
https://github.com/neondatabase/neon/pull/7288#discussion_r1660063852
2024-07-01 10:55:18 -05:00
Alexander Bayandin
9c32604aa9 CI(gather-rust-build-stats): fix build with libpq (#8219)
## Problem
I've missed setting `PQ_LIB_DIR` in
https://github.com/neondatabase/neon/pull/8206 in
`gather-rust-build-stats` job and it fails now:
```
  = note: /usr/bin/ld: cannot find -lpq
          collect2: error: ld returned 1 exit status
          

error: could not compile `storage_controller` (bin "storage_controller") due to 1 previous error
```

https://github.com/neondatabase/neon/actions/runs/9743960062/job/26888597735

## Summary of changes
- Set `PQ_LIB_DIR` for `gather-rust-build-stats` job
2024-07-01 16:42:23 +01:00
Alex Chi Z
b02aafdfda fix(pageserver): include aux file in basebackup only once (#8207)
Extracted from https://github.com/neondatabase/neon/pull/6560, currently
we include multiple copies of aux files in the basebackup.

## Summary of changes

Fix the loop.

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-07-01 14:36:49 +00:00
Alexander Bayandin
e823b92947 CI(build-tools): Remove libpq from build image (#8206)
## Problem
We use `build-tools` image as a base image to build other images, and it
has a pretty old `libpq-dev` installed (v13; it wasn't that old until I
removed system Postgres 14 from `build-tools` image in
https://github.com/neondatabase/neon/pull/6540)

## Summary of changes
- Remove `libpq-dev` from `build-tools` image
- Set `LD_LIBRARY_PATH` for tests (for different Postgres binaries that
we use, like psql and pgbench)
- Set `PQ_LIB_DIR` to build Storage Controller
- Set `LD_LIBRARY_PATH`/`DYLD_LIBRARY_PATH` in the Storage Controller
where it calls Postgres binaries
2024-07-01 13:11:55 +01:00
John Spray
aea5cfe21e pageserver: add metric pageserver_secondary_resident_physical_size (#8204)
## Problem

We lack visibility of how much local disk space is used by secondary
tenant locations

Close: https://github.com/neondatabase/neon/issues/8181

## Summary of changes

- Add `pageserver_secondary_resident_physical_size`, tagged by tenant
- Register & de-register label sets from SecondaryTenant
- Add+use wrappers in SecondaryDetail that update metrics when
adding+removing layers/timelines
2024-07-01 12:48:20 +01:00
Heikki Linnakangas
9ce193082a Restore running xacts from CLOG on replica startup (#7288)
We have one pretty serious MVCC visibility bug with hot standby
replicas. We incorrectly treat any transactions that are in progress
in the primary, when the standby is started, as aborted. That can
break MVCC for queries running concurrently in the standby. It can
also lead to hint bits being set incorrectly, and that damage can last
until the replica is restarted.

The fundamental bug was that we treated any replica start as starting
from a shut down server. The fix for that is straightforward: we need
to set 'wasShutdown = false' in InitWalRecovery() (see changes in the
postgres repo).

However, that introduces a new problem: with wasShutdown = false, the
standby will not open up for queries until it receives a running-xacts
WAL record from the primary. That's correct, and that's how Postgres
hot standby always works. But it's a problem for Neon, because:

* It changes the historical behavior for existing users. Currently,
  the standby immediately opens up for queries, so if they now need to
  wait, we can breka existing use cases that were working fine
  (assuming you don't hit the MVCC issues).

* The problem is much worse for Neon than it is for standalone
  PostgreSQL, because in Neon, we can start a replica from an
  arbitrary LSN. In standalone PostgreSQL, the replica always starts
  WAL replay from a checkpoint record, and the primary arranges things
  so that there is always a running-xacts record soon after each
  checkpoint record. You can still hit this issue with PostgreSQL if
  you have a transaction with lots of subtransactions running in the
  primary, but it's pretty rare in practice.

To mitigate that, we introduce another way to collect the
running-xacts information at startup, without waiting for the
running-xacts WAL record: We can the CLOG for XIDs that haven't been
marked as committed or aborted. It has limitations with
subtransactions too, but should mitigate the problem for most users.

See https://github.com/neondatabase/neon/issues/7236.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-07-01 12:58:12 +03:00
Heikki Linnakangas
75c84c846a tests: Make neon_xlogflush() flush all WAL, if you omit the LSN arg
This makes it much more convenient to use in the common case that you
want to flush all the WAL. (Passing pg_current_wal_insert_lsn() as the
argument doesn't work for the same reasons as explained in the
comments: we need to be back off to the beginning of a page if the
previous record ended at page boundary.)

I plan to use this to fix the issue that Arseny Sher called out at
https://github.com/neondatabase/neon/pull/7288#discussion_r1660063852
2024-07-01 12:58:08 +03:00
Heikki Linnakangas
57535c039c tests: remove a leftover 'running' flag (#8216)
The 'running' boolean was replaced with a semaphore in commit
f0e2bb79b2, but this initialization was missed. Remove it so that if a
test tries to access it, you get an error rather than always claiming
that the endpoint is not running.

Spotted by Arseny at
https://github.com/neondatabase/neon/pull/7288#discussion_r1660068657
2024-07-01 11:23:31 +03:00
29 changed files with 176 additions and 771 deletions

View File

@@ -43,10 +43,6 @@ jobs:
AWS_DEFAULT_REGION : "eu-central-1"
AWS_INSTANCE_ID : "i-02a59a3bf86bc7e74"
steps:
# we don't need the neon source code because we run everything remotely
# however we still need the local github actions to run the allure step below
- uses: actions/checkout@v4
- name: Show my own (github runner) external IP address - usefull for IP allowlisting
run: curl https://ifconfig.me
@@ -94,12 +90,10 @@ jobs:
set +x
status=$(echo $response | jq -r '.status')
echo "Test status: $status"
if [[ "$status" == "failure" ]]; then
echo "Test failed"
exit 1 # Fail the job step if status is failure
elif [[ "$status" == "success" || "$status" == "null" ]]; then
if [[ "$status" == "failure" || "$status" == "success" || "$status" == "null" ]]; then
break
elif [[ "$status" == "too_many_runs" ]]; then
fi
if [[ "$status" == "too_many_runs" ]]; then
echo "Too many runs already running"
echo "too_many_runs=true" >> "$GITHUB_OUTPUT"
exit 1
@@ -109,7 +103,6 @@ jobs:
done
- name: Retrieve Test Logs
if: always() && steps.poll_step.outputs.too_many_runs != 'true'
run: |
curl -k -X 'GET' \
"${EC2_MACHINE_URL_US}/test_log/${GITHUB_RUN_ID}" \
@@ -118,15 +111,11 @@ jobs:
--output "test_log_${GITHUB_RUN_ID}.gz"
- name: Unzip Test Log and Print it into this job's log
if: always() && steps.poll_step.outputs.too_many_runs != 'true'
run: |
gzip -d "test_log_${GITHUB_RUN_ID}.gz"
cat "test_log_${GITHUB_RUN_ID}"
- name: Create Allure report
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate

View File

@@ -9,7 +9,6 @@ use std::{
collections::HashMap,
io::{BufRead, Read},
num::{NonZeroU64, NonZeroUsize},
str::FromStr,
sync::atomic::AtomicUsize,
time::{Duration, SystemTime},
};
@@ -438,7 +437,18 @@ pub enum CompactionAlgorithm {
Tiered,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
Serialize,
Deserialize,
strum_macros::FromRepr,
strum_macros::EnumString,
)]
#[strum(serialize_all = "kebab-case")]
pub enum ImageCompressionAlgorithm {
/// Disabled for writes, and never decompress during reading.
/// Never set this after you've enabled compression once!
@@ -458,31 +468,6 @@ impl ImageCompressionAlgorithm {
}
}
impl FromStr for ImageCompressionAlgorithm {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut components = s.split(['(', ')']);
let first = components
.next()
.ok_or_else(|| anyhow::anyhow!("empty string"))?;
match first {
"disabled-no-decompress" => Ok(ImageCompressionAlgorithm::DisabledNoDecompress),
"disabled" => Ok(ImageCompressionAlgorithm::Disabled),
"zstd" => {
let level = if let Some(v) = components.next() {
let v: i8 = v.parse()?;
Some(v)
} else {
None
};
Ok(ImageCompressionAlgorithm::Zstd { level })
}
_ => anyhow::bail!("invalid specifier '{first}'"),
}
}
}
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct CompactionAlgorithmSettings {
pub kind: CompactionAlgorithm,
@@ -1675,29 +1660,4 @@ mod tests {
AuxFilePolicy::CrossValidation
);
}
#[test]
fn test_image_compression_algorithm_parsing() {
use ImageCompressionAlgorithm::*;
assert_eq!(
ImageCompressionAlgorithm::from_str("disabled").unwrap(),
Disabled
);
assert_eq!(
ImageCompressionAlgorithm::from_str("disabled-no-decompress").unwrap(),
DisabledNoDecompress
);
assert_eq!(
ImageCompressionAlgorithm::from_str("zstd").unwrap(),
Zstd { level: None }
);
assert_eq!(
ImageCompressionAlgorithm::from_str("zstd(18)").unwrap(),
Zstd { level: Some(18) }
);
assert_eq!(
ImageCompressionAlgorithm::from_str("zstd(-3)").unwrap(),
Zstd { level: Some(-3) }
);
}
}

View File

@@ -1456,12 +1456,10 @@ impl<'a, 'c> BasebackupQueryTimeOngoingRecording<'a, 'c> {
}
}
pub(crate) static LIVE_CONNECTIONS: Lazy<IntCounterPairVec> = Lazy::new(|| {
register_int_counter_pair_vec!(
"pageserver_live_connections_started",
"Number of network connections that we started handling",
"pageserver_live_connections_finished",
"Number of network connections that we finished handling",
pub(crate) static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_live_connections",
"Number of live network connections",
&["pageserver_connection_kind"]
)
.expect("failed to define a metric")

View File

@@ -55,7 +55,7 @@ use crate::basebackup::BasebackupError;
use crate::context::{DownloadBehavior, RequestContext};
use crate::import_datadir::import_wal_from_tar;
use crate::metrics;
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS_COUNT};
use crate::pgdatadir_mapping::Version;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
@@ -215,9 +215,14 @@ async fn page_service_conn_main(
auth_type: AuthType,
connection_ctx: RequestContext,
) -> anyhow::Result<()> {
let _guard = LIVE_CONNECTIONS
.with_label_values(&["page_service"])
.guard();
// Immediately increment the gauge, then create a job to decrement it on task exit.
// One of the pros of `defer!` is that this will *most probably*
// get called, even in presence of panics.
let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
gauge.inc();
scopeguard::defer! {
gauge.dec();
}
socket
.set_nodelay(true)

View File

@@ -365,7 +365,6 @@ pub struct Timeline {
repartition_threshold: u64,
last_image_layer_creation_check_at: AtomicLsn,
last_image_layer_creation_check_instant: std::sync::Mutex<Option<Instant>>,
/// Current logical size of the "datadir", at the last LSN.
current_logical_size: LogicalSize,
@@ -2385,7 +2384,6 @@ impl Timeline {
)),
repartition_threshold: 0,
last_image_layer_creation_check_at: AtomicLsn::new(0),
last_image_layer_creation_check_instant: Mutex::new(None),
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(RelSizeCache {
@@ -4466,58 +4464,6 @@ impl Timeline {
}
}
/// Predicate function which indicates whether we should check if new image layers
/// are required. Since checking if new image layers are required is expensive in
/// terms of CPU, we only do it in the following cases:
/// 1. If the timeline has ingested sufficient WAL to justify the cost
/// 2. If enough time has passed since the last check
/// 2.1. For large tenants, we wish to perform the check more often since they
/// suffer from the lack of image layers
/// 2.2. For small tenants (that can mostly fit in RAM), we use a much longer interval
fn should_check_if_image_layers_required(self: &Arc<Timeline>, lsn: Lsn) -> bool {
const LARGE_TENANT_THRESHOLD: u64 = 2 * 1024 * 1024 * 1024;
let last_checks_at = self.last_image_layer_creation_check_at.load();
let distance = lsn
.checked_sub(last_checks_at)
.expect("Attempt to compact with LSN going backwards");
let min_distance =
self.get_image_layer_creation_check_threshold() as u64 * self.get_checkpoint_distance();
let distance_based_decision = distance.0 >= min_distance;
let mut time_based_decision = false;
let mut last_check_instant = self.last_image_layer_creation_check_instant.lock().unwrap();
if let CurrentLogicalSize::Exact(logical_size) = self.current_logical_size.current_size() {
let check_required_after = if Into::<u64>::into(&logical_size) >= LARGE_TENANT_THRESHOLD
{
self.get_checkpoint_timeout()
} else {
Duration::from_secs(3600 * 48)
};
time_based_decision = match *last_check_instant {
Some(last_check) => {
let elapsed = last_check.elapsed();
elapsed >= check_required_after
}
None => true,
};
}
// Do the expensive delta layer counting only if this timeline has ingested sufficient
// WAL since the last check or a checkpoint timeout interval has elapsed since the last
// check.
let decision = distance_based_decision || time_based_decision;
if decision {
self.last_image_layer_creation_check_at.store(lsn);
*last_check_instant = Some(Instant::now());
}
decision
}
#[tracing::instrument(skip_all, fields(%lsn, %mode))]
async fn create_image_layers(
self: &Arc<Timeline>,
@@ -4540,7 +4486,22 @@ impl Timeline {
// image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
let mut start = Key::MIN;
let check_for_image_layers = self.should_check_if_image_layers_required(lsn);
let check_for_image_layers = {
let last_checks_at = self.last_image_layer_creation_check_at.load();
let distance = lsn
.checked_sub(last_checks_at)
.expect("Attempt to compact with LSN going backwards");
let min_distance = self.get_image_layer_creation_check_threshold() as u64
* self.get_checkpoint_distance();
// Skip the expensive delta layer counting if this timeline has not ingested sufficient
// WAL since the last check.
distance.0 >= min_distance
};
if check_for_image_layers {
self.last_image_layer_creation_check_at.store(lsn);
}
for partition in partitioning.parts.iter() {
let img_range = start..partition.ranges.last().unwrap().end;

View File

@@ -26,7 +26,7 @@ use tracing::{debug, error, info, trace, warn, Instrument};
use super::TaskStateUpdate;
use crate::{
context::RequestContext,
metrics::{LIVE_CONNECTIONS, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
task_mgr::TaskKind,
task_mgr::WALRECEIVER_RUNTIME,
tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
@@ -208,9 +208,14 @@ pub(super) async fn handle_walreceiver_connection(
.instrument(tracing::info_span!("poller")),
);
let _guard = LIVE_CONNECTIONS
.with_label_values(&["wal_receiver"])
.guard();
// Immediately increment the gauge, then create a job to decrement it on task exit.
// One of the pros of `defer!` is that this will *most probably*
// get called, even in presence of panics.
let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]);
gauge.inc();
scopeguard::defer! {
gauge.dec();
}
let identify = identify_system(&replication_client).await?;
info!("{identify:?}");

View File

@@ -6,7 +6,6 @@ OBJS = \
$(WIN32RES) \
extension_server.o \
file_cache.o \
hll.o \
libpagestore.o \
neon.o \
neon_utils.o \
@@ -23,7 +22,7 @@ SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
EXTENSION = neon
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql neon--1.3--1.4.sql neon--1.4--1.3.sql
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql
PGFILEDESC = "neon - cloud storage for PostgreSQL"
EXTRA_CLEAN = \

View File

@@ -26,6 +26,7 @@
#include "miscadmin.h"
#include "pagestore_client.h"
#include "common/hashfn.h"
#include "lib/hyperloglog.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include RELFILEINFO_HDR
@@ -39,8 +40,6 @@
#include "utils/dynahash.h"
#include "utils/guc.h"
#include "hll.h"
/*
* Local file cache is used to temporary store relations pages in local file system.
* All blocks of all relations are stored inside one file and addressed using shared hash map.
@@ -63,6 +62,7 @@
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
#define MB ((uint64)1024*1024)
#define HYPER_LOG_LOG_BIT_WIDTH 10
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
typedef struct FileCacheEntry
@@ -87,7 +87,8 @@ typedef struct FileCacheControl
uint64 writes;
dlist_head lru; /* double linked list for LRU replacement
* algorithm */
HyperLogLogState wss_estimation; /* estimation of working set size */
hyperLogLogState wss_estimation; /* estimation of wroking set size */
uint8_t hyperloglog_hashes[(1 << HYPER_LOG_LOG_BIT_WIDTH) + 1];
} FileCacheControl;
static HTAB *lfc_hash;
@@ -237,7 +238,12 @@ lfc_shmem_startup(void)
dlist_init(&lfc_ctl->lru);
/* Initialize hyper-log-log structure for estimating working set size */
initSHLL(&lfc_ctl->wss_estimation);
initHyperLogLog(&lfc_ctl->wss_estimation, HYPER_LOG_LOG_BIT_WIDTH);
/* We need hashes in shared memory */
pfree(lfc_ctl->wss_estimation.hashesArr);
memset(lfc_ctl->hyperloglog_hashes, 0, sizeof lfc_ctl->hyperloglog_hashes);
lfc_ctl->wss_estimation.hashesArr = lfc_ctl->hyperloglog_hashes;
/* Recreate file cache on restart */
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
@@ -539,7 +545,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
/* Approximate working set */
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
addHyperLogLog(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
{
@@ -980,38 +986,20 @@ local_cache_pages(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
PG_FUNCTION_INFO_V1(approximate_working_set_size_seconds);
Datum
approximate_working_set_size_seconds(PG_FUNCTION_ARGS)
{
if (lfc_size_limit != 0)
{
int32 dc;
time_t duration = PG_ARGISNULL(0) ? (time_t)-1 : PG_GETARG_INT32(0);
LWLockAcquire(lfc_lock, LW_SHARED);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration);
LWLockRelease(lfc_lock);
PG_RETURN_INT32(dc);
}
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(approximate_working_set_size);
Datum
approximate_working_set_size(PG_FUNCTION_ARGS)
{
int32 dc = -1;
if (lfc_size_limit != 0)
{
int32 dc;
bool reset = PG_GETARG_BOOL(0);
LWLockAcquire(lfc_lock, reset ? LW_EXCLUSIVE : LW_SHARED);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, (time_t)-1);
dc = (int32) estimateHyperLogLog(&lfc_ctl->wss_estimation);
if (reset)
memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs);
memset(lfc_ctl->hyperloglog_hashes, 0, sizeof lfc_ctl->hyperloglog_hashes);
LWLockRelease(lfc_lock);
PG_RETURN_INT32(dc);
}
PG_RETURN_NULL();
PG_RETURN_INT32(dc);
}

View File

@@ -1,193 +0,0 @@
/*-------------------------------------------------------------------------
*
* hll.c
* Sliding HyperLogLog cardinality estimator
*
* Portions Copyright (c) 2014-2023, PostgreSQL Global Development Group
*
* Implements https://hal.science/hal-00465313/document
*
* Based on Hideaki Ohno's C++ implementation. This is probably not ideally
* suited to estimating the cardinality of very large sets; in particular, we
* have not attempted to further optimize the implementation as described in
* the Heule, Nunkesser and Hall paper "HyperLogLog in Practice: Algorithmic
* Engineering of a State of The Art Cardinality Estimation Algorithm".
*
* A sparse representation of HyperLogLog state is used, with fixed space
* overhead.
*
* The copyright terms of Ohno's original version (the MIT license) follow.
*
* IDENTIFICATION
* src/backend/lib/hyperloglog.c
*
*-------------------------------------------------------------------------
*/
/*
* Copyright (c) 2013 Hideaki Ohno <hide.o.j55{at}gmail.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the 'Software'), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include <math.h>
#include "postgres.h"
#include "funcapi.h"
#include "port/pg_bitutils.h"
#include "utils/timestamp.h"
#include "hll.h"
#define POW_2_32 (4294967296.0)
#define NEG_POW_2_32 (-4294967296.0)
#define ALPHA_MM ((0.7213 / (1.0 + 1.079 / HLL_N_REGISTERS)) * HLL_N_REGISTERS * HLL_N_REGISTERS)
/*
* Worker for addHyperLogLog().
*
* Calculates the position of the first set bit in first b bits of x argument
* starting from the first, reading from most significant to least significant
* bits.
*
* Example (when considering fist 10 bits of x):
*
* rho(x = 0b1000000000) returns 1
* rho(x = 0b0010000000) returns 3
* rho(x = 0b0000000000) returns b + 1
*
* "The binary address determined by the first b bits of x"
*
* Return value "j" used to index bit pattern to watch.
*/
static inline uint8
rho(uint32 x, uint8 b)
{
uint8 j = 1;
if (x == 0)
return b + 1;
j = 32 - pg_leftmost_one_pos32(x);
if (j > b)
return b + 1;
return j;
}
/*
* Initialize HyperLogLog track state
*/
void
initSHLL(HyperLogLogState *cState)
{
memset(cState->regs, 0, sizeof(cState->regs));
}
/*
* Adds element to the estimator, from caller-supplied hash.
*
* It is critical that the hash value passed be an actual hash value, typically
* generated using hash_any(). The algorithm relies on a specific bit-pattern
* observable in conjunction with stochastic averaging. There must be a
* uniform distribution of bits in hash values for each distinct original value
* observed.
*/
void
addSHLL(HyperLogLogState *cState, uint32 hash)
{
uint8 count;
uint32 index;
size_t i;
size_t j;
TimestampTz now = GetCurrentTimestamp();
/* Use the first "k" (registerWidth) bits as a zero based index */
index = hash >> HLL_C_BITS;
/* Compute the rank of the remaining 32 - "k" (registerWidth) bits */
count = rho(hash << HLL_BIT_WIDTH, HLL_C_BITS);
cState->regs[index][count] = now;
}
static uint8
getMaximum(const TimestampTz* reg, TimestampTz since)
{
uint8 max = 0;
for (size_t i = 0; i < HLL_C_BITS + 1; i++)
{
if (reg[i] >= since)
{
max = i;
}
}
return max;
}
/*
* Estimates cardinality, based on elements added so far
*/
double
estimateSHLL(HyperLogLogState *cState, time_t duration)
{
double result;
double sum = 0.0;
size_t i;
uint8 R[HLL_N_REGISTERS];
/* 0 indicates uninitialized timestamp, so if we need to cover the whole range than starts with 1 */
TimestampTz since = duration == (time_t)-1 ? 1 : GetCurrentTimestamp() - duration * USECS_PER_SEC;
for (i = 0; i < HLL_N_REGISTERS; i++)
{
R[i] = getMaximum(cState->regs[i], since);
sum += 1.0 / pow(2.0, R[i]);
}
/* result set to "raw" HyperLogLog estimate (E in the HyperLogLog paper) */
result = ALPHA_MM / sum;
if (result <= (5.0 / 2.0) * HLL_N_REGISTERS)
{
/* Small range correction */
int zero_count = 0;
for (i = 0; i < HLL_N_REGISTERS; i++)
{
zero_count += R[i] == 0;
}
if (zero_count != 0)
result = HLL_N_REGISTERS * log((double) HLL_N_REGISTERS /
zero_count);
}
else if (result > (1.0 / 30.0) * POW_2_32)
{
/* Large range correction */
result = NEG_POW_2_32 * log(1.0 - (result / POW_2_32));
}
return result;
}

View File

@@ -1,86 +0,0 @@
/*-------------------------------------------------------------------------
*
* hll.h
* Sliding HyperLogLog cardinality estimator
*
* Portions Copyright (c) 2014-2023, PostgreSQL Global Development Group
*
* Implements https://hal.science/hal-00465313/document
*
* Based on Hideaki Ohno's C++ implementation. This is probably not ideally
* suited to estimating the cardinality of very large sets; in particular, we
* have not attempted to further optimize the implementation as described in
* the Heule, Nunkesser and Hall paper "HyperLogLog in Practice: Algorithmic
* Engineering of a State of The Art Cardinality Estimation Algorithm".
*
* A sparse representation of HyperLogLog state is used, with fixed space
* overhead.
*
* The copyright terms of Ohno's original version (the MIT license) follow.
*
* IDENTIFICATION
* src/backend/lib/hyperloglog.c
*
*-------------------------------------------------------------------------
*/
/*
* Copyright (c) 2013 Hideaki Ohno <hide.o.j55{at}gmail.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the 'Software'), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#ifndef HLL_H
#define HLL_H
#define HLL_BIT_WIDTH 10
#define HLL_C_BITS (32 - HLL_BIT_WIDTH)
#define HLL_N_REGISTERS (1 << HLL_BIT_WIDTH)
/*
* HyperLogLog is an approximate technique for computing the number of distinct
* entries in a set. Importantly, it does this by using a fixed amount of
* memory. See the 2007 paper "HyperLogLog: the analysis of a near-optimal
* cardinality estimation algorithm" for more.
*
* Instead of a single counter for every bits register, we have a timestamp
* for every valid number of bits we can encounter. Every time we encounter
* a certain number of bits, we update the timestamp in those registers to
* the current timestamp.
*
* We can query the sketch's stored cardinality for the range of some timestamp
* up to now: For each register, we return the highest bits bucket that has a
* modified timestamp >= the query timestamp. This value is the number of bits
* for this register in the normal HLL calculation.
*
* The memory usage is 2^B * (C + 1) * sizeof(TimetampTz), or 184kiB.
* Usage could be halved if we decide to reduce the required time dimension
* precision; as 32 bits in second precision should be enough for statistics.
* However, that is not yet implemented.
*/
typedef struct HyperLogLogState
{
TimestampTz regs[HLL_N_REGISTERS][HLL_C_BITS + 1];
} HyperLogLogState;
extern void initSHLL(HyperLogLogState *cState);
extern void addSHLL(HyperLogLogState *cState, uint32 hash);
extern double estimateSHLL(HyperLogLogState *cState, time_t dutration);
#endif

View File

@@ -1,9 +0,0 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.4'" to load this file. \quit
CREATE FUNCTION approximate_working_set_size_seconds(duration integer default null)
RETURNS integer
AS 'MODULE_PATHNAME', 'approximate_working_set_size_seconds'
LANGUAGE C PARALLEL SAFE;
GRANT EXECUTE ON FUNCTION approximate_working_set_size_seconds(integer) TO pg_monitor;

View File

@@ -1 +0,0 @@
DROP FUNCTION IF EXISTS approximate_working_set_size_seconds(integer) CASCADE;

View File

@@ -63,6 +63,8 @@ char *wal_acceptors_list = "";
int wal_acceptor_reconnect_timeout = 1000;
int wal_acceptor_connection_timeout = 10000;
int max_vacuum_defer_cleanup_age = 0;
/* Set to true in the walproposer bgw. */
static bool am_walproposer;
static WalproposerShmemState *walprop_shared;
@@ -218,6 +220,16 @@ nwp_register_gucs(void)
PGC_SIGHUP,
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomIntVariable(
"neon.max_vacuum_defer_cleanup_age",
"Restrict oldest xmin pinned by hot standby feedback to prevent bloating of master",
NULL,
&max_vacuum_defer_cleanup_age,
0, 0, INT_MAX,
PGC_SIGHUP,
0,
NULL, NULL, NULL);
}
/*
@@ -1855,6 +1867,7 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
FullTransactionId xmin = hsFeedback.xmin;
FullTransactionId catalog_xmin = hsFeedback.catalog_xmin;
FullTransactionId next_xid = ReadNextFullTransactionId();
/*
* Page server is updating nextXid in checkpoint each 1024 transactions,
* so feedback xmin can be actually larger then nextXid and
@@ -1863,8 +1876,14 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
*/
if (FullTransactionIdPrecedes(next_xid, xmin))
xmin = next_xid;
else if (max_vacuum_defer_cleanup_age != 0 && xmin.value < next_xid.value - max_vacuum_defer_cleanup_age)
xmin.value = next_xid.value - max_vacuum_defer_cleanup_age;
if (FullTransactionIdPrecedes(next_xid, catalog_xmin))
catalog_xmin = next_xid;
else if (max_vacuum_defer_cleanup_age != 0 && catalog_xmin.value < next_xid.value - max_vacuum_defer_cleanup_age)
catalog_xmin.value = next_xid.value - max_vacuum_defer_cleanup_age;
agg_hs_feedback = hsFeedback;
elog(DEBUG2, "ProcessStandbyHSFeedback(xmin=%d, catalog_xmin=%d", XidFromFullTransactionId(hsFeedback.xmin), XidFromFullTransactionId(hsFeedback.catalog_xmin));
ProcessStandbyHSFeedback(hsFeedback.ts,

View File

@@ -7,7 +7,7 @@ OBJS = \
neontest.o
EXTENSION = neon_test_utils
DATA = neon_test_utils--1.3.sql
DATA = neon_test_utils--1.2.sql
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
PG_CONFIG = pg_config

View File

@@ -45,21 +45,3 @@ CREATE FUNCTION neon_xlogflush(lsn pg_lsn DEFAULT NULL)
RETURNS VOID
AS 'MODULE_PATHNAME', 'neon_xlogflush'
LANGUAGE C PARALLEL UNSAFE;
CREATE FUNCTION trigger_panic()
RETURNS VOID
AS 'MODULE_PATHNAME', 'trigger_panic'
LANGUAGE C PARALLEL UNSAFE;
CREATE FUNCTION trigger_segfault()
RETURNS VOID
AS 'MODULE_PATHNAME', 'trigger_segfault'
LANGUAGE C PARALLEL UNSAFE;
-- Alias for `trigger_segfault`, just because `SELECT 💣()` looks fun
CREATE OR REPLACE FUNCTION 💣() RETURNS void
LANGUAGE plpgsql AS $$
BEGIN
PERFORM trigger_segfault();
END;
$$;

View File

@@ -1,6 +1,6 @@
# neon_test_utils extension
comment = 'helpers for neon testing and debugging'
default_version = '1.3'
default_version = '1.2'
module_pathname = '$libdir/neon_test_utils'
relocatable = true
trusted = true

View File

@@ -42,8 +42,6 @@ PG_FUNCTION_INFO_V1(clear_buffer_cache);
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn);
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
PG_FUNCTION_INFO_V1(neon_xlogflush);
PG_FUNCTION_INFO_V1(trigger_panic);
PG_FUNCTION_INFO_V1(trigger_segfault);
/*
* Linkage to functions in neon module.
@@ -491,24 +489,3 @@ neon_xlogflush(PG_FUNCTION_ARGS)
XLogFlush(lsn);
PG_RETURN_VOID();
}
/*
* Function to trigger panic.
*/
Datum
trigger_panic(PG_FUNCTION_ARGS)
{
elog(PANIC, "neon_test_utils: panic");
PG_RETURN_VOID();
}
/*
* Function to trigger a segfault.
*/
Datum
trigger_segfault(PG_FUNCTION_ARGS)
{
int *ptr = NULL;
*ptr = 42;
PG_RETURN_VOID();
}

8
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
[[package]]
name = "aiohttp"
@@ -734,13 +734,13 @@ typing-extensions = ">=4.1.0"
[[package]]
name = "certifi"
version = "2024.7.4"
version = "2023.7.22"
description = "Python package for providing Mozilla's CA Bundle."
optional = false
python-versions = ">=3.6"
files = [
{file = "certifi-2024.7.4-py3-none-any.whl", hash = "sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90"},
{file = "certifi-2024.7.4.tar.gz", hash = "sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b"},
{file = "certifi-2023.7.22-py3-none-any.whl", hash = "sha256:92d6037539857d8206b8f6ae472e8b77db8058fec5937a1ef3f54304089edbb9"},
{file = "certifi-2023.7.22.tar.gz", hash = "sha256:539cc1d13202e33ca466e88b2807e29f4c13049d6d87031a3c110744495cb082"},
]
[[package]]

View File

@@ -445,19 +445,6 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
.map(|res| ("WAL service main".to_owned(), res));
tasks_handles.push(Box::pin(wal_service_handle));
let timeline_housekeeping_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
.spawn(async move {
const TOMBSTONE_TTL: Duration = Duration::from_secs(3600 * 24);
loop {
tokio::time::sleep(TOMBSTONE_TTL).await;
GlobalTimelines::housekeeping(&TOMBSTONE_TTL);
}
})
.map(|res| ("Timeline map housekeeping".to_owned(), res));
tasks_handles.push(Box::pin(timeline_housekeeping_handle));
if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
let conf_ = conf.clone();
let wal_service_handle = current_thread_rt

View File

@@ -15,19 +15,12 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tracing::*;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
struct GlobalTimelinesState {
timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
// this map is dropped on restart.
tombstones: HashMap<TenantTimelineId, Instant>,
conf: Option<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
@@ -71,17 +64,11 @@ impl GlobalTimelinesState {
.cloned()
.ok_or(TimelineError::NotFound(*ttid))
}
fn delete(&mut self, ttid: TenantTimelineId) {
self.timelines.remove(&ttid);
self.tombstones.insert(ttid, Instant::now());
}
}
static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
tombstones: HashMap::new(),
conf: None,
broker_active_set: Arc::new(TimelinesSet::default()),
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
@@ -211,17 +198,11 @@ impl GlobalTimelines {
let tli = Arc::new(timeline);
// TODO: prevent concurrent timeline creation/loading
{
let mut state = TIMELINES_STATE.lock().unwrap();
// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
if state.tombstones.remove(&ttid).is_some() {
warn!("Un-deleted timeline {ttid}");
}
state.timelines.insert(ttid, tli.clone());
}
TIMELINES_STATE
.lock()
.unwrap()
.timelines
.insert(ttid, tli.clone());
tli.bootstrap(&conf, broker_active_set, partial_backup_rate_limiter);
@@ -248,7 +229,7 @@ impl GlobalTimelines {
/// Create a new timeline with the given id. If the timeline already exists, returns
/// an existing timeline.
pub(crate) async fn create(
pub async fn create(
ttid: TenantTimelineId,
server_info: ServerInfo,
commit_lsn: Lsn,
@@ -260,11 +241,6 @@ impl GlobalTimelines {
// Timeline already exists, return it.
return Ok(timeline);
}
if state.tombstones.contains_key(&ttid) {
anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
}
state.get_dependencies()
};
@@ -324,19 +300,17 @@ impl GlobalTimelines {
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
/// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
/// i.e. loaded in memory and not cancelled.
pub(crate) fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
let tli_res = {
let state = TIMELINES_STATE.lock().unwrap();
state.get(&ttid)
};
match tli_res {
pub fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
let res = TIMELINES_STATE.lock().unwrap().get(&ttid);
match res {
Ok(tli) => {
if tli.is_cancelled() {
return Err(TimelineError::Cancelled(ttid));
}
Ok(tli)
}
_ => tli_res,
_ => res,
}
}
@@ -365,26 +339,12 @@ impl GlobalTimelines {
/// Cancels timeline, then deletes the corresponding data directory.
/// If only_local, doesn't remove WAL segments in remote storage.
pub(crate) async fn delete(
pub async fn delete(
ttid: &TenantTimelineId,
only_local: bool,
) -> Result<TimelineDeleteForceResult> {
let tli_res = {
let state = TIMELINES_STATE.lock().unwrap();
if state.tombstones.contains_key(ttid) {
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
info!("Timeline {ttid} was already deleted");
return Ok(TimelineDeleteForceResult {
dir_existed: false,
was_active: false,
});
}
state.get(ttid)
};
let result = match tli_res {
let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
match tli_res {
Ok(timeline) => {
let was_active = timeline.broker_active.load(Ordering::Relaxed);
@@ -394,6 +354,11 @@ impl GlobalTimelines {
info!("deleting timeline {}, only_local={}", ttid, only_local);
let dir_existed = timeline.delete(&mut shared_state, only_local).await?;
// Remove timeline from the map.
// FIXME: re-enable it once we fix the issue with recreation of deleted timelines
// https://github.com/neondatabase/neon/issues/3146
// TIMELINES_STATE.lock().unwrap().timelines.remove(ttid);
Ok(TimelineDeleteForceResult {
dir_existed,
was_active, // TODO: we probably should remove this field
@@ -409,14 +374,7 @@ impl GlobalTimelines {
was_active: false,
})
}
};
// Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
// are used to prevent still-running computes from re-creating the same timeline when they send data,
// and to speed up repeated deletion calls by avoiding re-listing objects.
TIMELINES_STATE.lock().unwrap().delete(*ttid);
result
}
}
/// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
@@ -462,20 +420,19 @@ impl GlobalTimelines {
tenant_id,
))?;
// FIXME: we temporarily disabled removing timelines from the map, see `delete_force`
// let tlis_after_delete = Self::get_all_for_tenant(*tenant_id);
// if !tlis_after_delete.is_empty() {
// // Some timelines were created while we were deleting them, returning error
// // to the caller, so it can retry later.
// bail!(
// "failed to delete all timelines for tenant {}: some timelines were created while we were deleting them",
// tenant_id
// );
// }
Ok(deleted)
}
pub fn housekeeping(tombstone_ttl: &Duration) {
let mut state = TIMELINES_STATE.lock().unwrap();
// We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
// timelines. If a compute kept running for longer than this TTL (or across a safekeeper restart) then they
// may recreate a deleted timeline.
let now = Instant::now();
state
.tombstones
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
}
}
#[derive(Clone, Copy, Serialize)]

View File

@@ -1,4 +1,4 @@
use futures::{StreamExt, TryStreamExt};
use futures::StreamExt;
use pageserver::tenant::storage_layer::LayerName;
use serde::{Deserialize, Serialize};
@@ -29,7 +29,7 @@ impl LargeObjectKind {
}
}
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize)]
pub struct LargeObject {
pub key: String,
pub size: u64,
@@ -45,76 +45,53 @@ pub async fn find_large_objects(
bucket_config: BucketConfig,
min_size: u64,
ignore_deltas: bool,
concurrency: usize,
) -> anyhow::Result<LargeObjectListing> {
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
let objects_stream = tenants.map_ok(|tenant_shard_id| {
let mut tenant_root = target.tenant_root(&tenant_shard_id);
let s3_client = s3_client.clone();
async move {
let mut objects = Vec::new();
let mut total_objects_ctr = 0u64;
// We want the objects and not just common prefixes
tenant_root.delimiter.clear();
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
.await?;
for obj in fetch_response.contents().iter().filter(|o| {
if let Some(obj_size) = o.size {
min_size as i64 <= obj_size
} else {
false
}
}) {
let key = obj.key().expect("couldn't get key").to_owned();
let kind = LargeObjectKind::from_key(&key);
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
continue;
}
objects.push(LargeObject {
key,
size: obj.size.unwrap() as u64,
kind,
})
}
total_objects_ctr += fetch_response.contents().len() as u64;
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
Ok((tenant_shard_id, objects, total_objects_ctr))
}
});
let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
let mut tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
let mut objects = Vec::new();
let mut tenant_ctr = 0u64;
let mut object_ctr = 0u64;
while let Some(res) = objects_stream.next().await {
let (tenant_shard_id, objects_slice, total_objects_ctr) = res?;
objects.extend_from_slice(&objects_slice);
while let Some(tenant_shard_id) = tenants.next().await {
let tenant_shard_id = tenant_shard_id?;
let mut tenant_root = target.tenant_root(&tenant_shard_id);
// We want the objects and not just common prefixes
tenant_root.delimiter.clear();
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
.await?;
for obj in fetch_response.contents().iter().filter(|o| {
if let Some(obj_size) = o.size {
min_size as i64 <= obj_size
} else {
false
}
}) {
let key = obj.key().expect("couldn't get key").to_owned();
let kind = LargeObjectKind::from_key(&key);
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
continue;
}
objects.push(LargeObject {
key,
size: obj.size.unwrap() as u64,
kind,
})
}
object_ctr += fetch_response.contents().len() as u64;
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
object_ctr += total_objects_ctr;
tenant_ctr += 1;
if tenant_ctr % 100 == 0 {
if tenant_ctr % 50 == 0 {
tracing::info!(
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.",
objects.len()
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", objects.len()
);
}
}
let bucket_name = target.bucket_name();
tracing::info!(
"Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
objects.len()
);
Ok(LargeObjectListing { objects })
}

View File

@@ -78,8 +78,6 @@ enum Command {
min_size: u64,
#[arg(short, long, default_value_t = false)]
ignore_deltas: bool,
#[arg(long = "concurrency", short = 'j', default_value_t = 64)]
concurrency: usize,
},
}
@@ -212,15 +210,10 @@ async fn main() -> anyhow::Result<()> {
Command::FindLargeObjects {
min_size,
ignore_deltas,
concurrency,
} => {
let summary = find_large_objects::find_large_objects(
bucket_config,
min_size,
ignore_deltas,
concurrency,
)
.await?;
let summary =
find_large_objects::find_large_objects(bucket_config, min_size, ignore_deltas)
.await?;
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}

View File

@@ -943,8 +943,6 @@ class NeonEnvBuilder:
# if the test threw an exception, don't check for errors
# as a failing assertion would cause the cleanup below to fail
ps_assert_metric_no_errors=(exc_type is None),
# do not fail on endpoint errors to allow the rest of cleanup to proceed
fail_on_endpoint_errors=False,
)
cleanup_error = None
@@ -1216,11 +1214,11 @@ class NeonEnv:
for f in futs:
f.result()
def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
def stop(self, immediate=False, ps_assert_metric_no_errors=False):
"""
After this method returns, there should be no child processes running.
"""
self.endpoints.stop_all(fail_on_endpoint_errors)
self.endpoints.stop_all()
# Stop storage controller before pageservers: we don't want it to spuriously
# detect a pageserver "failure" during test teardown
@@ -3901,17 +3899,9 @@ class EndpointFactory:
pageserver_id=pageserver_id,
)
def stop_all(self, fail_on_error=True) -> "EndpointFactory":
exception = None
def stop_all(self) -> "EndpointFactory":
for ep in self.endpoints:
try:
ep.stop()
except Exception as e:
log.error(f"Failed to stop endpoint {ep.endpoint_id}: {e}")
exception = e
if fail_on_error and exception is not None:
raise exception
ep.stop()
return self

View File

@@ -1,23 +0,0 @@
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder
@pytest.mark.parametrize(
"sql_func",
[
"trigger_panic",
"trigger_segfault",
"💣", # calls `trigger_segfault` internally
],
)
def test_endpoint_crash(neon_env_builder: NeonEnvBuilder, sql_func: str):
"""
Test that triggering crash from neon_test_utils crashes the endpoint
"""
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_endpoint_crash")
endpoint = env.endpoints.create_start("test_endpoint_crash")
endpoint.safe_psql("CREATE EXTENSION neon_test_utils;")
with pytest.raises(Exception, match="This probably means the server terminated abnormally"):
endpoint.safe_psql(f"SELECT {sql_func}();")

View File

@@ -1,4 +1,3 @@
import time
from pathlib import Path
from fixtures.log_helper import log
@@ -73,46 +72,3 @@ WITH (fillfactor='100');
blocks = query_scalar(cur, "select approximate_working_set_size(true)")
log.info(f"working set size after some index access of a few select pages only {blocks}")
assert blocks < 10
def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=256MB",
"neon.file_cache_size_limit=245MB",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.4'")
cur.execute(
"create table t(pk integer primary key, count integer default 0, payload text default repeat('?', 128))"
)
cur.execute("insert into t (pk) values (generate_series(1,1000000))")
time.sleep(2)
before_10k = time.monotonic()
cur.execute("select sum(count) from t where pk between 10000 and 20000")
time.sleep(2)
before_1k = time.monotonic()
cur.execute("select sum(count) from t where pk between 1000 and 2000")
after = time.monotonic()
cur.execute(f"select approximate_working_set_size_seconds({int(after - before_1k + 1)})")
estimation_1k = cur.fetchall()[0][0]
log.info(f"Working set size for selecting 1k records {estimation_1k}")
cur.execute(f"select approximate_working_set_size_seconds({int(after - before_10k + 1)})")
estimation_10k = cur.fetchall()[0][0]
log.info(f"Working set size for selecting 10k records {estimation_10k}")
cur.execute("select pg_table_size('t')")
size = cur.fetchall()[0][0] // 8192
log.info(f"Table size {size} blocks")
assert estimation_1k >= 20 and estimation_1k <= 40
assert estimation_10k >= 200 and estimation_10k <= 400

View File

@@ -50,7 +50,7 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
# Ensure that the default version is also updated in the neon.control file
assert cur.fetchone() == ("1.3",)
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
all_versions = ["1.4", "1.3", "1.2", "1.1", "1.0"]
all_versions = ["1.3", "1.2", "1.1", "1.0"]
current_version = "1.3"
for idx, begin_version in enumerate(all_versions):
for target_version in all_versions[idx + 1 :]:

View File

@@ -16,8 +16,6 @@ from fixtures.pageserver.utils import (
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, S3Storage, s3_storage
from fixtures.utils import wait_until
from fixtures.workload import Workload
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
# A tenant configuration that is convenient for generating uploads and deletions
# without a large amount of postgres traffic.
@@ -61,7 +59,7 @@ def evict_random_layers(
@pytest.mark.parametrize("seed", [1, 2, 3])
def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver, seed: int):
def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
"""
Issue many location configuration changes, ensure that tenants
remain readable & we don't get any unexpected errors. We should
@@ -75,20 +73,6 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=s3_storage(),
)
neon_env_builder.control_plane_compute_hook_api = (
f"http://{make_httpserver.host}:{make_httpserver.port}/notify-attach"
)
def ignore_notify(request: Request):
# This test does all its own compute configuration (by passing explicit pageserver ID to Workload functions),
# so we send controller notifications to /dev/null to prevent it fighting the test for control of the compute.
log.info(f"Ignoring storage controller compute notification: {request.json}")
return Response(status=200)
make_httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(
ignore_notify
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
pageservers = env.pageservers
@@ -115,15 +99,6 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
workload.init(env.pageservers[0].id)
workload.write_rows(256, env.pageservers[0].id)
# Discourage the storage controller from interfering with the changes we will make directly on the pageserver
env.storage_controller.tenant_policy_update(
tenant_id,
{
"scheduling": "Stop",
},
)
env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy Stop.*")
# We use a fixed seed to make the test reproducible: we want a randomly
# chosen order, but not to change the order every time we run the test.
rng = random.Random(seed)

View File

@@ -54,4 +54,4 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
pcur.execute(f"INSERT into t values ({n_records}, 0)")
n_records += 1
with sub.cursor() as scur:
wait_until(60, 0.5, check_that_changes_propagated)
wait_until(10, 0.5, check_that_changes_propagated)

View File

@@ -67,9 +67,8 @@ def test_tenant_delete_smoke(
# first try to delete non existing tenant
tenant_id = TenantId.generate()
env.pageserver.allowed_errors.extend(
[".*NotFound.*", ".*simulated failure.*", ".*failed to delete .+ objects.*"]
)
env.pageserver.allowed_errors.append(".*NotFound.*")
env.pageserver.allowed_errors.append(".*simulated failure.*")
# Check that deleting a non-existent tenant gives the expected result: this is a loop because we
# may need to retry on some remote storage errors injected by the test harness