Compare commits

..

57 Commits

Author SHA1 Message Date
Arpad Müller
5e32cce2d9 Add script for plots 2024-06-22 18:14:05 +02:00
Arpad Müller
6b67135bd3 Fix offset stream issue 2024-06-18 17:50:47 +02:00
Arpad Müller
66d3bef947 More printing and assertions 2024-06-18 17:50:47 +02:00
Arpad Müller
e749e73ad6 Add the compression algo name 2024-06-18 17:50:47 +02:00
Arpad Müller
c5034f0e45 Fix build 2024-06-18 17:50:47 +02:00
Arpad Müller
d2533c06a5 Always delete the file, even on error 2024-06-18 17:50:47 +02:00
Arpad Müller
0fdeca882c Fix tests 2024-06-18 17:50:47 +02:00
Arpad Müller
f1bebda713 Fix failing test 2024-06-18 17:50:47 +02:00
Arpad Müller
983972f812 Add tests for compression 2024-06-18 17:50:47 +02:00
Arpad Müller
2ea8d1b151 Shutdown instead of flush 2024-06-18 17:50:47 +02:00
Arpad Müller
2f70221503 Don't forget the flush 2024-06-18 17:50:47 +02:00
Arpad Müller
07bd0ce69e Also measure decompression time 2024-06-18 17:50:47 +02:00
Arpad Müller
40e79712eb Add decompression 2024-06-18 17:50:47 +02:00
Arpad Müller
88b24e1593 Move constants out into file 2024-06-18 17:50:47 +02:00
Arpad Müller
8fcdc22283 Add stats info 2024-06-18 17:50:47 +02:00
Arpad Müller
2eb8b428cc Also support the generation-less legacy naming scheme 2024-06-18 17:50:47 +02:00
Arpad Müller
e6a0e7ec61 Add zstd with low compression quality 2024-06-18 17:50:47 +02:00
Arpad Müller
843d996cb1 More precise printing 2024-06-18 17:50:47 +02:00
Arpad Müller
c824ffe1dc Add ZstdHigh compression mode 2024-06-18 17:50:47 +02:00
Arpad Müller
dadbd87ac1 Add percent to output 2024-06-18 17:50:47 +02:00
Arpad Müller
0e667dcd93 more yielding 2024-06-18 17:50:47 +02:00
Arpad Müller
14447b98ce Yield in between 2024-06-18 17:50:47 +02:00
Arpad Müller
8fcb236783 Increase listing limit 2024-06-18 17:50:47 +02:00
Arpad Müller
a9963db8c3 Create timeline dir in temp location if not existent 2024-06-18 17:50:47 +02:00
Arpad Müller
0c500450fe Print error better 2024-06-18 17:50:47 +02:00
Arpad Müller
3182c3361a Corrections 2024-06-18 17:50:47 +02:00
Arpad Müller
80803ff098 Printing tweaks 2024-06-18 17:50:47 +02:00
Arpad Müller
9b74d554b4 Remove generation suffix 2024-06-18 17:50:47 +02:00
Arpad Müller
fce252fb2c Rename dest_path to tmp_dir 2024-06-18 17:50:47 +02:00
Arpad Müller
f132658bd9 Some prints 2024-06-18 17:50:47 +02:00
Arpad Müller
d030cbffec Print number of keys 2024-06-18 17:50:47 +02:00
Arpad Müller
554a6bd4a6 Two separate commands
More easy to have an overview
2024-06-18 17:50:47 +02:00
Arpad Müller
9850794250 Remote layer file after done 2024-06-18 17:50:47 +02:00
Arpad Müller
f5baac2579 clippy 2024-06-18 17:50:47 +02:00
Arpad Müller
2d37db234a Add mode to compare multiple files from a tenant 2024-06-18 17:50:47 +02:00
Arpad Müller
8745c0d6f2 Add a pagectl tool to recompress image layers 2024-06-18 17:50:47 +02:00
Christian Schwarz
6c6a7f9ace [v2] Include openssl and ICU statically linked (#8074)
We had to revert the earlier static linking change due to libicu version
incompatibilities:

- original PR: https://github.com/neondatabase/neon/pull/7956
- revert PR: https://github.com/neondatabase/neon/pull/8003

Specifically, the problem manifests for existing projects as error

```
DETAIL:  The collation in the database was created using version 153.120.42, but the operating system provides version 153.14.37.
```

So, this PR reintroduces the original change but with the exact same
libicu version as in Debian `bullseye`, i.e., the libicu version that
we're using today.
This avoids the version incompatibility.


Additional changes made by Christian
====================================
- `hashFiles` can take multiple arguments, use that feature
- validation of the libicu tarball checksum
- parallel build (`-j $(nproc)`) for openssl and libicu

Follow-ups
==========

Debian bullseye has a few patches on top of libicu:
https://sources.debian.org/patches/icu/67.1-7/
We still decide whether we need to include these patches or not.
=> https://github.com/neondatabase/cloud/issues/14527

Eventually, we'll have to figure out an upgrade story for libicu.
That work is tracked in epic
https://github.com/neondatabase/cloud/issues/14525.

The OpenSSL version in this PR is arbitrary.
We should use `1.1.1w` + Debian patches if applicable.
See https://github.com/neondatabase/cloud/issues/14526.

Longer-term:
* https://github.com/neondatabase/cloud/issues/14519
* https://github.com/neondatabase/cloud/issues/14525

Refs
====

Co-authored-by: Christian Schwarz <christian@neon.tech>

refs https://github.com/neondatabase/cloud/issues/12648

---------

Co-authored-by: Rahul Patil <rahul@neon.tech>
2024-06-18 09:42:22 +02:00
MMeent
e729f28205 Fix log rates (#8035)
## Summary of changes

- Stop logging HealthCheck message passing at INFO level (moved to
  DEBUG)
- Stop logging /status accesses at INFO (moved to DEBUG)
- Stop logging most occurances of
  `missing config file "compute_ctl_temp_override.conf"`
- Log memory usage only when the data has changed significantly, or if
  we've not recently logged the data, rather than always every 2 seconds.
2024-06-17 18:57:49 +00:00
Alexander Bayandin
b6e1c09c73 CI(check-build-tools-image): change build-tools image persistent tag (#8059)
## Problem

We don't rebuild `build-tools` image for changes in a workflow that
builds this image itself
(`.github/workflows/build-build-tools-image.yml`) or in a workflow that
determines which tag to use
(`.github/workflows/check-build-tools-image.yml`)

## Summary of changes
- Use a hash of `Dockerfile.build-tools` and workflow files as a
persistent tag instead of using a commit sha.
2024-06-17 12:47:20 +01:00
Vlad Lazar
16d80128ee storcon: handle entire cluster going unavailable correctly (#8060)
## Problem
A period of unavailability for all pageservers in a cluster produced the
following fallout in staging:
all tenants became detached and required manual operation to re-attach.
Manually restarting
the storage controller re-attached all tenants due to a consistency bug.

Turns out there are two related bugs which caused the issue:
1. Pageserver re-attach can be processed before the first heartbeat.
Hence, when handling
the availability delta produced by the heartbeater,
`Node::get_availability_transition` claims
that there's no need to reconfigure the node.
2. We would still attempt to reschedule tenant shards when handling
offline transitions even
if the entire cluster is down. This puts tenant shards into a state
where the reconciler believes
they have to be detached (no pageserver shows up in their intent state).
This is doubly wrong
because we don't mark the tenant shards as detached in the database,
thus causing memory vs
database consistency issues. Luckily, this bug allowed all tenant shards
to re-attach after restart.

## Summary of changes
* For (1), abuse the fact that re-attach requests do not contain an
utilisation score and use that
to differentiate from a node that replied to heartbeats.
* For (2), introduce a special case that skips any rescheduling if the
entire cluster is unavailable.
* Update the storage controller heartbeat test with an extra scenario
where the entire cluster goes
for lunch.

Fixes https://github.com/neondatabase/neon/issues/8044
2024-06-17 11:40:35 +01:00
Arseny Sher
2ba414525e Install rust binaries before running rust tests.
cargo test (or nextest) might rebuild the binaries with different
features/flags, so do install immediately after the build. Triggered by the
particular case of nextest invocations missing $CARGO_FEATURES, which recompiled
safekeeper without 'testing' feature which made python tests needing
it (failpoints) not run in the CI.

Also add CARGO_FEATURES to the nextest runs anyway because there doesn't seem to
be an important reason not to.
2024-06-17 06:23:32 +03:00
Peter Bendel
46210035c5 add halfvec indexing and queries to periodic pgvector performance tests (#8057)
## Problem

halfvec data type was introduced in pgvector 0.7.0 and is popular
because
it allows smaller vectors, smaller indexes and potentially better
performance.

So far we have not tested halfvec in our periodic performance tests.
This PR adds halfvec indexing and halfvec queries to the test.
2024-06-14 18:36:50 +02:00
Alex Chi Z
81892199f6 chore(pageserver): vectored get target_keyspace directly accums (#8055)
follow up on https://github.com/neondatabase/neon/pull/7904

avoid a layer of indirection introduced by `Vec<Range<Key>>`

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-06-14 11:57:58 -04:00
Alexander Bayandin
83eb02b07a CI: downgrade docker/setup-buildx-action (#8062)
## Problem

I've bumped `docker/setup-buildx-action` in #8042 because I wasn't able
to reproduce the issue from #7445.
But now the issue appears again in
https://github.com/neondatabase/neon/actions/runs/9514373620/job/26226626923?pr=8059
The steps to reproduce aren't clear, it required
`docker/setup-buildx-action@v3` and rebuilding the image without cache,
probably

## Summary of changes
- Downgrade `docker/setup-buildx-action@v3` 
to `docker/setup-buildx-action@v2`
2024-06-14 11:43:51 +00:00
Arseny Sher
a71f58e69c Fix test_segment_init_failure.
Graceful shutdown broke it.
2024-06-14 14:24:15 +03:00
Conrad Ludgate
e6eb0020a1 update rust to 1.79.0 (#8048)
## Problem

rust 1.79 new enabled by default lints

## Summary of changes

* update to rust 1.79
* `s/default_features/default-features/`
* fix proxy dead code.
* fix pageserver dead code.
2024-06-14 13:23:52 +02:00
John Spray
eb0ca9b648 pageserver: improved synthetic size & find_gc_cutoff error handling (#8051)
## Problem

This PR refactors some error handling to avoid log spam on
tenant/timeline shutdown.

- "ignoring failure to find gc cutoffs: timeline shutting down." logs
(https://github.com/neondatabase/neon/issues/8012)
- "synthetic_size_worker: failed to calculate synthetic size for tenant
...: Failed to refresh gc_info before gathering inputs: tenant shutting
down", for example here:
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-8049/9502988669/index.html#suites/3fc871d9ee8127d8501d607e03205abb/1a074a66548bbcea

Closes: https://github.com/neondatabase/neon/issues/8012

## Summary of changes

- Refactor: Add a PageReconstructError variant to GcError: this is the
only kind of error that find_gc_cutoffs can emit.
- Functional change: only ignore shutdown PageReconstructError variant:
for other variants, treat it as a real error
- Refactor: add a structured CalculateSyntheticSizeError type and use it
instead of anyhow::Error in synthetic size calculations
- Functional change: while iterating through timelines gathering logical
sizes, only drop out if the whole tenant is cancelled: individual
timeline cancellations indicate deletion in progress and we can just
ignore those.
2024-06-14 11:08:11 +01:00
John Spray
6843fd8f89 storage controller: always wait for tenant detach before delete (#8049)
## Problem

This test could fail with a timeout waiting for tenant deletions.

Tenant deletions could get tripped up on nodes transitioning from
offline to online at the moment of the deletion. In a previous
reconciliation, the reconciler would skip detaching a particular
location because the node was offline, but then when we do the delete
the node is marked as online and can be picked as the node to use for
issuing a deletion request. This hits the "Unexpectedly still attached
path", which would still work if the caller kept calling DELETE, but if
a caller does a Delete,get,get,get poll, then it doesn't work because
the GET calls fail after we've marked the tenant as detached.

## Summary of changes

Fix the undesirable storage controller behavior highlighted by this test
failure:
- Change tenant deletion flow to _always_ wait for reconciliation to
succeed: it was unsound to proceed and return 202 if something was still
attached, because after the 202 callers can no longer GET the tenant.

Stabilize the test:
- Add a reconcile_until_idle to the test, so that it will not have
reconciliations running in the background while we mark a node online.
This test is not meant to be a chaos test: we should test that kind of
complexity elsewhere.
- This reconcile_until_idle also fixes another failure mode where the
test might see a None for a tenant location because a reconcile was
mutating it
(https://neon-github-public-dev.s3.amazonaws.com/reports/pr-7288/9500177581/index.html#suites/8fc5d1648d2225380766afde7c428d81/4acece42ae00c442/)

It remains the case that a motivated tester could produce a situation
where a DELETE gives a 500, when precisely the wrong node transitions
from offline to available at the precise moment of a deletion (but the
500 is better than returning 202 and then failing all subsequent GETs).
Note that nodes don't go through the offline state during normal
restarts, so this is super rare. We should eventually fix this by making
DELETE to the pageserver implicitly detach the tenant if it's attached,
but that should wait until nobody is using the legacy-style deletes (the
ones that use 202 + polling)
2024-06-14 10:37:30 +01:00
Alexander Bayandin
edc900028e CI: Update outdated GitHub Actions (#8042)
## Problem
We have some amount of outdated action in the CI pipeline, GitHub
complains about some of them.

## Summary of changes
- Update `actions/checkout@1` (a really old one) in
`vm-compute-node-image`
- Update `actions/checkout@3` in `build-build-tools-image`
- Update `docker/setup-buildx-action` in all workflows / jobs, it was
downgraded in https://github.com/neondatabase/neon/pull/7445, but it
it seems it works fine now
2024-06-14 10:24:13 +01:00
Heikki Linnakangas
789196572e Fix test_replica_query_race flakiness (#8038)
This failed once with `relation "test" does not exist` when trying to
run the query on the standby. It's possible that the standby is started
before the CREATE TABLE is processed in the pageserver, and the standby
opens up for queries before it has received the CREATE TABLE transaction
from the primary. To fix, wait for the standby to catch up to the
primary before starting to run the queries.


https://neon-github-public-dev.s3.amazonaws.com/reports/pr-8025/9483658488/index.html
2024-06-14 11:51:12 +03:00
John Spray
425eed24e8 pageserver: refine shutdown handling in secondary download (#8052)
## Problem

Some code paths during secondary mode download are returning Ok() rather
than UpdateError::Cancelled. This is functionally okay, but it means
that the end of TenantDownloader::download has a sanity check that the
progress is 100% on success, and prints a "Correcting drift..." warning
if not. This warning can be emitted in a test, e.g.
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-8049/9503642976/index.html#/testresult/fff1624ba6adae9e.

## Summary of changes

- In secondary download cancellation paths, use
Err(UpdateError::Cancelled) rather than Ok(), so that we drop out of the
download function and do not reach the progress sanity check.
2024-06-14 09:39:31 +01:00
James Broadhead
f67010109f extensions: pgvector-0.7.2 (#8037)
Update pgvector to 0.7.2

Purely mechanical update to pgvector.patch, just as a place to start
from
2024-06-14 10:17:43 +02:00
Tristan Partin
0c3e3a8667 Set application_name for internal connections to computes
This will help when analyzing the origins of connections to a compute
like in [0].

[0]: https://github.com/neondatabase/cloud/issues/14247
2024-06-13 12:06:10 -07:00
Christian Schwarz
82719542c6 fix: vectored get returns incorrect result on inexact materialized page cache hit (#8050)
# Problem

Suppose our vectored get starts with an inexact materialized page cache
hit ("cached lsn") that is shadowed by a newer image layer image layer.
Like so:


```
    <inmemory layers>

    +-+ < delta layer
    | |
   -|-|----- < image layer
    | |
    | |
   -|-|----- < cached lsn for requested key
    +_+
```

The correct visitation order is
1. inmemory layers
2. delta layer records in LSN range `[image_layer.lsn,
oldest_inmemory_layer.lsn_range.start)`
3. image layer

However, the vectored get code, when it visits the delta layer, it
(incorrectly!) returns with state `Complete`.

The reason why it returns is that it calls `on_lsn_advanced` with
`self.lsn_range.start`, i.e., the layer's LSN range.

Instead, it should use `lsn_range.start`, i.e., the LSN range from the
correct visitation order listed above.

# Solution

Use `lsn_range.start` instead of `self.lsn_range.start`.

# Refs

discovered by & fixes https://github.com/neondatabase/neon/issues/6967

Co-authored-by: Vlad Lazar <vlad@neon.tech>
2024-06-13 18:20:47 +00:00
Alex Chi Z
d25f7e3dd5 test(pageserver): add test wal record for unit testing (#8015)
https://github.com/neondatabase/neon/issues/8002

We need mock WAL record to make it easier to write unit tests. This pull
request adds such a record. It has `clear` flag and `append` field. The
tests for legacy-enhanced compaction are not modified yet and will be
part of the next pull request.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-06-13 09:44:37 -04:00
Anna Khanova
fbccd1e676 Proxy process updated errors (#8026)
## Problem

Respect errors classification from cplane
2024-06-13 14:42:26 +02:00
Heikki Linnakangas
dc2ab4407f Fix on-demand SLRU download on standby starting at WAL segment boundary (#8031)
If a standby is started right after switching to a new WAL segment, the
request in the SLRU download request would point to the beginning of the
segment (e.g. 0/5000000), while the not-modified-since LSN would point
to just after the page header (e.g. 0/5000028). It's effectively the
same position, as there cannot be any WAL records in between, but the
pageserver rightly errors out on any request where the request LSN <
not-modified since LSN.

To fix, round down the not-modified since LSN to the beginning of the
page like the request LSN.

Fixes issue #8030
2024-06-13 00:31:31 +03:00
69 changed files with 1825 additions and 679 deletions

View File

@@ -99,7 +99,7 @@ jobs:
# Set --sparse-ordering option of pytest-order plugin
# to ensure tests are running in order of appears in the file.
# It's important for test_perf_pgbench.py::test_pgbench_remote_* tests
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py --ignore test_runner/performance/test_perf_pgvector_queries.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -410,14 +410,14 @@ jobs:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Benchmark pgvector hnsw queries
- name: Benchmark pgvector queries
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
test_selection: performance/test_perf_pgvector_queries.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_pgvector
extra_params: -m remote_cluster --timeout 21600
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"

View File

@@ -30,7 +30,6 @@ jobs:
check-image:
uses: ./.github/workflows/check-build-tools-image.yml
# This job uses older version of GitHub Actions because it's run on gen2 runners, which don't support node 20 (for newer versions)
build-image:
needs: [ check-image ]
if: needs.check-image.outputs.found == 'false'
@@ -55,7 +54,7 @@ jobs:
exit 1
fi
- uses: actions/checkout@v3
- uses: actions/checkout@v4
# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
# The default value is ~/.docker

View File

@@ -299,21 +299,21 @@ jobs:
uses: actions/cache@v4
with:
path: pg_install/v14
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
- name: Cache postgres v15 build
id: cache_pg_15
uses: actions/cache@v4
with:
path: pg_install/v15
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
- name: Cache postgres v16 build
id: cache_pg_16
uses: actions/cache@v4
with:
path: pg_install/v16
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
- name: Build postgres v14
if: steps.cache_pg_14.outputs.cache-hit != 'true'
@@ -337,34 +337,8 @@ jobs:
run: |
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
- name: Run rust tests
env:
NEXTEST_RETRIES: 3
run: |
#nextest does not yet support running doctests
cargo test --doc $CARGO_FLAGS $CARGO_FEATURES
for io_engine in std-fs tokio-epoll-uring ; do
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
done
# Run separate tests for real S3
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
export REMOTE_STORAGE_S3_BUCKET=neon-github-ci-tests
export REMOTE_STORAGE_S3_REGION=eu-central-1
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
${cov_prefix} cargo nextest run $CARGO_FLAGS -E 'package(remote_storage)' -E 'test(test_real_s3)'
# Run separate tests for real Azure Blob Storage
# XXX: replace region with `eu-central-1`-like region
export ENABLE_REAL_AZURE_REMOTE_STORAGE=y
export AZURE_STORAGE_ACCOUNT="${{ secrets.AZURE_STORAGE_ACCOUNT_DEV }}"
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
${cov_prefix} cargo nextest run $CARGO_FLAGS -E 'package(remote_storage)' -E 'test(test_real_azure)'
# Do install *before* running rust tests because they might recompile the
# binaries with different features/flags.
- name: Install rust binaries
run: |
# Install target binaries
@@ -405,6 +379,32 @@ jobs:
done
fi
- name: Run rust tests
env:
NEXTEST_RETRIES: 3
run: |
#nextest does not yet support running doctests
cargo test --doc $CARGO_FLAGS $CARGO_FEATURES
for io_engine in std-fs tokio-epoll-uring ; do
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
done
# Run separate tests for real S3
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
export REMOTE_STORAGE_S3_BUCKET=neon-github-ci-tests
export REMOTE_STORAGE_S3_REGION=eu-central-1
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_s3)'
# Run separate tests for real Azure Blob Storage
# XXX: replace region with `eu-central-1`-like region
export ENABLE_REAL_AZURE_REMOTE_STORAGE=y
export AZURE_STORAGE_ACCOUNT="${{ secrets.AZURE_STORAGE_ACCOUNT_DEV }}"
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_azure)'
- name: Install postgres binaries
run: cp -a pg_install /tmp/neon/pg_install
@@ -858,7 +858,7 @@ jobs:
cache-to: type=registry,ref=neondatabase/compute-node-${{ matrix.version }}:cache-${{ matrix.arch }},mode=max
tags: |
neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}
- name: Build neon extensions test image
if: matrix.version == 'v16'
uses: docker/build-push-action@v5
@@ -965,7 +965,7 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v1
uses: actions/checkout@v4
with:
fetch-depth: 0

View File

@@ -25,26 +25,17 @@ jobs:
found: ${{ steps.check-image.outputs.found }}
steps:
- uses: actions/checkout@v4
- name: Get build-tools image tag for the current commit
id: get-build-tools-tag
env:
# Usually, for COMMIT_SHA, we use `github.event.pull_request.head.sha || github.sha`, but here, even for PRs,
# we want to use `github.sha` i.e. point to a phantom merge commit to determine the image tag correctly.
COMMIT_SHA: ${{ github.sha }}
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
IMAGE_TAG: |
${{ hashFiles('Dockerfile.build-tools',
'.github/workflows/check-build-tools-image.yml',
'.github/workflows/build-build-tools-image.yml') }}
run: |
LAST_BUILD_TOOLS_SHA=$(
gh api \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
--method GET \
--field path=Dockerfile.build-tools \
--field sha=${COMMIT_SHA} \
--field per_page=1 \
--jq ".[0].sha" \
"/repos/${GITHUB_REPOSITORY}/commits"
)
echo "image-tag=${LAST_BUILD_TOOLS_SHA}" | tee -a $GITHUB_OUTPUT
echo "image-tag=${IMAGE_TAG}" | tee -a $GITHUB_OUTPUT
- name: Check if such tag found in the registry
id: check-image

10
Cargo.lock generated
View File

@@ -2946,6 +2946,15 @@ dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "lz4_flex"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
dependencies = [
"twox-hash",
]
[[package]]
name = "match_cfg"
version = "0.1.0"
@@ -3612,6 +3621,7 @@ dependencies = [
"hyper 0.14.26",
"itertools",
"leaky-bucket",
"lz4_flex",
"md5",
"metrics",
"nix 0.27.1",

View File

@@ -110,6 +110,7 @@ jsonwebtoken = "9"
lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
lz4_flex = "0.11"
md5 = "0.7.0"
measured = { version = "0.0.21", features=["lasso"] }
measured-process = { version = "0.0.21" }
@@ -120,7 +121,7 @@ num_cpus = "1.15"
num-traits = "0.2.15"
once_cell = "1.13"
opentelemetry = "0.20.0"
opentelemetry-otlp = { version = "0.13.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-otlp = { version = "0.13.0", default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.12.0"
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["zstd"] }
@@ -128,7 +129,7 @@ parquet_derive = "51.0.0"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pin-project-lite = "0.2"
procfs = "0.14"
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.11"
rand = "0.8"
redis = { version = "0.25.2", features = ["tokio-rustls-comp", "keep-alive"] }
@@ -184,7 +185,7 @@ tower-service = "0.3.2"
tracing = "0.1"
tracing-error = "0.2.0"
tracing-opentelemetry = "0.21.0"
tracing-subscriber = { version = "0.3", default_features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json", "ansi"] }
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json", "ansi"] }
twox-hash = { version = "1.6.3", default-features = false }
url = "2.2"
urlencoding = "2.1"

View File

@@ -69,8 +69,6 @@ RUN set -e \
&& apt install -y \
libreadline-dev \
libseccomp-dev \
libicu67 \
openssl \
ca-certificates \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
&& useradd -d /data neon \

View File

@@ -112,6 +112,45 @@ RUN for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JS
&& make install \
&& rm -rf ../lcov.tar.gz
# Compile and install the static OpenSSL library
ENV OPENSSL_VERSION=3.2.2
ENV OPENSSL_PREFIX=/usr/local/openssl
RUN wget -O /tmp/openssl-${OPENSSL_VERSION}.tar.gz https://www.openssl.org/source/openssl-${OPENSSL_VERSION}.tar.gz && \
echo "197149c18d9e9f292c43f0400acaba12e5f52cacfe050f3d199277ea738ec2e7 /tmp/openssl-${OPENSSL_VERSION}.tar.gz" | sha256sum --check && \
cd /tmp && \
tar xzvf /tmp/openssl-${OPENSSL_VERSION}.tar.gz && \
rm /tmp/openssl-${OPENSSL_VERSION}.tar.gz && \
cd /tmp/openssl-${OPENSSL_VERSION} && \
./config --prefix=${OPENSSL_PREFIX} -static --static no-shared -fPIC && \
make -j "$(nproc)" && \
make install && \
cd /tmp && \
rm -rf /tmp/openssl-${OPENSSL_VERSION}
# Use the same version of libicu as the compute nodes so that
# clusters created using inidb on pageserver can be used by computes.
#
# TODO: at this time, Dockerfile.compute-node uses the debian bullseye libicu
# package, which is 67.1. We're duplicating that knowledge here, and also, technically,
# Debian has a few patches on top of 67.1 that we're not adding here.
ENV ICU_VERSION=67.1
ENV ICU_PREFIX=/usr/local/icu
# Download and build static ICU
RUN wget -O /tmp/libicu-${ICU_VERSION}.tgz https://github.com/unicode-org/icu/releases/download/release-${ICU_VERSION//./-}/icu4c-${ICU_VERSION//./_}-src.tgz && \
echo "94a80cd6f251a53bd2a997f6f1b5ac6653fe791dfab66e1eb0227740fb86d5dc /tmp/libicu-${ICU_VERSION}.tgz" | sha256sum --check && \
mkdir /tmp/icu && \
pushd /tmp/icu && \
tar -xzf /tmp/libicu-${ICU_VERSION}.tgz && \
pushd icu/source && \
./configure --prefix=${ICU_PREFIX} --enable-static --enable-shared=no CXXFLAGS="-fPIC" CFLAGS="-fPIC" && \
make -j "$(nproc)" && \
make install && \
popd && \
rm -rf icu && \
rm -f /tmp/libicu-${ICU_VERSION}.tgz && \
popd
# Switch to nonroot user
USER nonroot:nonroot
WORKDIR /home/nonroot
@@ -141,7 +180,7 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.78.0
ENV RUSTC_VERSION=1.79.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
@@ -170,3 +209,6 @@ RUN whoami \
&& rustup --version --verbose \
&& rustc --version --verbose \
&& clang --version
# Set following flag to check in Makefile if its running in Docker
RUN touch /home/nonroot/.docker_build

View File

@@ -246,8 +246,8 @@ COPY patches/pgvector.patch /pgvector.patch
# By default, pgvector Makefile uses `-march=native`. We don't want that,
# because we build the images on different machines than where we run them.
# Pass OPTFLAGS="" to remove it.
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.1.tar.gz -O pgvector.tar.gz && \
echo "fe6c8cb4e0cd1a8cb60f5badf9e1701e0fcabcfc260931c26d01e155c4dd21d1 pgvector.tar.gz" | sha256sum --check && \
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.2.tar.gz -O pgvector.tar.gz && \
echo "617fba855c9bcb41a2a9bc78a78567fd2e147c72afd5bf9d37b31b9591632b30 pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xzf ../pgvector.tar.gz --strip-components=1 -C . && \
patch -p1 < /pgvector.patch && \
make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="" PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -979,7 +979,7 @@ RUN cd /ext-src/ && for f in *.tar.gz; \
do echo $f; dname=$(echo $f | sed 's/\.tar.*//')-src; \
rm -rf $dname; mkdir $dname; tar xzf $f --strip-components=1 -C $dname \
|| exit 1; rm -f $f; done
RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
# cmake is required for the h3 test
RUN apt-get update && apt-get install -y cmake
RUN patch -p1 < /ext-src/pg_hintplan.patch

View File

@@ -3,6 +3,9 @@ ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
# Where to install Postgres, default is ./pg_install, maybe useful for package managers
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/
OPENSSL_PREFIX_DIR := /usr/local/openssl
ICU_PREFIX_DIR := /usr/local/icu
#
# We differentiate between release / debug build types using the BUILD_TYPE
# environment variable.
@@ -20,6 +23,16 @@ else
$(error Bad build type '$(BUILD_TYPE)', see Makefile for options)
endif
ifeq ($(shell test -e /home/nonroot/.docker_build && echo -n yes),yes)
# Exclude static build openssl, icu for local build (MacOS, Linux)
# Only keep for build type release and debug
PG_CFLAGS += -I$(OPENSSL_PREFIX_DIR)/include
PG_CONFIGURE_OPTS += --with-icu
PG_CONFIGURE_OPTS += ICU_CFLAGS='-I/$(ICU_PREFIX_DIR)/include -DU_STATIC_IMPLEMENTATION'
PG_CONFIGURE_OPTS += ICU_LIBS='-L$(ICU_PREFIX_DIR)/lib -L$(ICU_PREFIX_DIR)/lib64 -licui18n -licuuc -licudata -lstdc++ -Wl,-Bdynamic -lm'
PG_CONFIGURE_OPTS += LDFLAGS='-L$(OPENSSL_PREFIX_DIR)/lib -L$(OPENSSL_PREFIX_DIR)/lib64 -L$(ICU_PREFIX_DIR)/lib -L$(ICU_PREFIX_DIR)/lib64 -Wl,-Bstatic -lssl -lcrypto -Wl,-Bdynamic -lrt -lm -ldl -lpthread'
endif
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Linux)
# Seccomp BPF is only available for Linux
@@ -28,7 +41,7 @@ else ifeq ($(UNAME_S),Darwin)
ifndef DISABLE_HOMEBREW
# macOS with brew-installed openssl requires explicit paths
# It can be configured with OPENSSL_PREFIX variable
OPENSSL_PREFIX ?= $(shell brew --prefix openssl@3)
OPENSSL_PREFIX := $(shell brew --prefix openssl@3)
PG_CONFIGURE_OPTS += --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
PG_CONFIGURE_OPTS += PKG_CONFIG_PATH=$(shell brew --prefix icu4c)/lib/pkgconfig
# macOS already has bison and flex in the system, but they are old and result in postgres-v14 target failure

View File

@@ -735,7 +735,7 @@ fn cli() -> clap::Command {
Arg::new("filecache-connstr")
.long("filecache-connstr")
.default_value(
"host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable",
"host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor",
)
.value_name("FILECACHE_CONNSTR"),
)

View File

@@ -918,38 +918,39 @@ impl ComputeNode {
// temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are reconfiguring:
// creating new extensions, roles, etc...
config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
self.pg_reload_conf()?;
config::with_compute_ctl_tmp_override(pgdata_path, "neon.max_cluster_size=-1", || {
self.pg_reload_conf()?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.
if spec.mode == ComputeMode::Primary {
client.simple_query("SET neon.forward_ddl = false")?;
cleanup_instance(&mut client)?;
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(
&spec,
&mut client,
self.connstr.as_str(),
self.has_feature(ComputeFeature::AnonExtension),
)?;
handle_extensions(&spec, &mut client)?;
handle_extension_neon(&mut client)?;
// We can skip handle_migrations here because a new migration can only appear
// if we have a new version of the compute_ctl binary, which can only happen
// if compute got restarted, in which case we'll end up inside of apply_config
// instead of reconfigure.
}
// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.
if spec.mode == ComputeMode::Primary {
client.simple_query("SET neon.forward_ddl = false")?;
cleanup_instance(&mut client)?;
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(
&spec,
&mut client,
self.connstr.as_str(),
self.has_feature(ComputeFeature::AnonExtension),
)?;
handle_extensions(&spec, &mut client)?;
handle_extension_neon(&mut client)?;
// We can skip handle_migrations here because a new migration can only appear
// if we have a new version of the compute_ctl binary, which can only happen
// if compute got restarted, in which case we'll end up inside of apply_config
// instead of reconfigure.
}
// 'Close' connection
drop(client);
// 'Close' connection
drop(client);
Ok(())
})?;
// reset max_cluster_size in config back to original value and reload config
config::compute_ctl_temp_override_remove(pgdata_path)?;
self.pg_reload_conf()?;
let unknown_op = "unknown".to_string();
@@ -1040,12 +1041,17 @@ impl ComputeNode {
// temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are applying config:
// creating new extensions, roles, etc...
config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
self.pg_reload_conf()?;
config::with_compute_ctl_tmp_override(
pgdata_path,
"neon.max_cluster_size=-1",
|| {
self.pg_reload_conf()?;
self.apply_config(&compute_state)?;
self.apply_config(&compute_state)?;
config::compute_ctl_temp_override_remove(pgdata_path)?;
Ok(())
},
)?;
self.pg_reload_conf()?;
}
self.post_apply_config()?;

View File

@@ -131,18 +131,17 @@ pub fn write_postgres_conf(
Ok(())
}
/// create file compute_ctl_temp_override.conf in pgdata_dir
/// add provided options to this file
pub fn compute_ctl_temp_override_create(pgdata_path: &Path, options: &str) -> Result<()> {
pub fn with_compute_ctl_tmp_override<F>(pgdata_path: &Path, options: &str, exec: F) -> Result<()>
where
F: FnOnce() -> Result<()>,
{
let path = pgdata_path.join("compute_ctl_temp_override.conf");
let mut file = File::create(path)?;
write!(file, "{}", options)?;
Ok(())
}
/// remove file compute_ctl_temp_override.conf in pgdata_dir
pub fn compute_ctl_temp_override_remove(pgdata_path: &Path) -> Result<()> {
let path = pgdata_path.join("compute_ctl_temp_override.conf");
std::fs::remove_file(path)?;
Ok(())
let res = exec();
file.set_len(0)?;
res
}

View File

@@ -17,7 +17,7 @@ use hyper::header::CONTENT_TYPE;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use tokio::task;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};
use tracing_utils::http::OtelName;
use utils::http::request::must_get_query_param;
@@ -48,7 +48,7 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
match (req.method(), req.uri().path()) {
// Serialized compute state.
(&Method::GET, "/status") => {
info!("serving /status GET request");
debug!("serving /status GET request");
let state = compute.state.lock().unwrap();
let status_response = status_response_from_state(&state);
Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))

View File

@@ -4,10 +4,6 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
default = []
testing = [ "pageserver_api/testing" ]
[dependencies]
anyhow.workspace = true
async-trait.workspace = true

View File

@@ -383,12 +383,6 @@ impl PageServerNode {
.map(|x| x.parse::<AuxFilePolicy>())
.transpose()
.context("Failed to parse 'switch_aux_file_policy'")?,
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: settings
.remove("test_vm_bit_debug_logging")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'test_vm_bit_debug_logging' as bool")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
@@ -512,12 +506,6 @@ impl PageServerNode {
.map(|x| x.parse::<AuxFilePolicy>())
.transpose()
.context("Failed to parse 'switch_aux_file_policy'")?,
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: settings
.remove("test_vm_bit_debug_logging")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'test_vm_bit_debug_logging' as bool")?,
}
};

View File

@@ -4,10 +4,6 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
default = []
testing = []
[dependencies]
serde.workspace = true
serde_with.workspace = true

View File

@@ -558,6 +558,12 @@ impl KeySpaceRandomAccum {
self.ranges.push(range);
}
pub fn add_keyspace(&mut self, keyspace: KeySpace) {
for range in keyspace.ranges {
self.add_range(range);
}
}
pub fn to_keyspace(mut self) -> KeySpace {
let mut ranges = Vec::new();
if !self.ranges.is_empty() {

View File

@@ -322,8 +322,6 @@ pub struct TenantConfig {
pub timeline_get_throttle: Option<ThrottleConfig>,
pub image_layer_creation_check_threshold: Option<u8>,
pub switch_aux_file_policy: Option<AuxFilePolicy>,
#[cfg(feature = "testing")]
pub test_vm_bit_debug_logging: Option<bool>,
}
/// The policy for the aux file storage. It can be switched through `switch_aux_file_policy`
@@ -457,6 +455,26 @@ pub enum CompactionAlgorithm {
Tiered,
}
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
Serialize,
Deserialize,
strum_macros::FromRepr,
strum_macros::EnumString,
enum_map::Enum,
)]
#[strum(serialize_all = "kebab-case")]
pub enum ImageCompressionAlgorithm {
ZstdLow,
Zstd,
ZstdHigh,
LZ4,
}
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct CompactionAlgorithmSettings {
pub kind: CompactionAlgorithm,

View File

@@ -7,7 +7,7 @@ license.workspace = true
[dependencies]
hyper.workspace = true
opentelemetry = { workspace = true, features=["rt-tokio"] }
opentelemetry-otlp = { workspace = true, default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-otlp = { workspace = true, default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions.workspace = true
reqwest = { workspace = true, default-features = false, features = ["rustls-tls"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }

View File

@@ -25,6 +25,8 @@ pub struct Config {
///
/// For simplicity, this value must be greater than or equal to `memory_history_len`.
memory_history_log_interval: usize,
/// The max number of iterations to skip before logging the next iteration
memory_history_log_noskip_interval: Duration,
}
impl Default for Config {
@@ -33,6 +35,7 @@ impl Default for Config {
memory_poll_interval: Duration::from_millis(100),
memory_history_len: 5, // use 500ms of history for decision-making
memory_history_log_interval: 20, // but only log every ~2s (otherwise it's spammy)
memory_history_log_noskip_interval: Duration::from_secs(15), // but only if it's changed, or 60 seconds have passed
}
}
}
@@ -85,7 +88,12 @@ impl CgroupWatcher {
// buffer for samples that will be logged. once full, it remains so.
let history_log_len = self.config.memory_history_log_interval;
let max_skip = self.config.memory_history_log_noskip_interval;
let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len];
let mut last_logged_memusage = MemoryStatus::zeroed();
// Ensure that we're tracking a value that's definitely in the past, as Instant::now is only guaranteed to be non-decreasing on Rust's T1-supported systems.
let mut can_skip_logs_until = Instant::now() - max_skip;
for t in 0_u64.. {
ticker.tick().await;
@@ -115,12 +123,24 @@ impl CgroupWatcher {
// equal to the logging interval, we can just log the entire buffer every time we set
// the last entry, which also means that for this log line, we can ignore that it's a
// ring buffer (because all the entries are in order of increasing time).
if i == history_log_len - 1 {
//
// We skip logging the data if data hasn't meaningfully changed in a while, unless
// we've already ignored previous iterations for the last max_skip period.
if i == history_log_len - 1
&& (now > can_skip_logs_until
|| !history_log_buf
.iter()
.all(|usage| last_logged_memusage.status_is_close_or_similar(usage)))
{
info!(
history = ?MemoryStatus::debug_slice(&history_log_buf),
summary = ?summary,
"Recent cgroup memory statistics history"
);
can_skip_logs_until = now + max_skip;
last_logged_memusage = *history_log_buf.last().unwrap();
}
updates
@@ -232,6 +252,24 @@ impl MemoryStatus {
DS(slice)
}
/// Check if the other memory status is a close or similar result.
/// Returns true if the larger value is not larger than the smaller value
/// by 1/8 of the smaller value, and within 128MiB.
/// See tests::check_similarity_behaviour for examples of behaviour
fn status_is_close_or_similar(&self, other: &MemoryStatus) -> bool {
let margin;
let diff;
if self.non_reclaimable >= other.non_reclaimable {
margin = other.non_reclaimable / 8;
diff = self.non_reclaimable - other.non_reclaimable;
} else {
margin = self.non_reclaimable / 8;
diff = other.non_reclaimable - self.non_reclaimable;
}
diff < margin && diff < 128 * 1024 * 1024
}
}
#[cfg(test)]
@@ -261,4 +299,65 @@ mod tests {
assert_eq!(values(2, 4), [9, 0, 1, 2]);
assert_eq!(values(2, 10), [3, 4, 5, 6, 7, 8, 9, 0, 1, 2]);
}
#[test]
fn check_similarity_behaviour() {
// This all accesses private methods, so we can't actually run this
// as doctests, because doctests run as an external crate.
let mut small = super::MemoryStatus {
non_reclaimable: 1024,
};
let mut large = super::MemoryStatus {
non_reclaimable: 1024 * 1024 * 1024 * 1024,
};
// objects are self-similar, no matter the size
assert!(small.status_is_close_or_similar(&small));
assert!(large.status_is_close_or_similar(&large));
// inequality is symmetric
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
small.non_reclaimable = 64;
large.non_reclaimable = (small.non_reclaimable / 8) * 9;
// objects are self-similar, no matter the size
assert!(small.status_is_close_or_similar(&small));
assert!(large.status_is_close_or_similar(&large));
// values are similar if the larger value is larger by less than
// 12.5%, i.e. 1/8 of the smaller value.
// In the example above, large is exactly 12.5% larger, so this doesn't
// match.
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
large.non_reclaimable -= 1;
assert!(large.status_is_close_or_similar(&large));
assert!(small.status_is_close_or_similar(&large));
assert!(large.status_is_close_or_similar(&small));
// The 1/8 rule only applies up to 128MiB of difference
small.non_reclaimable = 1024 * 1024 * 1024 * 1024;
large.non_reclaimable = small.non_reclaimable / 8 * 9;
assert!(small.status_is_close_or_similar(&small));
assert!(large.status_is_close_or_similar(&large));
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
// the large value is put just above the threshold
large.non_reclaimable = small.non_reclaimable + 128 * 1024 * 1024;
assert!(large.status_is_close_or_similar(&large));
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
// now below
large.non_reclaimable -= 1;
assert!(large.status_is_close_or_similar(&large));
assert!(small.status_is_close_or_similar(&large));
assert!(large.status_is_close_or_similar(&small));
}
}

View File

@@ -12,11 +12,11 @@ use futures::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use tracing::info;
use tracing::{debug, info};
use crate::protocol::{
OutboundMsg, ProtocolRange, ProtocolResponse, ProtocolVersion, PROTOCOL_MAX_VERSION,
PROTOCOL_MIN_VERSION,
OutboundMsg, OutboundMsgKind, ProtocolRange, ProtocolResponse, ProtocolVersion,
PROTOCOL_MAX_VERSION, PROTOCOL_MIN_VERSION,
};
/// The central handler for all communications in the monitor.
@@ -118,7 +118,12 @@ impl Dispatcher {
/// serialize the wrong thing and send it, since `self.sink.send` will take
/// any string.
pub async fn send(&mut self, message: OutboundMsg) -> anyhow::Result<()> {
info!(?message, "sending message");
if matches!(&message.inner, OutboundMsgKind::HealthCheck { .. }) {
debug!(?message, "sending message");
} else {
info!(?message, "sending message");
}
let json = serde_json::to_string(&message).context("failed to serialize message")?;
self.sink
.send(Message::Text(json))

View File

@@ -12,7 +12,7 @@ use axum::extract::ws::{Message, WebSocket};
use futures::StreamExt;
use tokio::sync::{broadcast, watch};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};
use crate::cgroup::{self, CgroupWatcher};
use crate::dispatcher::Dispatcher;
@@ -474,26 +474,29 @@ impl Runner {
// there is a message from the agent
msg = self.dispatcher.source.next() => {
if let Some(msg) = msg {
// Don't use 'message' as a key as the string also uses
// that for its key
info!(?msg, "received message");
match msg {
match &msg {
Ok(msg) => {
let message: InboundMsg = match msg {
Message::Text(text) => {
serde_json::from_str(&text).context("failed to deserialize text message")?
serde_json::from_str(text).context("failed to deserialize text message")?
}
other => {
warn!(
// Don't use 'message' as a key as the
// string also uses that for its key
msg = ?other,
"agent should only send text messages but received different type"
"problem processing incoming message: agent should only send text messages but received different type"
);
continue
},
};
if matches!(&message.inner, InboundMsgKind::HealthCheck { .. }) {
debug!(?msg, "received message");
} else {
info!(?msg, "received message");
}
let out = match self.process_message(message.clone()).await {
Ok(Some(out)) => out,
Ok(None) => continue,
@@ -517,7 +520,11 @@ impl Runner {
.await
.context("failed to send message")?;
}
Err(e) => warn!("{e}"),
Err(e) => warn!(
error = format!("{e}"),
msg = ?msg,
"received error message"
),
}
} else {
anyhow::bail!("dispatcher connection closed")

View File

@@ -8,7 +8,7 @@ license.workspace = true
default = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints", "pageserver_api/testing"]
testing = ["fail/failpoints"]
[dependencies]
anyhow.workspace = true
@@ -37,6 +37,7 @@ humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
leaky-bucket.workspace = true
lz4_flex.workspace = true
md5.workspace = true
nix.workspace = true
# hack to get the number of worker threads tokio uses

View File

@@ -1,4 +1,7 @@
use std::num::NonZeroU32;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use anyhow::Result;
use camino::{Utf8Path, Utf8PathBuf};
@@ -8,7 +11,7 @@ use pageserver::task_mgr::TaskKind;
use pageserver::tenant::block_io::BlockCursor;
use pageserver::tenant::disk_btree::DiskBtreeReader;
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
use pageserver::tenant::storage_layer::{delta_layer, image_layer, LayerName};
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use pageserver::{page_cache, virtual_file};
@@ -20,7 +23,12 @@ use pageserver::{
},
virtual_file::VirtualFile,
};
use pageserver_api::models::ImageCompressionAlgorithm;
use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath, RemoteStorageConfig};
use std::fs;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
@@ -55,6 +63,17 @@ pub(crate) enum LayerCmd {
#[clap(long)]
new_timeline_id: Option<TimelineId>,
},
CompressOne {
dest_path: Utf8PathBuf,
layer_file_path: Utf8PathBuf,
},
CompressMany {
tmp_dir: Utf8PathBuf,
tenant_remote_prefix: String,
tenant_remote_config: String,
layers_dir: Utf8PathBuf,
parallelism: Option<u32>,
},
}
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
@@ -240,5 +259,138 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
anyhow::bail!("not an image or delta layer: {layer_file_path}");
}
LayerCmd::CompressOne {
dest_path,
layer_file_path,
} => {
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let stats =
ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?;
println!(
"Statistics: {stats:#?}\n{}",
serde_json::to_string(&stats).unwrap()
);
Ok(())
}
LayerCmd::CompressMany {
tmp_dir,
tenant_remote_prefix,
tenant_remote_config,
layers_dir,
parallelism,
} => {
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let toml_document = toml_edit::Document::from_str(tenant_remote_config)?;
let toml_item = toml_document
.get("remote_storage")
.expect("need remote_storage");
let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config");
let storage = remote_storage::GenericRemoteStorage::from_config(&config)?;
let storage = Arc::new(storage);
let cancel = CancellationToken::new();
let path = RemotePath::from_string(tenant_remote_prefix)?;
let max_files = NonZeroU32::new(128_000);
let files_list = storage
.list(Some(&path), ListingMode::NoDelimiter, max_files, &cancel)
.await?;
println!("Listing gave {} keys", files_list.keys.len());
tokio::fs::create_dir_all(&layers_dir).await?;
let semaphore = Arc::new(Semaphore::new(parallelism.unwrap_or(1) as usize));
let mut tasks = JoinSet::new();
for (file_idx, file_key) in files_list.keys.iter().enumerate() {
let Some(file_name) = file_key.object_name() else {
continue;
};
match LayerName::from_str(file_name) {
Ok(LayerName::Delta(_)) => continue,
Ok(LayerName::Image(_)) => (),
Err(_e) => {
// Split off the final part. We ensured above that this is not turning a
// generation-less delta layer file name into an image layer file name.
let Some(file_without_generation) = file_name.rsplit_once('-') else {
continue;
};
let Ok(LayerName::Image(_layer_file_name)) =
LayerName::from_str(file_without_generation.0)
else {
// Skipping because it's either not a layer or an image layer
//println!("object {file_name}: not an image layer");
continue;
};
}
}
let json_file_path = layers_dir.join(format!("{file_name}.json"));
if tokio::fs::try_exists(&json_file_path).await? {
//println!("object {file_name}: report already created");
// If we have already created a report for the layer, skip it.
continue;
}
let local_layer_path = layers_dir.join(file_name);
async fn stats(
semaphore: Arc<Semaphore>,
local_layer_path: Utf8PathBuf,
json_file_path: Utf8PathBuf,
tmp_dir: Utf8PathBuf,
storage: Arc<GenericRemoteStorage>,
file_key: RemotePath,
) -> Result<Vec<(Option<ImageCompressionAlgorithm>, u64, u64, u64)>, anyhow::Error>
{
let _permit = semaphore.acquire().await?;
let cancel = CancellationToken::new();
let download = storage.download(&file_key, &cancel).await?;
let mut dest_layer_file = tokio::fs::File::create(&local_layer_path).await?;
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?;
println!("Downloaded file to {local_layer_path}");
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let stats =
ImageLayer::compression_statistics(&tmp_dir, &local_layer_path, &ctx)
.await?;
let stats_str = serde_json::to_string(&stats).unwrap();
tokio::fs::write(json_file_path, stats_str).await?;
tokio::fs::remove_file(&local_layer_path).await?;
Ok(stats)
}
let semaphore = semaphore.clone();
let file_key = file_key.to_owned();
let storage = storage.clone();
let tmp_dir = tmp_dir.to_owned();
let file_name = file_name.to_owned();
let percent = (file_idx * 100) as f64 / files_list.keys.len() as f64;
tasks.spawn(async move {
let stats = stats(
semaphore,
local_layer_path.to_owned(),
json_file_path.to_owned(),
tmp_dir,
storage,
file_key,
)
.await;
match stats {
Ok(stats) => {
println!("Statistics for {file_name} ({percent:.1}%): {stats:?}\n")
}
Err(e) => eprintln!("Error for {file_name}: {e:?}"),
};
});
}
while let Some(_res) = tasks.join_next().await {}
Ok(())
}
}
}

View File

@@ -5,7 +5,7 @@
//! See also `settings.md` for better description on every parameter.
use anyhow::{anyhow, bail, ensure, Context, Result};
use pageserver_api::shard::TenantShardId;
use pageserver_api::{models::ImageCompressionAlgorithm, shard::TenantShardId};
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde;
use serde::de::IntoDeserializer;
@@ -55,6 +55,7 @@ pub mod defaults {
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
use pageserver_api::models::ImageCompressionAlgorithm;
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
@@ -95,6 +96,8 @@ pub mod defaults {
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
pub const DEFAULT_IMAGE_COMPRESSION: Option<ImageCompressionAlgorithm> = None;
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
@@ -290,6 +293,8 @@ pub struct PageServerConf {
pub validate_vectored_get: bool,
pub image_compression: Option<ImageCompressionAlgorithm>,
/// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
/// is exceeded, we start proactively closing ephemeral layers to limit the total amount
/// of ephemeral data.
@@ -400,6 +405,8 @@ struct PageServerConfigBuilder {
validate_vectored_get: BuilderValue<bool>,
image_compression: BuilderValue<Option<ImageCompressionAlgorithm>>,
ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
}
@@ -487,6 +494,7 @@ impl PageServerConfigBuilder {
max_vectored_read_bytes: Set(MaxVectoredReadBytes(
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
)),
image_compression: Set(DEFAULT_IMAGE_COMPRESSION),
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
}
@@ -672,6 +680,10 @@ impl PageServerConfigBuilder {
self.validate_vectored_get = BuilderValue::Set(value);
}
pub fn get_image_compression(&mut self, value: Option<ImageCompressionAlgorithm>) {
self.image_compression = BuilderValue::Set(value);
}
pub fn get_ephemeral_bytes_per_memory_kb(&mut self, value: usize) {
self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value);
}
@@ -732,6 +744,7 @@ impl PageServerConfigBuilder {
get_impl,
max_vectored_read_bytes,
validate_vectored_get,
image_compression,
ephemeral_bytes_per_memory_kb,
}
CUSTOM LOGIC
@@ -1026,6 +1039,9 @@ impl PageServerConf {
"validate_vectored_get" => {
builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
}
"image_compression" => {
builder.get_image_compression(Some(parse_toml_from_str("image_compression", item)?))
}
"ephemeral_bytes_per_memory_kb" => {
builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)
}
@@ -1110,6 +1126,7 @@ impl PageServerConf {
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant"),
),
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
}
@@ -1350,6 +1367,7 @@ background_task_maximum_delay = '334 s'
.expect("Invalid default constant")
),
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
},
"Correct defaults should be used when no config values are provided"
@@ -1423,6 +1441,7 @@ background_task_maximum_delay = '334 s'
.expect("Invalid default constant")
),
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
},
"Should be able to parse all basic config values correctly"

View File

@@ -2,10 +2,9 @@
//! and push them to a HTTP endpoint.
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::size::CalculateSyntheticSizeError;
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::{
mgr::TenantManager, LogicalSizeCalculationCause, PageReconstructError, Tenant,
};
use crate::tenant::{mgr::TenantManager, LogicalSizeCalculationCause, Tenant};
use camino::Utf8PathBuf;
use consumption_metrics::EventType;
use pageserver_api::models::TenantState;
@@ -350,19 +349,12 @@ async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &Re
// Same for the loop that fetches computed metrics.
// By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
// which turns out is really handy to understand the system.
let Err(e) = tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await else {
return;
};
// this error can be returned if timeline is shutting down, but it does not
// mean the synthetic size worker should terminate.
let shutting_down = matches!(
e.downcast_ref::<PageReconstructError>(),
Some(PageReconstructError::Cancelled)
);
if !shutting_down {
let tenant_shard_id = tenant.tenant_shard_id();
error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
match tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await {
Ok(_) => {}
Err(CalculateSyntheticSizeError::Cancelled) => {}
Err(e) => {
let tenant_shard_id = tenant.tenant_shard_id();
error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
}
}
}

View File

@@ -1135,7 +1135,10 @@ async fn tenant_size_handler(
&ctx,
)
.await
.map_err(ApiError::InternalServerError)?;
.map_err(|e| match e {
crate::tenant::size::CalculateSyntheticSizeError::Cancelled => ApiError::ShuttingDown,
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
})?;
let mut sizes = None;
let accepts_html = headers
@@ -1143,9 +1146,7 @@ async fn tenant_size_handler(
.map(|v| v == "text/html")
.unwrap_or_default();
if !inputs_only.unwrap_or(false) {
let storage_model = inputs
.calculate_model()
.map_err(ApiError::InternalServerError)?;
let storage_model = inputs.calculate_model();
let size = storage_model.calculate();
// If request header expects html, return html

View File

@@ -509,11 +509,24 @@ pub(crate) enum GcError {
#[error(transparent)]
Remote(anyhow::Error),
// An error reading while calculating GC cutoffs
#[error(transparent)]
GcCutoffs(PageReconstructError),
// If GC was invoked for a particular timeline, this error means it didn't exist
#[error("timeline not found")]
TimelineNotFound,
}
impl From<PageReconstructError> for GcError {
fn from(value: PageReconstructError) -> Self {
match value {
PageReconstructError::Cancelled => Self::TimelineCancelled,
other => Self::GcCutoffs(other),
}
}
}
impl Tenant {
/// Yet another helper for timeline initialization.
///
@@ -1033,7 +1046,6 @@ impl Tenant {
remote_metadata,
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
timeline_get_throttle: self.timeline_get_throttle.clone(),
},
ctx,
@@ -1059,7 +1071,6 @@ impl Tenant {
timeline_id,
&index_part.metadata,
remote_timeline_client,
self.deletion_queue_client.clone(),
)
.instrument(tracing::info_span!("timeline_delete", %timeline_id))
.await
@@ -2921,17 +2932,9 @@ impl Tenant {
.checked_sub(horizon)
.unwrap_or(Lsn(0));
let res = timeline.find_gc_cutoffs(cutoff, pitr, cancel, ctx).await;
match res {
Ok(cutoffs) => {
let old = gc_cutoffs.insert(timeline.timeline_id, cutoffs);
assert!(old.is_none());
}
Err(e) => {
tracing::warn!(timeline_id = %timeline.timeline_id, "ignoring failure to find gc cutoffs: {e:#}");
}
}
let cutoffs = timeline.find_gc_cutoffs(cutoff, pitr, cancel, ctx).await?;
let old = gc_cutoffs.insert(timeline.timeline_id, cutoffs);
assert!(old.is_none());
}
if !self.is_active() || self.cancel.is_cancelled() {
@@ -3443,7 +3446,6 @@ impl Tenant {
);
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
timeline_get_throttle: self.timeline_get_throttle.clone(),
}
}
@@ -3553,7 +3555,7 @@ impl Tenant {
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<size::ModelInputs> {
) -> Result<size::ModelInputs, size::CalculateSyntheticSizeError> {
let logical_sizes_at_once = self
.conf
.concurrent_tenant_size_logical_size_queries
@@ -3568,8 +3570,8 @@ impl Tenant {
// See more for on the issue #2748 condenced out of the initial PR review.
let mut shared_cache = tokio::select! {
locked = self.cached_logical_sizes.lock() => locked,
_ = cancel.cancelled() => anyhow::bail!("cancelled"),
_ = self.cancel.cancelled() => anyhow::bail!("tenant is shutting down"),
_ = cancel.cancelled() => return Err(size::CalculateSyntheticSizeError::Cancelled),
_ = self.cancel.cancelled() => return Err(size::CalculateSyntheticSizeError::Cancelled),
};
size::gather_inputs(
@@ -3593,10 +3595,10 @@ impl Tenant {
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<u64> {
) -> Result<u64, size::CalculateSyntheticSizeError> {
let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;
let size = inputs.calculate()?;
let size = inputs.calculate();
self.set_cached_synthetic_size(size);
@@ -3831,8 +3833,6 @@ pub(crate) mod harness {
tenant_conf.image_layer_creation_check_threshold,
),
switch_aux_file_policy: Some(tenant_conf.switch_aux_file_policy),
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: Some(tenant_conf.test_vm_bit_debug_logging),
}
}
}
@@ -4043,6 +4043,7 @@ mod tests {
use crate::repository::{Key, Value};
use crate::tenant::harness::*;
use crate::tenant::timeline::CompactFlags;
use crate::walrecord::NeonWalRecord;
use crate::DEFAULT_PG_VERSION;
use bytes::{Bytes, BytesMut};
use hex_literal::hex;
@@ -6366,7 +6367,7 @@ mod tests {
.await?;
Ok(res.pop_last().map(|(k, v)| {
assert_eq!(k, key);
v.unwrap().0
v.unwrap()
}))
}
@@ -6707,8 +6708,8 @@ mod tests {
}
#[tokio::test]
async fn test_simple_bottom_most_compaction() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction")?;
async fn test_simple_bottom_most_compaction_images() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction_images")?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
@@ -6863,4 +6864,79 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_neon_test_record() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_neon_test_record")?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
// using aux key here b/c they are guaranteed to be inside `collect_keyspace`.
let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
let delta1 = vec![
(
get_key(1),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(",0x20")),
),
(
get_key(1),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append(",0x30")),
),
(get_key(2), Lsn(0x10), Value::Image("0x10".into())),
(
get_key(2),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(",0x20")),
),
(
get_key(2),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append(",0x30")),
),
(get_key(3), Lsn(0x10), Value::Image("0x10".into())),
(
get_key(3),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_clear()),
),
(get_key(4), Lsn(0x10), Value::Image("0x10".into())),
(
get_key(4),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_init()),
),
];
let image1 = vec![(get_key(1), "0x10".into())];
let tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![delta1], // delta layers
vec![(Lsn(0x10), image1)], // image layers
Lsn(0x50),
)
.await?;
assert_eq!(
tline.get(get_key(1), Lsn(0x50), &ctx).await?,
Bytes::from_static(b"0x10,0x20,0x30")
);
assert_eq!(
tline.get(get_key(2), Lsn(0x50), &ctx).await?,
Bytes::from_static(b"0x10,0x20,0x30")
);
// assert_eq!(tline.get(get_key(3), Lsn(0x50), &ctx).await?, Bytes::new());
// assert_eq!(tline.get(get_key(4), Lsn(0x50), &ctx).await?, Bytes::new());
Ok(())
}
}

View File

@@ -11,7 +11,10 @@
//! len < 128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use async_compression::Level;
use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use crate::context::RequestContext;
@@ -66,12 +69,29 @@ impl<'a> BlockCursor<'a> {
len_buf.copy_from_slice(&buf[off..off + 4]);
off += 4;
}
len_buf[0] &= 0x7f;
len_buf[0] &= 0x0f;
u32::from_be_bytes(len_buf) as usize
};
let compression_bits = first_len_byte & 0xf0;
dstbuf.clear();
dstbuf.reserve(len);
let mut tmp_buf = Vec::new();
let buf_to_write;
let compression = if compression_bits <= BYTE_UNCOMPRESSED {
buf_to_write = dstbuf;
None
} else if compression_bits == BYTE_ZSTD || compression_bits == BYTE_LZ4 {
buf_to_write = &mut tmp_buf;
Some(dstbuf)
} else {
let error = std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid compression byte {compression_bits:x}"),
);
return Err(error);
};
buf_to_write.clear();
buf_to_write.reserve(len);
// Read the payload
let mut remain = len;
@@ -85,14 +105,38 @@ impl<'a> BlockCursor<'a> {
page_remain = PAGE_SZ;
}
let this_blk_len = min(remain, page_remain);
dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
remain -= this_blk_len;
off += this_blk_len;
}
if let Some(dstbuf) = compression {
if compression_bits == BYTE_ZSTD {
let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
decoder.write_all(buf_to_write).await?;
decoder.flush().await?;
} else if compression_bits == BYTE_LZ4 {
let decompressed = lz4_flex::block::decompress_size_prepended(&buf_to_write)
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("lz4 decompression error: {e:?}"),
)
})?;
dstbuf.extend_from_slice(&decompressed);
} else {
unreachable!("already checked above")
}
}
Ok(())
}
}
const BYTE_UNCOMPRESSED: u8 = 0x80;
const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
const BYTE_LZ4: u8 = BYTE_UNCOMPRESSED | 0x20;
/// A wrapper of `VirtualFile` that allows users to write blobs.
///
/// If a `BlobWriter` is dropped, the internal buffer will be
@@ -219,6 +263,17 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
&mut self,
srcbuf: B,
ctx: &RequestContext,
) -> (B::Buf, Result<u64, Error>) {
self.write_blob_compressed(srcbuf, ctx, None).await
}
/// Write a blob of data. Returns the offset that it was written to,
/// which can be used to retrieve the data later.
pub async fn write_blob_compressed<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
srcbuf: B,
ctx: &RequestContext,
algorithm: Option<ImageCompressionAlgorithm>,
) -> (B::Buf, Result<u64, Error>) {
let offset = self.offset;
@@ -226,29 +281,75 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
let mut io_buf = self.io_buf.take().expect("we always put it back below");
io_buf.clear();
let (io_buf, hdr_res) = async {
let mut compressed_buf = None;
let ((io_buf, hdr_res), srcbuf) = async {
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
self.write_all(io_buf, ctx).await
(
self.write_all(io_buf, ctx).await,
srcbuf.slice(..).into_inner(),
)
} else {
// Write a 4-byte length header
if len > 0x7fff_ffff {
if len > 0x0fff_ffff {
return (
io_buf,
Err(Error::new(
ErrorKind::Other,
format!("blob too large ({len} bytes)"),
)),
(
io_buf,
Err(Error::new(
ErrorKind::Other,
format!("blob too large ({len} bytes)"),
)),
),
srcbuf.slice(..).into_inner(),
);
}
if len > 0x0fff_ffff {
tracing::warn!("writing blob above future limit ({len} bytes)");
}
let mut len_buf = (len as u32).to_be_bytes();
len_buf[0] |= 0x80;
use ImageCompressionAlgorithm::*;
let (high_bit_mask, len_written, srcbuf) = match algorithm {
Some(ZstdLow | Zstd | ZstdHigh) => {
let mut encoder = if matches!(algorithm, Some(ZstdLow)) {
async_compression::tokio::write::ZstdEncoder::with_quality(
Vec::new(),
Level::Precise(1),
)
} else if matches!(algorithm, Some(ZstdHigh)) {
async_compression::tokio::write::ZstdEncoder::with_quality(
Vec::new(),
Level::Precise(6),
)
} else {
async_compression::tokio::write::ZstdEncoder::new(Vec::new())
};
let slice = srcbuf.slice(..);
encoder.write_all(&slice[..]).await.unwrap();
encoder.shutdown().await.unwrap();
let compressed = encoder.into_inner();
if compressed.len() < len {
let compressed_len = compressed.len();
compressed_buf = Some(compressed);
(BYTE_ZSTD, compressed_len, slice.into_inner())
} else {
(BYTE_UNCOMPRESSED, len, slice.into_inner())
}
}
Some(ImageCompressionAlgorithm::LZ4) => {
let slice = srcbuf.slice(..);
let compressed = lz4_flex::block::compress_prepend_size(&slice[..]);
if compressed.len() < len {
let compressed_len = compressed.len();
compressed_buf = Some(compressed);
(BYTE_LZ4, compressed_len, slice.into_inner())
} else {
(BYTE_UNCOMPRESSED, len, slice.into_inner())
}
}
None => (BYTE_UNCOMPRESSED, len, srcbuf.slice(..).into_inner()),
};
let mut len_buf = (len_written as u32).to_be_bytes();
assert_eq!(len_buf[0] & 0xf0, 0);
len_buf[0] |= high_bit_mask;
io_buf.extend_from_slice(&len_buf[..]);
self.write_all(io_buf, ctx).await
(self.write_all(io_buf, ctx).await, srcbuf)
}
}
.await;
@@ -257,7 +358,12 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
Ok(_) => (),
Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
}
let (srcbuf, res) = self.write_all(srcbuf, ctx).await;
let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
let (_buf, res) = self.write_all(compressed_buf, ctx).await;
(Slice::into_inner(srcbuf.slice(..)), res)
} else {
self.write_all(srcbuf, ctx).await
};
(srcbuf, res.map(|_| offset))
}
}
@@ -295,6 +401,12 @@ mod tests {
use rand::{Rng, SeedableRng};
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
round_trip_test_compressed::<BUFFERED, 0>(blobs).await
}
async fn round_trip_test_compressed<const BUFFERED: bool, const COMPRESSION: u8>(
blobs: &[Vec<u8>],
) -> Result<(), Error> {
let temp_dir = camino_tempfile::tempdir()?;
let pathbuf = temp_dir.path().join("file");
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
@@ -305,7 +417,26 @@ mod tests {
let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let (_, res) = wtr.write_blob(blob.clone(), &ctx).await;
let (_, res) = match COMPRESSION {
0 => wtr.write_blob(blob.clone(), &ctx).await,
1 => {
wtr.write_blob_compressed(
blob.clone(),
&ctx,
Some(ImageCompressionAlgorithm::ZstdLow),
)
.await
}
2 => {
wtr.write_blob_compressed(
blob.clone(),
&ctx,
Some(ImageCompressionAlgorithm::LZ4),
)
.await
}
_ => unreachable!("Invalid compression {COMPRESSION}"),
};
let offs = res?;
offsets.push(offs);
}
@@ -361,10 +492,17 @@ mod tests {
let blobs = &[
b"test".to_vec(),
random_array(10 * PAGE_SZ),
b"hello".to_vec(),
random_array(66 * PAGE_SZ),
vec![0xf3; 24 * PAGE_SZ],
b"foobar".to_vec(),
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false, 1>(blobs).await?;
round_trip_test_compressed::<true, 1>(blobs).await?;
round_trip_test_compressed::<false, 2>(blobs).await?;
round_trip_test_compressed::<true, 2>(blobs).await?;
Ok(())
}

View File

@@ -377,9 +377,6 @@ pub struct TenantConf {
/// There is a `last_aux_file_policy` flag which gets persisted in `index_part.json` once the first aux
/// file is written.
pub switch_aux_file_policy: AuxFilePolicy,
#[cfg(feature = "testing")]
pub test_vm_bit_debug_logging: bool,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -479,11 +476,6 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub switch_aux_file_policy: Option<AuxFilePolicy>,
#[cfg(feature = "testing")]
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub test_vm_bit_debug_logging: Option<bool>,
}
impl TenantConfOpt {
@@ -546,10 +538,6 @@ impl TenantConfOpt {
switch_aux_file_policy: self
.switch_aux_file_policy
.unwrap_or(global_conf.switch_aux_file_policy),
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: self
.test_vm_bit_debug_logging
.unwrap_or(global_conf.test_vm_bit_debug_logging),
}
}
}
@@ -594,8 +582,6 @@ impl Default for TenantConf {
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
switch_aux_file_policy: AuxFilePolicy::default_tenant_config(),
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: false,
}
}
}
@@ -671,8 +657,6 @@ impl From<TenantConfOpt> for models::TenantConfig {
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
switch_aux_file_policy: value.switch_aux_file_policy,
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: value.test_vm_bit_debug_logging,
}
}
}

View File

@@ -513,7 +513,7 @@ impl<'a> TenantDownloader<'a> {
// cover our access to local storage.
let Ok(_guard) = self.secondary_state.gate.enter() else {
// Shutting down
return Ok(());
return Err(UpdateError::Cancelled);
};
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
@@ -846,7 +846,7 @@ impl<'a> TenantDownloader<'a> {
for layer in timeline.layers {
if self.secondary_state.cancel.is_cancelled() {
tracing::debug!("Cancelled -- dropping out of layer loop");
return Ok(());
return Err(UpdateError::Cancelled);
}
// Existing on-disk layers: just update their access time.

View File

@@ -3,7 +3,6 @@ use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use anyhow::{bail, Context};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
@@ -11,7 +10,7 @@ use tokio_util::sync::CancellationToken;
use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use super::{LogicalSizeCalculationCause, Tenant};
use super::{GcError, LogicalSizeCalculationCause, Tenant};
use crate::tenant::Timeline;
use utils::id::TimelineId;
use utils::lsn::Lsn;
@@ -43,6 +42,44 @@ pub struct SegmentMeta {
pub kind: LsnKind,
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum CalculateSyntheticSizeError {
/// Something went wrong internally to the calculation of logical size at a particular branch point
#[error("Failed to calculated logical size on timeline {timeline_id} at {lsn}: {error}")]
LogicalSize {
timeline_id: TimelineId,
lsn: Lsn,
error: CalculateLogicalSizeError,
},
/// Something went wrong internally when calculating GC parameters at start of size calculation
#[error(transparent)]
GcInfo(GcError),
/// Totally unexpected errors, like panics joining a task
#[error(transparent)]
Fatal(anyhow::Error),
/// The LSN we are trying to calculate a size at no longer exists at the point we query it
#[error("Could not find size at {lsn} in timeline {timeline_id}")]
LsnNotFound { timeline_id: TimelineId, lsn: Lsn },
/// Tenant shut down while calculating size
#[error("Cancelled")]
Cancelled,
}
impl From<GcError> for CalculateSyntheticSizeError {
fn from(value: GcError) -> Self {
match value {
GcError::TenantCancelled | GcError::TimelineCancelled => {
CalculateSyntheticSizeError::Cancelled
}
other => CalculateSyntheticSizeError::GcInfo(other),
}
}
}
impl SegmentMeta {
fn size_needed(&self) -> bool {
match self.kind {
@@ -116,12 +153,9 @@ pub(super) async fn gather_inputs(
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<ModelInputs> {
) -> Result<ModelInputs, CalculateSyntheticSizeError> {
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
tenant
.refresh_gc_info(cancel, ctx)
.await
.context("Failed to refresh gc_info before gathering inputs")?;
tenant.refresh_gc_info(cancel, ctx).await?;
// Collect information about all the timelines
let mut timelines = tenant.list_timelines();
@@ -327,6 +361,12 @@ pub(super) async fn gather_inputs(
)
.await?;
if tenant.cancel.is_cancelled() {
// If we're shutting down, return an error rather than a sparse result that might include some
// timelines from before we started shutting down
return Err(CalculateSyntheticSizeError::Cancelled);
}
Ok(ModelInputs {
segments,
timeline_inputs,
@@ -345,7 +385,7 @@ async fn fill_logical_sizes(
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), CalculateSyntheticSizeError> {
let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
timelines
.iter()
@@ -387,7 +427,7 @@ async fn fill_logical_sizes(
}
// Perform the size lookups
let mut have_any_error = false;
let mut have_any_error = None;
while let Some(res) = joinset.join_next().await {
// each of these come with Result<anyhow::Result<_>, JoinError>
// because of spawn + spawn_blocking
@@ -398,21 +438,36 @@ async fn fill_logical_sizes(
Err(join_error) => {
// cannot really do anything, as this panic is likely a bug
error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}");
have_any_error = true;
have_any_error = Some(CalculateSyntheticSizeError::Fatal(
anyhow::anyhow!(join_error)
.context("task that calls spawn_ondemand_logical_size_calculation"),
));
}
Ok(Err(recv_result_error)) => {
// cannot really do anything, as this panic is likely a bug
error!("failed to receive logical size query result: {recv_result_error:#}");
have_any_error = true;
have_any_error = Some(CalculateSyntheticSizeError::Fatal(
anyhow::anyhow!(recv_result_error)
.context("Receiving logical size query result"),
));
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
if !matches!(error, CalculateLogicalSizeError::Cancelled) {
if matches!(error, CalculateLogicalSizeError::Cancelled) {
// Skip this: it's okay if one timeline among many is shutting down while we
// calculate inputs for the overall tenant.
continue;
} else {
warn!(
timeline_id=%timeline.timeline_id,
"failed to calculate logical size at {lsn}: {error:#}"
);
have_any_error = Some(CalculateSyntheticSizeError::LogicalSize {
timeline_id: timeline.timeline_id,
lsn,
error,
});
}
have_any_error = true;
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
@@ -426,10 +481,10 @@ async fn fill_logical_sizes(
// prune any keys not needed anymore; we record every used key and added key.
logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));
if have_any_error {
if let Some(error) = have_any_error {
// we cannot complete this round, because we are missing data.
// we have however cached all we were able to request calculation on.
anyhow::bail!("failed to calculate some logical_sizes");
return Err(error);
}
// Insert the looked up sizes to the Segments
@@ -444,32 +499,29 @@ async fn fill_logical_sizes(
if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) {
seg.segment.size = Some(*size);
} else {
bail!("could not find size at {} in timeline {}", lsn, timeline_id);
return Err(CalculateSyntheticSizeError::LsnNotFound { timeline_id, lsn });
}
}
Ok(())
}
impl ModelInputs {
pub fn calculate_model(&self) -> anyhow::Result<tenant_size_model::StorageModel> {
pub fn calculate_model(&self) -> tenant_size_model::StorageModel {
// Convert SegmentMetas into plain Segments
let storage = StorageModel {
StorageModel {
segments: self
.segments
.iter()
.map(|seg| seg.segment.clone())
.collect(),
};
Ok(storage)
}
}
// calculate total project size
pub fn calculate(&self) -> anyhow::Result<u64> {
let storage = self.calculate_model()?;
pub fn calculate(&self) -> u64 {
let storage = self.calculate_model();
let sizes = storage.calculate();
Ok(sizes.total_size)
sizes.total_size
}
}
@@ -656,7 +708,7 @@ fn verify_size_for_multiple_branches() {
"#;
let inputs: ModelInputs = serde_json::from_str(doc).unwrap();
assert_eq!(inputs.calculate().unwrap(), 37_851_408);
assert_eq!(inputs.calculate(), 37_851_408);
}
#[test]
@@ -711,7 +763,7 @@ fn verify_size_for_one_branch() {
let model: ModelInputs = serde_json::from_str(doc).unwrap();
let res = model.calculate_model().unwrap().calculate();
let res = model.calculate_model().calculate();
println!("calculated synthetic size: {}", res.total_size);
println!("result: {:?}", serde_json::to_string(&res.segments));

View File

@@ -73,7 +73,7 @@ where
/// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
/// call, to collect more records.
///
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[derive(Debug, Default)]
pub struct ValueReconstructState {
pub records: Vec<(Lsn, NeonWalRecord)>,
pub img: Option<(Lsn, Bytes)>,
@@ -318,7 +318,7 @@ pub(crate) struct LayerFringe {
#[derive(Debug)]
struct LayerKeyspace {
layer: ReadableLayer,
target_keyspace: Vec<KeySpace>,
target_keyspace: KeySpaceRandomAccum,
}
impl LayerFringe {
@@ -342,17 +342,13 @@ impl LayerFringe {
_,
LayerKeyspace {
layer,
target_keyspace,
mut target_keyspace,
},
)) => {
let mut keyspace = KeySpaceRandomAccum::new();
for ks in target_keyspace {
for part in ks.ranges {
keyspace.add_range(part);
}
}
Some((layer, keyspace.consume_keyspace(), read_desc.lsn_range))
}
)) => Some((
layer,
target_keyspace.consume_keyspace(),
read_desc.lsn_range,
)),
None => unreachable!("fringe internals are always consistent"),
}
}
@@ -367,16 +363,18 @@ impl LayerFringe {
let entry = self.layers.entry(layer_id.clone());
match entry {
Entry::Occupied(mut entry) => {
entry.get_mut().target_keyspace.push(keyspace);
entry.get_mut().target_keyspace.add_keyspace(keyspace);
}
Entry::Vacant(entry) => {
self.planned_reads_by_lsn.push(ReadDesc {
lsn_range,
layer_id: layer_id.clone(),
});
let mut accum = KeySpaceRandomAccum::new();
accum.add_keyspace(keyspace);
entry.insert(LayerKeyspace {
layer,
target_keyspace: vec![keyspace],
target_keyspace: accum,
});
}
}

View File

@@ -219,7 +219,6 @@ pub struct DeltaLayerInner {
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
lsn_range: Range<Lsn>,
file: VirtualFile,
file_id: FileId,
@@ -785,7 +784,6 @@ impl DeltaLayerInner {
file_id,
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
lsn_range: actual_summary.lsn_range,
max_vectored_read_bytes,
}))
}
@@ -822,7 +820,6 @@ impl DeltaLayerInner {
if entry_lsn < lsn_range.start {
return false;
}
assert!(entry_lsn <= search_key.lsn(), "certain because of how backwards visit direction works");
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
@@ -912,7 +909,7 @@ impl DeltaLayerInner {
let reads = Self::plan_reads(
&keyspace,
lsn_range,
lsn_range.clone(),
data_end_offset,
index_reader,
planner,
@@ -925,7 +922,7 @@ impl DeltaLayerInner {
self.do_reads_and_update_state(reads, reconstruct_state, ctx)
.await;
reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start);
reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
Ok(())
}

View File

@@ -46,17 +46,19 @@ use camino::{Utf8Path, Utf8PathBuf};
use hex;
use itertools::Itertools;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind};
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::fs::MetadataExt;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tracing::*;
@@ -366,6 +368,170 @@ impl ImageLayer {
res?;
Ok(())
}
pub async fn compression_statistics(
dest_repo_path: &Utf8Path,
path: &Utf8Path,
ctx: &RequestContext,
) -> anyhow::Result<Vec<(Option<ImageCompressionAlgorithm>, u64, u64, u64)>> {
fn make_conf(
image_compression: Option<ImageCompressionAlgorithm>,
dest_repo_path: &Utf8Path,
) -> &'static PageServerConf {
let mut conf = PageServerConf::dummy_conf(dest_repo_path.to_owned());
conf.image_compression = image_compression;
Box::leak(Box::new(conf))
}
let image_compressions = [
None,
Some(ImageCompressionAlgorithm::ZstdLow),
Some(ImageCompressionAlgorithm::Zstd),
Some(ImageCompressionAlgorithm::ZstdHigh),
Some(ImageCompressionAlgorithm::LZ4),
];
let confs = image_compressions
.clone()
.map(|compression| make_conf(compression, dest_repo_path));
let mut stats = Vec::new();
for (image_compression, conf) in image_compressions.into_iter().zip(confs) {
let start_compression = Instant::now();
let compressed_path = Self::compress_for_conf(path, ctx, conf).await?;
let path_to_delete = compressed_path.clone();
scopeguard::defer!({
let _ = std::fs::remove_file(path_to_delete);
});
let size = path.metadata()?.size();
let elapsed_ms = start_compression.elapsed().as_millis() as u64;
let start_decompression = Instant::now();
Self::compare_are_equal(path, &compressed_path, ctx, &image_compression).await?;
let elapsed_decompression_ms = start_decompression.elapsed().as_millis() as u64;
stats.push((
image_compression,
size,
elapsed_ms,
elapsed_decompression_ms,
));
tokio::task::yield_now().await;
}
Ok(stats)
}
async fn compress_for_conf(
path: &Utf8Path,
ctx: &RequestContext,
conf: &'static PageServerConf,
) -> anyhow::Result<Utf8PathBuf> {
let file =
VirtualFile::open_with_options(path, virtual_file::OpenOptions::new().read(true), ctx)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;
let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
if summary.magic != IMAGE_FILE_MAGIC {
anyhow::bail!("magic file mismatch");
}
let tree_reader = DiskBtreeReader::new(
summary.index_start_blk,
summary.index_root_blk,
&block_reader,
);
let mut key_offset_stream =
std::pin::pin!(tree_reader.get_stream_from(&[0u8; KEY_SIZE], ctx));
let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
let timeline_path = conf.timeline_path(&tenant_shard_id, &summary.timeline_id);
tokio::fs::create_dir_all(timeline_path).await?;
let mut writer = ImageLayerWriter::new(
conf,
summary.timeline_id,
tenant_shard_id,
&summary.key_range,
summary.lsn,
ctx,
)
.await?;
let cursor = block_reader.block_cursor();
while let Some(r) = key_offset_stream.next().await {
let (key, offset) = r?;
let key = Key::from_slice(&key);
let content = cursor.read_blob(offset, ctx).await?;
writer.put_image(key, content.into(), ctx).await?;
}
let path = writer.inner.take().unwrap().finish_inner(ctx).await?.2;
Ok(path)
}
async fn compare_are_equal(
path_a: &Utf8Path,
path_b: &Utf8Path,
ctx: &RequestContext,
cmp: &Option<ImageCompressionAlgorithm>,
) -> anyhow::Result<()> {
let mut files = Vec::new();
for path in [path_a, path_b] {
let file = VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new().read(true),
ctx,
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
files.push(file);
}
let mut readers_summaries = Vec::new();
for file in files.iter() {
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;
let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
if summary.magic != IMAGE_FILE_MAGIC {
anyhow::bail!("magic file mismatch");
}
readers_summaries.push((block_reader, summary));
}
let mut tree_readers_cursors = Vec::new();
for (block_reader, summary) in readers_summaries.iter() {
let tree_reader = DiskBtreeReader::new(
summary.index_start_blk,
summary.index_root_blk,
block_reader,
);
let cursor = block_reader.block_cursor();
tree_readers_cursors.push((tree_reader, cursor));
}
let mut key_offset_stream_a = std::pin::pin!(tree_readers_cursors[0]
.0
.get_stream_from(&[0u8; KEY_SIZE], ctx));
let mut key_offset_stream_b = std::pin::pin!(tree_readers_cursors[1]
.0
.get_stream_from(&[0u8; KEY_SIZE], ctx));
while let Some(r) = key_offset_stream_a.next().await {
let (key_a, offset_a): (Vec<u8>, _) = r?;
let Some(r) = key_offset_stream_b.next().await else {
panic!("second file at {path_b} has fewer keys than {path_a}");
};
let (key_b, offset_b): (Vec<u8>, _) = r?;
assert_eq!(key_a, key_b, "mismatch of keys for {path_a}:{path_b}");
let key = Key::from_slice(&key_a);
let content_a = tree_readers_cursors[0].1.read_blob(offset_a, ctx).await?;
let content_b = tree_readers_cursors[1].1.read_blob(offset_b, ctx).await?;
assert_eq!(
content_a, content_b,
"mismatch for key={key} cmp={cmp:?} and {path_a}:{path_b}"
);
//println!("match for key={key} cmp={cmp:?} from {path_a}");
}
Ok(())
}
}
impl ImageLayerInner {
@@ -782,7 +948,10 @@ impl ImageLayerWriterInner {
ctx: &RequestContext,
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let (_img, res) = self.blob_writer.write_blob(img, ctx).await;
let (_img, res) = self
.blob_writer
.write_blob_compressed(img, ctx, self.conf.image_compression)
.await;
// TODO: re-use the buffer for `img` further upstack
let off = res?;
@@ -796,11 +965,10 @@ impl ImageLayerWriterInner {
///
/// Finish writing the image layer.
///
async fn finish(
async fn finish_inner(
self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
) -> anyhow::Result<(&'static PageServerConf, PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -854,8 +1022,16 @@ impl ImageLayerWriterInner {
// fsync the file
file.sync_all().await?;
Ok((self.conf, desc, self.path))
}
async fn finish(
self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
let (conf, desc, path) = self.finish_inner(ctx).await?;
// FIXME: why not carry the virtualfile here, it supports renaming?
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
let layer = Layer::finish_creating(conf, timeline, desc, &path)?;
info!("created image layer {}", layer.local_path());
@@ -923,6 +1099,12 @@ impl ImageLayerWriter {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
}
/// Obtains the current size of the file
pub(crate) fn size(&self) -> u64 {
let inner = self.inner.as_ref().unwrap();
inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
}
///
/// Finish writing the image layer.
///

View File

@@ -62,6 +62,7 @@ use std::{
ops::ControlFlow,
};
use crate::metrics::GetKind;
use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS;
use crate::{
aux_file::AuxFileSizeEstimator,
@@ -75,7 +76,6 @@ use crate::{
disk_usage_eviction_task::DiskUsageEvictionInfo,
pgdatadir_mapping::CollectKeySpaceError,
};
use crate::{deletion_queue::DeletionQueueClient, metrics::GetKind};
use crate::{
disk_usage_eviction_task::finite_f32,
tenant::storage_layer::{
@@ -205,7 +205,6 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
/// The outward-facing resources required to build a Timeline
pub struct TimelineResources {
pub remote_client: RemoteTimelineClient,
pub deletion_queue_client: DeletionQueueClient,
pub timeline_get_throttle: Arc<
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
>,
@@ -910,9 +909,7 @@ impl Timeline {
img: cached_page_img,
};
self.get_impl(key, lsn, reconstruct_state, ctx)
.await
.map(|v| v.0)
self.get_impl(key, lsn, reconstruct_state, ctx).await
}
GetImpl::Vectored => {
let keyspace = KeySpace {
@@ -924,59 +921,16 @@ impl Timeline {
let mut reconstruct_state = ValuesReconstructState::new();
// Only add the cached image to the reconstruct state when it exists.
let cached_page_img_lsn = cached_page_img.as_ref().map(|(lsn, _)| *lsn);
if cached_page_img.is_some() {
let mut key_state = VectoredValueReconstructState::default();
key_state.img = cached_page_img;
reconstruct_state.keys.insert(key, Ok(key_state));
}
let debug_log = {
#[cfg(feature = "testing")]
{
self.get_test_vm_bit_debug_logging()
}
#[cfg(not(feature = "testing"))]
{
false
}
};
if debug_log {
tracing::info!(%key, %lsn, ?cached_page_img_lsn, "debug-logging page reconstruction");
}
if debug_log {
tracing::info!(
location = "before vectored get",
"debug-logging page reconstruction"
);
self.layers
.read()
.await
.layer_map()
.dump(false, ctx)
.await
.unwrap();
}
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.await;
if debug_log {
tracing::info!(
location = "before validation",
"debug-logging page reconstruction"
);
self.layers
.read()
.await
.layer_map()
.dump(false, ctx)
.await
.unwrap();
}
if self.conf.validate_vectored_get {
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
.await;
@@ -994,7 +948,7 @@ impl Timeline {
"Singular vectored get returned wrong key"
)))
} else {
value.map(|v| v.0)
value
}
}
None => Err(PageReconstructError::MissingKey(MissingKeyError {
@@ -1018,7 +972,7 @@ impl Timeline {
lsn: Lsn,
mut reconstruct_state: ValueReconstructState,
ctx: &RequestContext,
) -> Result<(Bytes, ValueReconstructState), PageReconstructError> {
) -> Result<Bytes, PageReconstructError> {
// XXX: structured stats collection for layer eviction here.
trace!(
"get page request for {}@{} from task kind {:?}",
@@ -1157,12 +1111,7 @@ impl Timeline {
}
}
let Ok(res) = res else {
return Err(res.unwrap_err());
};
Ok(BTreeMap::from_iter(
res.into_iter().map(|(k, v)| (k, v.map(|v| v.0))),
))
res
}
/// Scan the keyspace and return all existing key-values in the keyspace. This currently uses vectored
@@ -1226,12 +1175,7 @@ impl Timeline {
recording.observe(throttled);
}
let Ok(vectored_res) = vectored_res else {
return Err(vectored_res.unwrap_err());
};
Ok(BTreeMap::from_iter(
vectored_res.into_iter().map(|(k, v)| (k, v.map(|v| v.0))),
))
vectored_res
}
/// Not subject to [`Self::timeline_get_throttle`].
@@ -1240,10 +1184,7 @@ impl Timeline {
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<
BTreeMap<Key, Result<(Bytes, ValueReconstructState), PageReconstructError>>,
GetVectoredError,
> {
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let mut values = BTreeMap::new();
for range in keyspace.ranges {
@@ -1302,10 +1243,7 @@ impl Timeline {
lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<
BTreeMap<Key, Result<(Bytes, ValueReconstructState), PageReconstructError>>,
GetVectoredError,
> {
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let get_kind = if keyspace.total_raw_size() == 1 {
GetKind::Singular
} else {
@@ -1322,10 +1260,7 @@ impl Timeline {
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
.for_get_kind(get_kind)
.start_timer();
let mut results: BTreeMap<
Key,
Result<(Bytes, ValueReconstructState), PageReconstructError>,
> = BTreeMap::new();
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
let layers_visited = reconstruct_state.get_layers_visited();
for (key, res) in std::mem::take(&mut reconstruct_state.keys) {
@@ -1361,10 +1296,7 @@ impl Timeline {
/// Not subject to [`Self::timeline_get_throttle`].
pub(super) async fn validate_get_vectored_impl(
&self,
vectored_res: &Result<
BTreeMap<Key, Result<(Bytes, ValueReconstructState), PageReconstructError>>,
GetVectoredError,
>,
vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
@@ -1437,8 +1369,8 @@ impl Timeline {
key: &Key,
keyspace: &KeySpace,
lsn: Lsn,
(seq, seq_reconstruct_state): &(Bytes, ValueReconstructState),
(vec, vec_reconstruct_state): &(Bytes, ValueReconstructState),
seq: &Bytes,
vec: &Bytes,
) {
if *key == AUX_FILES_KEY {
// The value reconstruct of AUX_FILES_KEY from records is not deterministic
@@ -1461,16 +1393,10 @@ impl Timeline {
}
} else {
// All other keys should reconstruct deterministically, so we simply compare the blobs.
if seq != vec {
assert_eq!(
seq_reconstruct_state, vec_reconstruct_state,
"Reconstruct state mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
);
assert_eq!(
seq, vec,
"Image mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
);
}
assert_eq!(
seq, vec,
"Image mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
);
}
}
@@ -2239,15 +2165,6 @@ impl Timeline {
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
}
#[cfg(feature = "testing")]
fn get_test_vm_bit_debug_logging(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.test_vm_bit_debug_logging
.unwrap_or(self.conf.default_tenant_conf.test_vm_bit_debug_logging)
}
fn get_image_layer_creation_check_threshold(&self) -> u8 {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -2263,10 +2180,6 @@ impl Timeline {
pub(super) fn tenant_conf_updated(&self, new_conf: &TenantConfOpt) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.
#[cfg(feature = "testing")]
{
info!(?new_conf.test_vm_bit_debug_logging, "updating tenant conf");
}
// The threshold is embedded in the metric. So, we need to update it.
{
@@ -4442,7 +4355,7 @@ impl Timeline {
let mut total_kb_retrieved = 0;
let mut total_keys_retrieved = 0;
for (k, v) in data {
let (v, _) = v.map_err(CreateImageLayersError::PageReconstructError)?;
let v = v.map_err(CreateImageLayersError::PageReconstructError)?;
total_kb_retrieved += KEY_SIZE + v.len();
total_keys_retrieved += 1;
new_data.insert(k, v);
@@ -4909,7 +4822,7 @@ impl Timeline {
pitr: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<GcCutoffs> {
) -> Result<GcCutoffs, PageReconstructError> {
let _timer = self
.metrics
.find_gc_cutoffs_histo
@@ -5237,7 +5150,7 @@ impl Timeline {
key: Key,
request_lsn: Lsn,
mut data: ValueReconstructState,
) -> Result<(Bytes, ValueReconstructState), PageReconstructError> {
) -> Result<Bytes, PageReconstructError> {
// Perform WAL redo if needed
data.records.reverse();
@@ -5250,7 +5163,7 @@ impl Timeline {
img_lsn,
request_lsn,
);
Ok((img.clone(), data))
Ok(img.clone())
} else {
Err(PageReconstructError::from(anyhow!(
"base image for {key} at {request_lsn} not found"
@@ -5282,8 +5195,6 @@ impl Timeline {
let last_rec_lsn = data.records.last().unwrap().0;
let ret_state = data.clone();
let img = match self
.walredo_mgr
.as_ref()
@@ -5314,7 +5225,7 @@ impl Timeline {
}
}
Ok((img, ret_state))
Ok(img)
}
}
}

View File

@@ -1064,7 +1064,7 @@ impl Timeline {
img: base_image,
records: delta_above_base_image,
};
let (img, _) = tline.reconstruct_value(key, horizon, state).await?;
let img = tline.reconstruct_value(key, horizon, state).await?;
Ok((keys_above_horizon, img))
}

View File

@@ -11,7 +11,6 @@ use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint};
use crate::{
config::PageServerConf,
deletion_queue::DeletionQueueClient,
task_mgr::{self, TaskKind},
tenant::{
metadata::TimelineMetadata,
@@ -263,7 +262,6 @@ impl DeleteTimelineFlow {
timeline_id: TimelineId,
local_metadata: &TimelineMetadata,
remote_client: RemoteTimelineClient,
deletion_queue_client: DeletionQueueClient,
) -> anyhow::Result<()> {
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
// RemoteTimelineClient is the only functioning part.
@@ -274,7 +272,6 @@ impl DeleteTimelineFlow {
None, // Ancestor is not needed for deletion.
TimelineResources {
remote_client,
deletion_queue_client,
timeline_get_throttle: tenant.timeline_get_throttle.clone(),
},
// Important. We dont pass ancestor above because it can be missing.

View File

@@ -49,6 +49,19 @@ pub enum NeonWalRecord {
file_path: String,
content: Option<Bytes>,
},
/// A testing record for unit testing purposes. It supports append data to an existing image, or clear it.
#[cfg(test)]
Test {
/// Append a string to the image.
append: String,
/// Clear the image before appending.
clear: bool,
/// Treat this record as an init record. `clear` should be set to true if this field is set
/// to true. This record does not need the history WALs to reconstruct. See [`NeonWalRecord::will_init`] and
/// its references in `timeline.rs`.
will_init: bool,
},
}
impl NeonWalRecord {
@@ -58,11 +71,39 @@ impl NeonWalRecord {
// If you change this function, you'll also need to change ValueBytes::will_init
match self {
NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
#[cfg(test)]
NeonWalRecord::Test { will_init, .. } => *will_init,
// None of the special neon record types currently initialize the page
_ => false,
}
}
#[cfg(test)]
pub(crate) fn wal_append(s: impl AsRef<str>) -> Self {
Self::Test {
append: s.as_ref().to_string(),
clear: false,
will_init: false,
}
}
#[cfg(test)]
pub(crate) fn wal_clear() -> Self {
Self::Test {
append: "".to_string(),
clear: true,
will_init: false,
}
}
#[cfg(test)]
pub(crate) fn wal_init() -> Self {
Self::Test {
append: "".to_string(),
clear: true,
will_init: true,
}
}
}
/// DecodedBkpBlock represents per-page data contained in a WAL record.

View File

@@ -244,6 +244,20 @@ pub(crate) fn apply_in_neon(
let mut writer = page.writer();
dir.ser_into(&mut writer)?;
}
#[cfg(test)]
NeonWalRecord::Test {
append,
clear,
will_init,
} => {
if *will_init {
assert!(*clear, "init record must be clear to ensure correctness");
}
if *clear {
page.clear();
}
page.put_slice(append.as_bytes());
}
}
Ok(())
}

View File

@@ -1,19 +1,8 @@
From 0b0194a57bd0f3598bd57dbedd0df3932330169d Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Fri, 2 Feb 2024 22:26:45 +0200
Subject: [PATCH 1/1] Make v0.6.0 work with Neon
Now that the WAL-logging happens as a separate step at the end of the
build, we need a few neon-specific hints to make it work.
---
src/hnswbuild.c | 36 ++++++++++++++++++++++++++++++++++++
1 file changed, 36 insertions(+)
diff --git a/src/hnswbuild.c b/src/hnswbuild.c
index 680789b..ec54dea 100644
index dcfb2bd..d5189ee 100644
--- a/src/hnswbuild.c
+++ b/src/hnswbuild.c
@@ -840,9 +840,17 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
@@ -860,9 +860,17 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
hnswarea = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_AREA, false);
@@ -31,7 +20,7 @@ index 680789b..ec54dea 100644
/* Close relations within worker */
index_close(indexRel, indexLockmode);
table_close(heapRel, heapLockmode);
@@ -1089,13 +1097,41 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
@@ -1117,12 +1125,38 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
SeedRandom(42);
#endif
@@ -43,14 +32,13 @@ index 680789b..ec54dea 100644
BuildGraph(buildstate, forkNum);
- if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM)
+#ifdef NEON_SMGR
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index));
+#endif
+
if (RelationNeedsWAL(index))
+ {
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocks(index), true);
+ if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM) {
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocksInFork(index, forkNum), true);
+#ifdef NEON_SMGR
+ {
+#if PG_VERSION_NUM >= 160000
@@ -60,7 +48,7 @@ index 680789b..ec54dea 100644
+#endif
+
+ SetLastWrittenLSNForBlockRange(XactLastRecEnd, rlocator,
+ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index));
+ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index));
+ SetLastWrittenLSNForRelation(XactLastRecEnd, rlocator, MAIN_FORKNUM);
+ }
+#endif
@@ -69,10 +57,6 @@ index 680789b..ec54dea 100644
+#ifdef NEON_SMGR
+ smgr_end_unlogged_build(RelationGetSmgr(index));
+#endif
+
FreeBuildState(buildstate);
}
--
2.39.2

View File

@@ -3112,12 +3112,12 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
request_lsn = UINT64_MAX;
/*
* GetRedoStartLsn() returns LSN of basebackup. We know that the SLRU
* GetRedoStartLsn() returns LSN of the basebackup. We know that the SLRU
* segment has not changed since the basebackup, because in order to
* modify it, we would have had to download it already. And once
* downloaded, we never evict SLRU segments from local disk.
*/
not_modified_since = GetRedoStartLsn();
not_modified_since = nm_adjust_lsn(GetRedoStartLsn());
SlruKind kind;

View File

@@ -1,16 +1,183 @@
use measured::FixedCardinalityLabel;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::fmt::{self, Display};
use crate::auth::IpPattern;
use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
use crate::proxy::retry::ShouldRetry;
/// Generic error response with human-readable description.
/// Note that we can't always present it to user as is.
#[derive(Debug, Deserialize)]
pub struct ConsoleError {
pub error: Box<str>,
#[serde(skip)]
pub http_status_code: http::StatusCode,
pub status: Option<Status>,
}
impl ConsoleError {
pub fn get_reason(&self) -> Reason {
self.status
.as_ref()
.and_then(|s| s.details.error_info.as_ref())
.map(|e| e.reason)
.unwrap_or(Reason::Unknown)
}
pub fn get_user_facing_message(&self) -> String {
use super::provider::errors::REQUEST_FAILED;
self.status
.as_ref()
.and_then(|s| s.details.user_facing_message.as_ref())
.map(|m| m.message.clone().into())
.unwrap_or_else(|| {
// Ask @neondatabase/control-plane for review before adding more.
match self.http_status_code {
http::StatusCode::NOT_FOUND => {
// Status 404: failed to get a project-related resource.
format!("{REQUEST_FAILED}: endpoint cannot be found")
}
http::StatusCode::NOT_ACCEPTABLE => {
// Status 406: endpoint is disabled (we don't allow connections).
format!("{REQUEST_FAILED}: endpoint is disabled")
}
http::StatusCode::LOCKED | http::StatusCode::UNPROCESSABLE_ENTITY => {
// Status 423: project might be in maintenance mode (or bad state), or quotas exceeded.
format!("{REQUEST_FAILED}: endpoint is temporarily unavailable. Check your quotas and/or contact our support.")
}
_ => REQUEST_FAILED.to_owned(),
}
})
}
}
impl Display for ConsoleError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let msg = self
.status
.as_ref()
.and_then(|s| s.details.user_facing_message.as_ref())
.map(|m| m.message.as_ref())
.unwrap_or_else(|| &self.error);
write!(f, "{}", msg)
}
}
impl ShouldRetry for ConsoleError {
fn could_retry(&self) -> bool {
if self.status.is_none() || self.status.as_ref().unwrap().details.retry_info.is_none() {
// retry some temporary failures because the compute was in a bad state
// (bad request can be returned when the endpoint was in transition)
return match &self {
ConsoleError {
http_status_code: http::StatusCode::BAD_REQUEST,
..
} => true,
// don't retry when quotas are exceeded
ConsoleError {
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
ref error,
..
} => !error.contains("compute time quota of non-primary branches is exceeded"),
// locked can be returned when the endpoint was in transition
// or when quotas are exceeded. don't retry when quotas are exceeded
ConsoleError {
http_status_code: http::StatusCode::LOCKED,
ref error,
..
} => {
!error.contains("quota exceeded")
&& !error.contains("the limit for current plan reached")
}
_ => false,
};
}
// retry if the response has a retry delay
if let Some(retry_info) = self
.status
.as_ref()
.and_then(|s| s.details.retry_info.as_ref())
{
retry_info.retry_delay_ms > 0
} else {
false
}
}
}
#[derive(Debug, Deserialize)]
pub struct Status {
pub code: Box<str>,
pub message: Box<str>,
pub details: Details,
}
#[derive(Debug, Deserialize)]
pub struct Details {
pub error_info: Option<ErrorInfo>,
pub retry_info: Option<RetryInfo>,
pub user_facing_message: Option<UserFacingMessage>,
}
#[derive(Debug, Deserialize)]
pub struct ErrorInfo {
pub reason: Reason,
// Schema could also have `metadata` field, but it's not structured. Skip it for now.
}
#[derive(Clone, Copy, Debug, Deserialize, Default)]
pub enum Reason {
#[serde(rename = "ROLE_PROTECTED")]
RoleProtected,
#[serde(rename = "RESOURCE_NOT_FOUND")]
ResourceNotFound,
#[serde(rename = "PROJECT_NOT_FOUND")]
ProjectNotFound,
#[serde(rename = "ENDPOINT_NOT_FOUND")]
EndpointNotFound,
#[serde(rename = "BRANCH_NOT_FOUND")]
BranchNotFound,
#[serde(rename = "RATE_LIMIT_EXCEEDED")]
RateLimitExceeded,
#[serde(rename = "NON_PRIMARY_BRANCH_COMPUTE_TIME_EXCEEDED")]
NonPrimaryBranchComputeTimeExceeded,
#[serde(rename = "ACTIVE_TIME_QUOTA_EXCEEDED")]
ActiveTimeQuotaExceeded,
#[serde(rename = "COMPUTE_TIME_QUOTA_EXCEEDED")]
ComputeTimeQuotaExceeded,
#[serde(rename = "WRITTEN_DATA_QUOTA_EXCEEDED")]
WrittenDataQuotaExceeded,
#[serde(rename = "DATA_TRANSFER_QUOTA_EXCEEDED")]
DataTransferQuotaExceeded,
#[serde(rename = "LOGICAL_SIZE_QUOTA_EXCEEDED")]
LogicalSizeQuotaExceeded,
#[default]
#[serde(other)]
Unknown,
}
impl Reason {
pub fn is_not_found(&self) -> bool {
matches!(
self,
Reason::ResourceNotFound
| Reason::ProjectNotFound
| Reason::EndpointNotFound
| Reason::BranchNotFound
)
}
}
#[derive(Debug, Deserialize)]
pub struct RetryInfo {
pub retry_delay_ms: u64,
}
#[derive(Debug, Deserialize)]
pub struct UserFacingMessage {
pub message: Box<str>,
}
/// Response which holds client's auth secret, e.g. [`crate::scram::ServerSecret`].

View File

@@ -25,8 +25,8 @@ use tracing::info;
pub mod errors {
use crate::{
console::messages::{self, ConsoleError},
error::{io_error, ReportableError, UserFacingError},
http,
proxy::retry::ShouldRetry,
};
use thiserror::Error;
@@ -34,17 +34,14 @@ pub mod errors {
use super::ApiLockError;
/// A go-to error message which doesn't leak any detail.
const REQUEST_FAILED: &str = "Console request failed";
pub const REQUEST_FAILED: &str = "Console request failed";
/// Common console API error.
#[derive(Debug, Error)]
pub enum ApiError {
/// Error returned by the console itself.
#[error("{REQUEST_FAILED} with {}: {}", .status, .text)]
Console {
status: http::StatusCode,
text: Box<str>,
},
#[error("{REQUEST_FAILED} with {0}")]
Console(ConsoleError),
/// Various IO errors like broken pipe or malformed payload.
#[error("{REQUEST_FAILED}: {0}")]
@@ -53,11 +50,11 @@ pub mod errors {
impl ApiError {
/// Returns HTTP status code if it's the reason for failure.
pub fn http_status_code(&self) -> Option<http::StatusCode> {
pub fn get_reason(&self) -> messages::Reason {
use ApiError::*;
match self {
Console { status, .. } => Some(*status),
_ => None,
Console(e) => e.get_reason(),
_ => messages::Reason::Unknown,
}
}
}
@@ -67,22 +64,7 @@ pub mod errors {
use ApiError::*;
match self {
// To minimize risks, only select errors are forwarded to users.
// Ask @neondatabase/control-plane for review before adding more.
Console { status, .. } => match *status {
http::StatusCode::NOT_FOUND => {
// Status 404: failed to get a project-related resource.
format!("{REQUEST_FAILED}: endpoint cannot be found")
}
http::StatusCode::NOT_ACCEPTABLE => {
// Status 406: endpoint is disabled (we don't allow connections).
format!("{REQUEST_FAILED}: endpoint is disabled")
}
http::StatusCode::LOCKED | http::StatusCode::UNPROCESSABLE_ENTITY => {
// Status 423: project might be in maintenance mode (or bad state), or quotas exceeded.
format!("{REQUEST_FAILED}: endpoint is temporarily unavailable. Check your quotas and/or contact our support.")
}
_ => REQUEST_FAILED.to_owned(),
},
Console(c) => c.get_user_facing_message(),
_ => REQUEST_FAILED.to_owned(),
}
}
@@ -91,29 +73,56 @@ pub mod errors {
impl ReportableError for ApiError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self {
ApiError::Console {
status: http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
..
} => crate::error::ErrorKind::User,
ApiError::Console {
status: http::StatusCode::UNPROCESSABLE_ENTITY,
text,
} if text.contains("compute time quota of non-primary branches is exceeded") => {
crate::error::ErrorKind::User
ApiError::Console(e) => {
use crate::error::ErrorKind::*;
match e.get_reason() {
crate::console::messages::Reason::RoleProtected => User,
crate::console::messages::Reason::ResourceNotFound => User,
crate::console::messages::Reason::ProjectNotFound => User,
crate::console::messages::Reason::EndpointNotFound => User,
crate::console::messages::Reason::BranchNotFound => User,
crate::console::messages::Reason::RateLimitExceeded => ServiceRateLimit,
crate::console::messages::Reason::NonPrimaryBranchComputeTimeExceeded => {
User
}
crate::console::messages::Reason::ActiveTimeQuotaExceeded => User,
crate::console::messages::Reason::ComputeTimeQuotaExceeded => User,
crate::console::messages::Reason::WrittenDataQuotaExceeded => User,
crate::console::messages::Reason::DataTransferQuotaExceeded => User,
crate::console::messages::Reason::LogicalSizeQuotaExceeded => User,
crate::console::messages::Reason::Unknown => match &e {
ConsoleError {
http_status_code:
http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
..
} => crate::error::ErrorKind::User,
ConsoleError {
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
error,
..
} if error.contains(
"compute time quota of non-primary branches is exceeded",
) =>
{
crate::error::ErrorKind::User
}
ConsoleError {
http_status_code: http::StatusCode::LOCKED,
error,
..
} if error.contains("quota exceeded")
|| error.contains("the limit for current plan reached") =>
{
crate::error::ErrorKind::User
}
ConsoleError {
http_status_code: http::StatusCode::TOO_MANY_REQUESTS,
..
} => crate::error::ErrorKind::ServiceRateLimit,
ConsoleError { .. } => crate::error::ErrorKind::ControlPlane,
},
}
}
ApiError::Console {
status: http::StatusCode::LOCKED,
text,
} if text.contains("quota exceeded")
|| text.contains("the limit for current plan reached") =>
{
crate::error::ErrorKind::User
}
ApiError::Console {
status: http::StatusCode::TOO_MANY_REQUESTS,
..
} => crate::error::ErrorKind::ServiceRateLimit,
ApiError::Console { .. } => crate::error::ErrorKind::ControlPlane,
ApiError::Transport(_) => crate::error::ErrorKind::ControlPlane,
}
}
@@ -124,31 +133,7 @@ pub mod errors {
match self {
// retry some transport errors
Self::Transport(io) => io.could_retry(),
// retry some temporary failures because the compute was in a bad state
// (bad request can be returned when the endpoint was in transition)
Self::Console {
status: http::StatusCode::BAD_REQUEST,
..
} => true,
// don't retry when quotas are exceeded
Self::Console {
status: http::StatusCode::UNPROCESSABLE_ENTITY,
ref text,
} => !text.contains("compute time quota of non-primary branches is exceeded"),
// locked can be returned when the endpoint was in transition
// or when quotas are exceeded. don't retry when quotas are exceeded
Self::Console {
status: http::StatusCode::LOCKED,
ref text,
} => {
// written data quota exceeded
// data transfer quota exceeded
// compute time quota exceeded
// logical size quota exceeded
!text.contains("quota exceeded")
&& !text.contains("the limit for current plan reached")
}
_ => false,
Self::Console(e) => e.could_retry(),
}
}
}
@@ -509,7 +494,7 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
self.metrics
.semaphore_acquire_seconds
.observe(now.elapsed().as_secs_f64());
info!("acquired permit {:?}", now.elapsed().as_secs_f64());
Ok(WakeComputePermit { permit: permit? })
}

View File

@@ -94,12 +94,14 @@ impl Api {
let body = match parse_body::<GetRoleSecret>(response).await {
Ok(body) => body,
// Error 404 is special: it's ok not to have a secret.
Err(e) => match e.http_status_code() {
Some(http::StatusCode::NOT_FOUND) => {
// TODO(anna): retry
Err(e) => {
if e.get_reason().is_not_found() {
return Ok(AuthInfo::default());
} else {
return Err(e.into());
}
_otherwise => return Err(e.into()),
},
}
};
let secret = if body.role_secret.is_empty() {
@@ -328,19 +330,24 @@ async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
info!("request succeeded, processing the body");
return Ok(response.json().await?);
}
let s = response.bytes().await?;
// Log plaintext to be able to detect, whether there are some cases not covered by the error struct.
info!("response_error plaintext: {:?}", s);
// Don't throw an error here because it's not as important
// as the fact that the request itself has failed.
let body = response.json().await.unwrap_or_else(|e| {
let mut body = serde_json::from_slice(&s).unwrap_or_else(|e| {
warn!("failed to parse error body: {e}");
ConsoleError {
error: "reason unclear (malformed error message)".into(),
http_status_code: status,
status: None,
}
});
body.http_status_code = status;
let text = body.error;
error!("console responded with an error ({status}): {text}");
Err(ApiError::Console { status, text })
error!("console responded with an error ({status}): {body:?}");
Err(ApiError::Console(body))
}
fn parse_host_port(input: &str) -> Option<(&str, u16)> {

View File

@@ -12,7 +12,7 @@ use crate::auth::backend::{
};
use crate::config::{CertResolver, RetryConfig};
use crate::console::caches::NodeInfoCache;
use crate::console::messages::MetricsAuxInfo;
use crate::console::messages::{ConsoleError, MetricsAuxInfo};
use crate::console::provider::{CachedAllowedIps, CachedRoleSecret, ConsoleBackend};
use crate::console::{self, CachedNodeInfo, NodeInfo};
use crate::error::ErrorKind;
@@ -484,18 +484,20 @@ impl TestBackend for TestConnectMechanism {
match action {
ConnectAction::Wake => Ok(helper_create_cached_node_info(self.cache)),
ConnectAction::WakeFail => {
let err = console::errors::ApiError::Console {
status: http::StatusCode::FORBIDDEN,
text: "TEST".into(),
};
let err = console::errors::ApiError::Console(ConsoleError {
http_status_code: http::StatusCode::FORBIDDEN,
error: "TEST".into(),
status: None,
});
assert!(!err.could_retry());
Err(console::errors::WakeComputeError::ApiError(err))
}
ConnectAction::WakeRetry => {
let err = console::errors::ApiError::Console {
status: http::StatusCode::BAD_REQUEST,
text: "TEST".into(),
};
let err = console::errors::ApiError::Console(ConsoleError {
http_status_code: http::StatusCode::BAD_REQUEST,
error: "TEST".into(),
status: None,
});
assert!(err.could_retry());
Err(console::errors::WakeComputeError::ApiError(err))
}

View File

@@ -1,4 +1,5 @@
use crate::config::RetryConfig;
use crate::console::messages::ConsoleError;
use crate::console::{errors::WakeComputeError, provider::CachedNodeInfo};
use crate::context::RequestMonitoring;
use crate::metrics::{
@@ -88,36 +89,76 @@ fn report_error(e: &WakeComputeError, retry: bool) {
let kind = match e {
WakeComputeError::BadComputeAddress(_) => WakeupFailureKind::BadComputeAddress,
WakeComputeError::ApiError(ApiError::Transport(_)) => WakeupFailureKind::ApiTransportError,
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
ref text,
}) if text.contains("written data quota exceeded")
|| text.contains("the limit for current plan reached") =>
{
WakeupFailureKind::QuotaExceeded
}
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::UNPROCESSABLE_ENTITY,
ref text,
}) if text.contains("compute time quota of non-primary branches is exceeded") => {
WakeupFailureKind::QuotaExceeded
}
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
..
}) => WakeupFailureKind::ApiConsoleLocked,
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::BAD_REQUEST,
..
}) => WakeupFailureKind::ApiConsoleBadRequest,
WakeComputeError::ApiError(ApiError::Console { status, .. })
if status.is_server_error() =>
{
WakeupFailureKind::ApiConsoleOtherServerError
}
WakeComputeError::ApiError(ApiError::Console { .. }) => {
WakeupFailureKind::ApiConsoleOtherError
}
WakeComputeError::ApiError(ApiError::Console(e)) => match e.get_reason() {
crate::console::messages::Reason::RoleProtected => {
WakeupFailureKind::ApiConsoleBadRequest
}
crate::console::messages::Reason::ResourceNotFound => {
WakeupFailureKind::ApiConsoleBadRequest
}
crate::console::messages::Reason::ProjectNotFound => {
WakeupFailureKind::ApiConsoleBadRequest
}
crate::console::messages::Reason::EndpointNotFound => {
WakeupFailureKind::ApiConsoleBadRequest
}
crate::console::messages::Reason::BranchNotFound => {
WakeupFailureKind::ApiConsoleBadRequest
}
crate::console::messages::Reason::RateLimitExceeded => {
WakeupFailureKind::ApiConsoleLocked
}
crate::console::messages::Reason::NonPrimaryBranchComputeTimeExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::ActiveTimeQuotaExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::ComputeTimeQuotaExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::WrittenDataQuotaExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::DataTransferQuotaExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::LogicalSizeQuotaExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::Unknown => match e {
ConsoleError {
http_status_code: StatusCode::LOCKED,
ref error,
..
} if error.contains("written data quota exceeded")
|| error.contains("the limit for current plan reached") =>
{
WakeupFailureKind::QuotaExceeded
}
ConsoleError {
http_status_code: StatusCode::UNPROCESSABLE_ENTITY,
ref error,
..
} if error.contains("compute time quota of non-primary branches is exceeded") => {
WakeupFailureKind::QuotaExceeded
}
ConsoleError {
http_status_code: StatusCode::LOCKED,
..
} => WakeupFailureKind::ApiConsoleLocked,
ConsoleError {
http_status_code: StatusCode::BAD_REQUEST,
..
} => WakeupFailureKind::ApiConsoleBadRequest,
ConsoleError {
http_status_code, ..
} if http_status_code.is_server_error() => {
WakeupFailureKind::ApiConsoleOtherServerError
}
ConsoleError { .. } => WakeupFailureKind::ApiConsoleOtherError,
},
},
WakeComputeError::TooManyConnections => WakeupFailureKind::ApiConsoleLocked,
WakeComputeError::TooManyConnectionAttempts(_) => WakeupFailureKind::TimeoutError,
};

View File

@@ -1,5 +1,3 @@
use std::usize;
use super::{LimitAlgorithm, Outcome, Sample};
/// Loss-based congestion avoidance.

View File

@@ -32,8 +32,6 @@ pub struct ClientFirstMessage<'a> {
pub bare: &'a str,
/// Channel binding mode.
pub cbind_flag: ChannelBinding<&'a str>,
/// (Client username)[<https://github.com/postgres/postgres/blob/94226d4506e66d6e7cbf/src/backend/libpq/auth-scram.c#L13>].
pub username: &'a str,
/// Client nonce.
pub nonce: &'a str,
}
@@ -58,6 +56,14 @@ impl<'a> ClientFirstMessage<'a> {
// In theory, these might be preceded by "reserved-mext" (i.e. "m=")
let username = parts.next()?.strip_prefix("n=")?;
// https://github.com/postgres/postgres/blob/f83908798f78c4cafda217ca875602c88ea2ae28/src/backend/libpq/auth-scram.c#L13-L14
if !username.is_empty() {
tracing::warn!(username, "scram username provided, but is not expected")
// TODO(conrad):
// return None;
}
let nonce = parts.next()?.strip_prefix("r=")?;
// Validate but ignore auth extensions
@@ -66,7 +72,6 @@ impl<'a> ClientFirstMessage<'a> {
Some(Self {
bare,
cbind_flag,
username,
nonce,
})
}
@@ -188,19 +193,18 @@ mod tests {
// (Almost) real strings captured during debug sessions
let cases = [
(NotSupportedClient, "n,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju"),
(NotSupportedServer, "y,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju"),
(NotSupportedClient, "n,,n=,r=t8JwklwKecDLwSsA72rHmVju"),
(NotSupportedServer, "y,,n=,r=t8JwklwKecDLwSsA72rHmVju"),
(
Required("tls-server-end-point"),
"p=tls-server-end-point,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju",
"p=tls-server-end-point,,n=,r=t8JwklwKecDLwSsA72rHmVju",
),
];
for (cb, input) in cases {
let msg = ClientFirstMessage::parse(input).unwrap();
assert_eq!(msg.bare, "n=pepe,r=t8JwklwKecDLwSsA72rHmVju");
assert_eq!(msg.username, "pepe");
assert_eq!(msg.bare, "n=,r=t8JwklwKecDLwSsA72rHmVju");
assert_eq!(msg.nonce, "t8JwklwKecDLwSsA72rHmVju");
assert_eq!(msg.cbind_flag, cb);
}
@@ -208,14 +212,13 @@ mod tests {
#[test]
fn parse_client_first_message_with_invalid_gs2_authz() {
assert!(ClientFirstMessage::parse("n,authzid,n=user,r=nonce").is_none())
assert!(ClientFirstMessage::parse("n,authzid,n=,r=nonce").is_none())
}
#[test]
fn parse_client_first_message_with_extra_params() {
let msg = ClientFirstMessage::parse("n,,n=user,r=nonce,a=foo,b=bar,c=baz").unwrap();
assert_eq!(msg.bare, "n=user,r=nonce,a=foo,b=bar,c=baz");
assert_eq!(msg.username, "user");
let msg = ClientFirstMessage::parse("n,,n=,r=nonce,a=foo,b=bar,c=baz").unwrap();
assert_eq!(msg.bare, "n=,r=nonce,a=foo,b=bar,c=baz");
assert_eq!(msg.nonce, "nonce");
assert_eq!(msg.cbind_flag, ChannelBinding::NotSupportedClient);
}
@@ -223,9 +226,9 @@ mod tests {
#[test]
fn parse_client_first_message_with_extra_params_invalid() {
// must be of the form `<ascii letter>=<...>`
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,abc=foo").is_none());
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,1=foo").is_none());
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,a").is_none());
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,abc=foo").is_none());
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,1=foo").is_none());
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,a").is_none());
}
#[test]

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.78.0"
channel = "1.79.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -0,0 +1,63 @@
#!/usr/bin/env -S python3 -u
import argparse
import json
import os
from pprint import pprint
import matplotlib.pyplot as plt
parser = argparse.ArgumentParser(prog="compression-report")
parser.add_argument("dir")
args = parser.parse_args()
files = []
for file_name in os.listdir(args.dir):
if not file_name.endswith(".json"):
continue
file_path = os.path.join(args.dir, file_name)
with open(file_path) as json_str:
json_data = json.load(json_str)
files.append((file_name, json_data))
#pprint(files)
extra_zstd_lines = True
dc = 2 # data column to use (1 for sizes, 2 for time)
sort_by = "ZstdHigh"
files.sort(key=lambda file_data: [x for x in file_data[1] if x[0] == sort_by][0][dc])
x_axis = []
data_baseline = []
data_lz4 = []
data_zstd = []
data_zstd_low = []
data_zstd_high = []
for idx, f in enumerate(files):
file_data = f[1]
#pprint(file_data)
x_axis.append(idx)
data_baseline.append([x for x in file_data if x[0] is None][0][dc])
data_lz4.append([x for x in file_data if x[0] == "LZ4"][0][dc])
data_zstd.append([x for x in file_data if x[0] == "Zstd"][0][dc])
if extra_zstd_lines:
data_zstd_low.append([x for x in file_data if x[0] == "ZstdLow"][0][dc])
data_zstd_high.append([x for x in file_data if x[0] == "ZstdHigh"][0][dc])
plt.plot(x_axis, data_baseline, "x", markeredgewidth=2, label="baseline")
plt.plot(x_axis, data_lz4, "x", markeredgewidth=2, label="lz4")
plt.plot(x_axis, data_zstd, "x", markeredgewidth=2, label="Zstd")
if extra_zstd_lines:
plt.plot(x_axis, data_zstd_low, "x", markeredgewidth=2, label="ZstdLow")
plt.plot(x_axis, data_zstd_high, "x", markeredgewidth=2, label="ZstdHigh")
# plt.style.use('_mpl-gallery')
plt.ylim(bottom=0)
plt.legend(loc="upper left")
figure_path = os.path.join(args.dir, "figure.png")
print(f"saving figure to {figure_path}")
plt.savefig(figure_path)
plt.show()

View File

@@ -31,6 +31,7 @@ pub(crate) enum PageserverState {
Available {
last_seen_at: Instant,
utilization: PageserverUtilization,
new: bool,
},
Offline,
}
@@ -127,6 +128,7 @@ impl HeartbeaterTask {
heartbeat_futs.push({
let jwt_token = self.jwt_token.clone();
let cancel = self.cancel.clone();
let new_node = !self.state.contains_key(node_id);
// Clone the node and mark it as available such that the request
// goes through to the pageserver even when the node is marked offline.
@@ -159,6 +161,7 @@ impl HeartbeaterTask {
PageserverState::Available {
last_seen_at: Instant::now(),
utilization,
new: new_node,
}
} else {
PageserverState::Offline
@@ -220,6 +223,7 @@ impl HeartbeaterTask {
}
},
Vacant(_) => {
// This is a new node. Don't generate a delta for it.
deltas.push((node_id, ps_state.clone()));
}
}

View File

@@ -3,7 +3,7 @@ use std::{str::FromStr, time::Duration};
use pageserver_api::{
controller_api::{
NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
TenantLocateResponseShard,
TenantLocateResponseShard, UtilizationScore,
},
shard::TenantShardId,
};
@@ -116,6 +116,16 @@ impl Node {
match (self.availability, availability) {
(Offline, Active(_)) => ToActive,
(Active(_), Offline) => ToOffline,
// Consider the case when the storage controller handles the re-attach of a node
// before the heartbeats detect that the node is back online. We still need
// [`Service::node_configure`] to attempt reconciliations for shards with an
// unknown observed location.
// The unsavoury match arm below handles this situation.
(Active(lhs), Active(rhs))
if lhs == UtilizationScore::worst() && rhs < UtilizationScore::worst() =>
{
ToActive
}
_ => Unchanged,
}
}

View File

@@ -12,7 +12,7 @@ use crate::{
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard},
persistence::{AbortShardSplitStatus, TenantFilter},
reconciler::{ReconcileError, ReconcileUnits},
scheduler::{ScheduleContext, ScheduleMode},
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
tenant_shard::{
MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction,
},
@@ -747,29 +747,61 @@ impl Service {
let res = self.heartbeater.heartbeat(nodes).await;
if let Ok(deltas) = res {
for (node_id, state) in deltas.0 {
let new_availability = match state {
PageserverState::Available { utilization, .. } => NodeAvailability::Active(
UtilizationScore(utilization.utilization_score),
let (new_node, new_availability) = match state {
PageserverState::Available {
utilization, new, ..
} => (
new,
NodeAvailability::Active(UtilizationScore(
utilization.utilization_score,
)),
),
PageserverState::Offline => NodeAvailability::Offline,
PageserverState::Offline => (false, NodeAvailability::Offline),
};
let res = self
.node_configure(node_id, Some(new_availability), None)
.await;
match res {
Ok(()) => {}
Err(ApiError::NotFound(_)) => {
// This should be rare, but legitimate since the heartbeats are done
// on a snapshot of the nodes.
tracing::info!("Node {} was not found after heartbeat round", node_id);
if new_node {
// When the heartbeats detect a newly added node, we don't wish
// to attempt to reconcile the shards assigned to it. The node
// is likely handling it's re-attach response, so reconciling now
// would be counterproductive.
//
// Instead, update the in-memory state with the details learned about the
// node.
let mut locked = self.inner.write().unwrap();
let (nodes, _tenants, scheduler) = locked.parts_mut();
let mut new_nodes = (**nodes).clone();
if let Some(node) = new_nodes.get_mut(&node_id) {
node.set_availability(new_availability);
scheduler.node_upsert(node);
}
Err(err) => {
tracing::error!(
"Failed to update node {} after heartbeat round: {}",
node_id,
err
);
locked.nodes = Arc::new(new_nodes);
} else {
// This is the code path for geniune availability transitions (i.e node
// goes unavailable and/or comes back online).
let res = self
.node_configure(node_id, Some(new_availability), None)
.await;
match res {
Ok(()) => {}
Err(ApiError::NotFound(_)) => {
// This should be rare, but legitimate since the heartbeats are done
// on a snapshot of the nodes.
tracing::info!(
"Node {} was not found after heartbeat round",
node_id
);
}
Err(err) => {
tracing::error!(
"Failed to update node {} after heartbeat round: {}",
node_id,
err
);
}
}
}
}
@@ -2409,11 +2441,17 @@ impl Service {
(detach_waiters, shard_ids, node.clone())
};
if let Err(e) = self.await_waiters(detach_waiters, RECONCILE_TIMEOUT).await {
// Failing to detach shouldn't hold up deletion, e.g. if a node is offline we should be able
// to use some other node to run the remote deletion.
tracing::warn!("Failed to detach some locations: {e}");
}
// This reconcile wait can fail in a few ways:
// A there is a very long queue for the reconciler semaphore
// B some pageserver is failing to handle a detach promptly
// C some pageserver goes offline right at the moment we send it a request.
//
// A and C are transient: the semaphore will eventually become available, and once a node is marked offline
// the next attempt to reconcile will silently skip detaches for an offline node and succeed. If B happens,
// it's a bug, and needs resolving at the pageserver level (we shouldn't just leave attachments behind while
// deleting the underlying data).
self.await_waiters(detach_waiters, RECONCILE_TIMEOUT)
.await?;
let locations = shard_ids
.into_iter()
@@ -2431,13 +2469,11 @@ impl Service {
for result in results {
match result {
Ok(StatusCode::ACCEPTED) => {
// This could happen if we failed detach above, and hit a pageserver where the tenant
// is still attached: it will accept the deletion in the background
tracing::warn!(
"Unexpectedly still attached on {}, client should retry",
// This should never happen: we waited for detaches to finish above
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Unexpectedly still attached on {}",
node
);
return Ok(StatusCode::ACCEPTED);
)));
}
Ok(_) => {}
Err(mgmt_api::Error::Cancelled) => {
@@ -4312,6 +4348,16 @@ impl Service {
continue;
}
if !new_nodes
.values()
.any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
{
// Special case for when all nodes are unavailable and/or unschedulable: there is no point
// trying to reschedule since there's nowhere else to go. Without this
// branch we incorrectly detach tenants in response to node unavailability.
continue;
}
if tenant_shard.intent.demote_attached(scheduler, node_id) {
tenant_shard.sequence = tenant_shard.sequence.next();
@@ -4349,6 +4395,12 @@ impl Service {
// When a node comes back online, we must reconcile any tenant that has a None observed
// location on the node.
for tenant_shard in locked.tenants.values_mut() {
// If a reconciliation is already in progress, rely on the previous scheduling
// decision and skip triggering a new reconciliation.
if tenant_shard.reconciler.is_some() {
continue;
}
if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
if observed_loc.conf.is_none() {
self.maybe_reconcile_shard(tenant_shard, &new_nodes);

View File

@@ -94,8 +94,6 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
".*WARN.*path=/v1/utilization .*request was dropped before completing",
# Can happen during shutdown
".*scheduling deletion on drop failed: queue is in state Stopped.*",
# Can happen during shutdown
".*ignoring failure to find gc cutoffs: timeline shutting down.*",
)

View File

@@ -0,0 +1,15 @@
DROP TABLE IF EXISTS halfvec_test_table;
CREATE TABLE halfvec_test_table (
_id text NOT NULL,
title text,
text text,
embeddings halfvec(1536),
PRIMARY KEY (_id)
);
INSERT INTO halfvec_test_table (_id, title, text, embeddings)
SELECT _id, title, text, embeddings::halfvec
FROM documents;
CREATE INDEX documents_half_precision_hnsw_idx ON halfvec_test_table USING hnsw (embeddings halfvec_cosine_ops) WITH (m = 64, ef_construction = 128);

View File

@@ -0,0 +1,13 @@
-- run with pooled connection
-- pgbench -T 300 -c 100 -j20 -f pgbench_halfvec_queries.sql -postgresql://neondb_owner:<secret>@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require"
with x (x) as (
select "embeddings" as x
from halfvec_test_table
TABLESAMPLE SYSTEM (1)
LIMIT 1
)
SELECT title, "embeddings" <=> (select x from x) as distance
FROM halfvec_test_table
ORDER BY 2
LIMIT 30;

View File

@@ -1,13 +0,0 @@
-- run with pooled connection
-- pgbench -T 300 -c 100 -j20 -f pgbench_hnsw_queries.sql -postgresql://neondb_owner:<secret>@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require"
with x (x) as (
select "embeddings" as x
from hnsw_test_table
TABLESAMPLE SYSTEM (1)
LIMIT 1
)
SELECT title, "embeddings" <=> (select x from x) as distance
FROM hnsw_test_table
ORDER BY 2
LIMIT 30;

View File

@@ -106,6 +106,7 @@ QUERIES: Tuple[LabelledQuery, ...] = (
# Disable auto formatting for the list of queries so that it's easier to read
# fmt: off
PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = (
LabelledQuery("PGVPREP", r"ALTER EXTENSION VECTOR UPDATE;"),
LabelledQuery("PGV0", r"DROP TABLE IF EXISTS hnsw_test_table;"),
LabelledQuery("PGV1", r"CREATE TABLE hnsw_test_table AS TABLE documents WITH NO DATA;"),
LabelledQuery("PGV2", r"INSERT INTO hnsw_test_table SELECT * FROM documents;"),
@@ -115,6 +116,10 @@ PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = (
LabelledQuery("PGV6", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_l1_ops);"),
LabelledQuery("PGV7", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops);"),
LabelledQuery("PGV8", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_jaccard_ops);"),
LabelledQuery("PGV9", r"DROP TABLE IF EXISTS halfvec_test_table;"),
LabelledQuery("PGV10", r"CREATE TABLE halfvec_test_table (_id text NOT NULL, title text, text text, embeddings halfvec(1536), PRIMARY KEY (_id));"),
LabelledQuery("PGV11", r"INSERT INTO halfvec_test_table (_id, title, text, embeddings) SELECT _id, title, text, embeddings::halfvec FROM documents;"),
LabelledQuery("PGV12", r"CREATE INDEX documents_half_precision_hnsw_idx ON halfvec_test_table USING hnsw (embeddings halfvec_cosine_ops) WITH (m = 64, ef_construction = 128);"),
)
# fmt: on

View File

@@ -18,6 +18,7 @@ class PgBenchLoadType(enum.Enum):
SIMPLE_UPDATE = "simple-update"
SELECT_ONLY = "select-only"
PGVECTOR_HNSW = "pgvector-hnsw"
PGVECTOR_HALFVEC = "pgvector-halfvec"
def utc_now_timestamp() -> int:
@@ -153,6 +154,26 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
password=password,
)
if workload_type == PgBenchLoadType.PGVECTOR_HALFVEC:
# Run simple-update workload
run_pgbench(
env,
"pgvector-halfvec",
[
"pgbench",
"-f",
"test_runner/performance/pgvector/pgbench_custom_script_pgvector_halfvec_queries.sql",
"-c100",
"-j20",
f"-T{duration}",
"-P2",
"--protocol=prepared",
"--progress-timestamp",
connstr,
],
password=password,
)
env.report_size()
@@ -222,13 +243,3 @@ def test_pgbench_remote_simple_update(remote_compare: PgCompare, scale: int, dur
@pytest.mark.remote_cluster
def test_pgbench_remote_select_only(remote_compare: PgCompare, scale: int, duration: int):
run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SELECT_ONLY)
# The following test runs on an existing database that has pgvector extension installed
# and a table with 1 million embedding vectors loaded and indexed with HNSW.
#
# Run this pgbench tests against an existing remote Postgres cluster with the necessary setup.
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_pgvector(remote_compare: PgCompare, duration: int):
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW)

View File

@@ -0,0 +1,24 @@
import pytest
from fixtures.compare_fixtures import PgCompare
from performance.test_perf_pgbench import PgBenchLoadType, get_durations_matrix, run_test_pgbench
# The following test runs on an existing database that has pgvector extension installed
# and a table with 1 million embedding vectors loaded and indexed with HNSW.
#
# Run this pgbench tests against an existing remote Postgres cluster with the necessary setup.
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_pgvector_hnsw(remote_compare: PgCompare, duration: int):
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW)
# The following test runs on an existing database that has pgvector extension installed
# and a table with 1 million embedding vectors loaded and indexed with halfvec.
#
# Run this pgbench tests against an existing remote Postgres cluster with the necessary setup.
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_pgvector_halfvec(remote_compare: PgCompare, duration: int):
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HALFVEC)

View File

@@ -300,7 +300,7 @@ def test_replica_query_race(neon_simple_env: NeonEnv):
p_cur.execute("CREATE TABLE test AS SELECT 0 AS counter")
standby_ep = env.endpoints.new_replica_start(origin=primary_ep, endpoint_id="standby")
time.sleep(1)
wait_replica_caughtup(primary_ep, standby_ep)
# In primary, run a lot of UPDATEs on a single page
finished = False

View File

@@ -129,3 +129,33 @@ def test_ondemand_download_replica(neon_env_builder: NeonEnvBuilder, shard_count
cur_replica = conn_replica.cursor()
cur_replica.execute("SELECT * FROM clogtest")
assert cur_replica.fetchall() == [(1,), (3,)]
def test_ondemand_download_after_wal_switch(neon_env_builder: NeonEnvBuilder):
"""
Test on-demand SLRU download on standby, when starting right after
WAL segment switch.
This is a repro for a bug in how the LSN at WAL page/segment
boundary was handled (https://github.com/neondatabase/neon/issues/8030)
"""
tenant_conf = {
"lazy_slru_download": "true",
}
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# Create a test table
cur.execute("CREATE TABLE clogtest (id integer)")
cur.execute("INSERT INTO clogtest VALUES (1)")
# Start standby at WAL segment boundary
cur.execute("SELECT pg_switch_wal()")
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()"))
_endpoint_at_lsn = env.endpoints.create_start(
branch_name="main", endpoint_id="ep-at-lsn", lsn=lsn
)

View File

@@ -133,6 +133,9 @@ def test_storage_controller_smoke(
wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id))
# Let all the reconciliations after marking the node offline complete
env.storage_controller.reconcile_until_idle()
# Marking pageserver active should not migrate anything to it
# immediately
env.storage_controller.node_configure(env.pageservers[0].id, {"availability": "Active"})
@@ -931,19 +934,27 @@ class Failure:
def clear(self, env: NeonEnv):
raise NotImplementedError()
def nodes(self):
raise NotImplementedError()
class NodeStop(Failure):
def __init__(self, pageserver_id, immediate):
self.pageserver_id = pageserver_id
def __init__(self, pageserver_ids, immediate):
self.pageserver_ids = pageserver_ids
self.immediate = immediate
def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.stop(immediate=self.immediate)
for ps_id in self.pageserver_ids:
pageserver = env.get_pageserver(ps_id)
pageserver.stop(immediate=self.immediate)
def clear(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.start()
for ps_id in self.pageserver_ids:
pageserver = env.get_pageserver(ps_id)
pageserver.start()
def nodes(self):
return self.pageserver_ids
class PageserverFailpoint(Failure):
@@ -959,6 +970,9 @@ class PageserverFailpoint(Failure):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.http_client().configure_failpoints((self.failpoint, "off"))
def nodes(self):
return [self.pageserver_id]
def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
tenants = env.storage_controller.tenant_list()
@@ -982,8 +996,9 @@ def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
@pytest.mark.parametrize(
"failure",
[
NodeStop(pageserver_id=1, immediate=False),
NodeStop(pageserver_id=1, immediate=True),
NodeStop(pageserver_ids=[1], immediate=False),
NodeStop(pageserver_ids=[1], immediate=True),
NodeStop(pageserver_ids=[1, 2], immediate=True),
PageserverFailpoint(pageserver_id=1, failpoint="get-utilization-http-handler"),
],
)
@@ -1036,33 +1051,50 @@ def test_storage_controller_heartbeats(
wait_until(10, 1, tenants_placed)
# ... then we apply the failure
offline_node_id = failure.pageserver_id
online_node_id = (set(range(1, len(env.pageservers) + 1)) - {offline_node_id}).pop()
env.get_pageserver(offline_node_id).allowed_errors.append(
# In the case of the failpoint failure, the impacted pageserver
# still believes it has the tenant attached since location
# config calls into it will fail due to being marked offline.
".*Dropped remote consistent LSN updates.*",
)
offline_node_ids = set(failure.nodes())
online_node_ids = set(range(1, len(env.pageservers) + 1)) - offline_node_ids
for node_id in offline_node_ids:
env.get_pageserver(node_id).allowed_errors.append(
# In the case of the failpoint failure, the impacted pageserver
# still believes it has the tenant attached since location
# config calls into it will fail due to being marked offline.
".*Dropped remote consistent LSN updates.*",
)
if len(offline_node_ids) > 1:
env.get_pageserver(node_id).allowed_errors.append(
".*Scheduling error when marking pageserver.*offline.*",
)
failure.apply(env)
# ... expecting the heartbeats to mark it offline
def node_offline():
def nodes_offline():
nodes = env.storage_controller.node_list()
log.info(f"{nodes=}")
target = next(n for n in nodes if n["id"] == offline_node_id)
assert target["availability"] == "Offline"
for node in nodes:
if node["id"] in offline_node_ids:
assert node["availability"] == "Offline"
# A node is considered offline if the last successful heartbeat
# was more than 10 seconds ago (hardcoded in the storage controller).
wait_until(20, 1, node_offline)
wait_until(20, 1, nodes_offline)
# .. expecting the tenant on the offline node to be migrated
def tenant_migrated():
if len(online_node_ids) == 0:
time.sleep(5)
return
node_to_tenants = build_node_to_tenants_map(env)
log.info(f"{node_to_tenants=}")
assert set(node_to_tenants[online_node_id]) == set(tenant_ids)
observed_tenants = set()
for node_id in online_node_ids:
observed_tenants |= set(node_to_tenants[node_id])
assert observed_tenants == set(tenant_ids)
wait_until(10, 1, tenant_migrated)
@@ -1070,31 +1102,24 @@ def test_storage_controller_heartbeats(
failure.clear(env)
# ... expecting the offline node to become active again
def node_online():
def nodes_online():
nodes = env.storage_controller.node_list()
target = next(n for n in nodes if n["id"] == offline_node_id)
assert target["availability"] == "Active"
for node in nodes:
if node["id"] in online_node_ids:
assert node["availability"] == "Active"
wait_until(10, 1, node_online)
wait_until(10, 1, nodes_online)
time.sleep(5)
# ... then we create a new tenant
tid = TenantId.generate()
env.storage_controller.tenant_create(tid)
# ... expecting it to be placed on the node that just came back online
tenants = env.storage_controller.tenant_list()
newest_tenant = next(t for t in tenants if t["tenant_shard_id"] == str(tid))
locations = list(newest_tenant["observed"]["locations"].keys())
locations = [int(node_id) for node_id in locations]
assert locations == [offline_node_id]
node_to_tenants = build_node_to_tenants_map(env)
log.info(f"Back online: {node_to_tenants=}")
# ... expecting the storage controller to reach a consistent state
def storage_controller_consistent():
env.storage_controller.consistency_check()
wait_until(10, 1, storage_controller_consistent)
wait_until(30, 1, storage_controller_consistent)
def test_storage_controller_re_attach(neon_env_builder: NeonEnvBuilder):

View File

@@ -678,10 +678,6 @@ def test_synthetic_size_while_deleting(neon_env_builder: NeonEnvBuilder):
with pytest.raises(PageserverApiException, match=matcher):
completion.result()
# this happens on both cases
env.pageserver.allowed_errors.append(
".*ignoring failure to find gc cutoffs: timeline shutting down.*"
)
# this happens only in the case of deletion (http response logging)
env.pageserver.allowed_errors.append(".*Failed to refresh gc_info before gathering inputs.*")

View File

@@ -287,13 +287,6 @@ def test_vm_bit_clear_on_heap_lock_blackbox(neon_env_builder: NeonEnvBuilder):
# already truncated away.
#
# ERROR: could not access status of transaction 1027
# Debugging https://github.com/neondatabase/neon/issues/6967
# the select() below fails occassionally at get_impl="vectored" validation
env.pageserver.http_client().patch_tenant_config_client_side(
tenant_id,
{"test_vm_bit_debug_logging": True},
)
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 for update")
tup = cur.fetchall()
log.info(f"tuple = {tup}")

View File

@@ -601,13 +601,16 @@ async def run_segment_init_failure(env: NeonEnv):
conn = await ep.connect_async()
ep.safe_psql("select pg_switch_wal()") # jump to the segment boundary
# next insertion should hang until failpoint is disabled.
asyncio.create_task(conn.execute("insert into t select generate_series(1,1), 'payload'"))
bg_query = asyncio.create_task(
conn.execute("insert into t select generate_series(1,1), 'payload'")
)
sleep_sec = 2
await asyncio.sleep(sleep_sec)
# also restart ep at segment boundary to make test more interesting
ep.stop()
# it must still be not finished
# assert not bg_query.done()
assert not bg_query.done()
# Also restart ep at segment boundary to make test more interesting. Do it in immediate mode;
# fast will hang because it will try to gracefully finish sending WAL.
ep.stop(mode="immediate")
# Without segment rename during init (#6402) previous statement created
# partially initialized 16MB segment, so sk restart also triggers #6401.
sk.stop().start()

View File

@@ -18,7 +18,7 @@ commands:
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres" /bin/postgres_exporter'
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter'
- name: sql-exporter
user: nobody
sysvInitAction: respawn
@@ -93,7 +93,7 @@ files:
target:
# Data source name always has a URI schema that matches the driver name. In some cases (e.g. MySQL)
# the schema gets dropped or replaced to match the driver expected DSN format.
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable'
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter'
# Collectors (referenced by name) to execute on the target.
# Glob patterns are supported (see <https://pkg.go.dev/path/filepath#Match> for syntax).
@@ -128,7 +128,7 @@ files:
target:
# Data source name always has a URI schema that matches the driver name. In some cases (e.g. MySQL)
# the schema gets dropped or replaced to match the driver expected DSN format.
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable'
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter_autoscaling'
# Collectors (referenced by name) to execute on the target.
# Glob patterns are supported (see <https://pkg.go.dev/path/filepath#Match> for syntax).
@@ -324,14 +324,15 @@ files:
help: 'Whether or not the replication slot wal_status is lost'
key_labels:
- slot_name
values: [wal_status_is_lost]
values: [wal_is_lost]
query: |
SELECT slot_name,
CASE
WHEN wal_status = 'lost' THEN 1
ELSE 0
END AS wal_status_is_lost
END AS wal_is_lost
FROM pg_replication_slots;
- filename: neon_collector_autoscaling.yml
content: |
collector_name: neon_collector_autoscaling