Compare commits

...

41 Commits

Author SHA1 Message Date
Alex Chi Z
bd348e50a4 fmt
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 13:16:23 -05:00
Alex Chi Z
d00f7f4f24 ensure image layers are created correctly
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 13:06:59 -05:00
Alex Chi Z
e1ec90207f remove from original keyspace
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 11:13:12 -05:00
Alex Chi Z
9b5767aba5 use scan instaed of get_vectored
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 11:13:12 -05:00
Alex Chi Z
9d28011e42 more comments
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 11:13:12 -05:00
Alex Chi Z
17b0b6da9f const fn
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 11:13:12 -05:00
Alex Chi Z
f2b4ce3224 fix scan not found <-> empty
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 11:13:12 -05:00
Alex Chi Z
61a4ff912a feat(pageserver): migrate reldir to sparse keyspace
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 11:13:12 -05:00
Alex Chi Z.
e9ed53b14f feat(pageserver): support inherited sparse keyspace (#10313)
## Problem

In preparation to https://github.com/neondatabase/neon/issues/9516. We
need to store rel size and directory data in the sparse keyspace, but it
does not support inheritance yet.

## Summary of changes

Add a new type of keyspace "sparse but inherited" into the system.

On the read path: we don't remove the key range when we descend into the
ancestor. The search will stop when (1) the full key range is covered by
image layers (which has already been implemented before), or (2) we
reach the end of the ancestor chain.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 15:43:01 +00:00
Conrad Ludgate
a338aee132 feat(local_proxy): use ed25519 signatures with pg_session_jwt (#10290)
Generally ed25519 seems to be much preferred for cryptographic strength
to P256 nowadays, and it is NIST approved finally. We should use it
where we can as it's also faster than p256.

This PR makes the re-signed JWTs between local_proxy and pg_session_jwt
use ed25519.

This does introduce a new dependency on ed25519, but I do recall some
Neon Authorise customers asking for support for ed25519, so I am
justifying this dependency addition in the context that we can then
introduce support for customer ed25519 keys

sources:
* https://csrc.nist.gov/pubs/fips/186-5/final subsection 7 (EdDSA)
* https://datatracker.ietf.org/doc/html/rfc8037#section-3.1
2025-01-13 15:20:46 +00:00
Heikki Linnakangas
96243af651 Stop building unnecessary extension tarballs (#10355)
We build "custom extensions" from a different repository nowadays.
2025-01-13 15:01:13 +00:00
John Spray
ef8bfacd6b storage controller: API + CLI for migrating secondary locations (#10284)
## Problem

Currently, if we want to move a secondary there isn't a neat way to do
that: we just have migration API for the attached location, and it is
only clean to use that if you've manually created a secondary via
pageserver API in the place you're going to move it to.

Secondary migration API enables:
- Moving the secondary somewhere because we would like to later move the
attached location there.
- Move the secondary location because we just want to reclaim some disk
space from its current location.

## Summary of changes

- Add `/migrate_secondary` API
- Add `tenant-shard-migrate-secondary` CLI
- Add tests for above
2025-01-13 14:52:43 +00:00
Konstantin Knizhnik
ceacc29609 Start with minimal prefetch distance to minimize prefetch overhead for exact or limited index scans (#10359)
## Problem

See https://neondb.slack.com/archives/C04DGM6SMTM/p1736526089437179

In case of queries index scan with LIMIT clause, multiple backends can
concurrently send larger number of duplicated prefetch requests which
are not stored in LFC and so actually do useless job.

Current implementation of index prefetch starts with maximal prefetch
distance (10 by default now) when there are no key bounds, so in queries
with LIMIT clause like `select * from T order by pk limit 1` compute can
send a lot of useless prefetch requests to page server.

## Summary of changes

Always start with minimal prefetch distance even if there are not key
boundaries.

Related Postgres PRs:
https://github.com/neondatabase/postgres/pull/552
https://github.com/neondatabase/postgres/pull/551
https://github.com/neondatabase/postgres/pull/550
https://github.com/neondatabase/postgres/pull/549

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-01-13 14:26:11 +00:00
Erik Grinaker
b31ed0acd1 utils: add ?force=true hint for CPU profiler (#10368)
This makes it less annoying to try to take a CPU profile when a
continuous profile is already running.
2025-01-13 14:23:42 +00:00
Alexander Bayandin
b2d0e1a519 Link OpenSSL dynamically (#10302)
## Problem
Statically linked OpenSSL is buggy in multithreaded environment:
- https://github.com/neondatabase/cloud/issues/16155
- https://github.com/neondatabase/neon/issues/8275

## Summary of changes
- Link OpenSSL dynamically (revert OpenSSL part from
https://github.com/neondatabase/neon/pull/8074)

Before:
```
ldd /usr/local/v17/lib/libpq.so
        linux-vdso.so.1 (0x0000ffffb5ce4000)
        libm.so.6 => /lib/aarch64-linux-gnu/libm.so.6 (0x0000ffffb5c10000)
        libc.so.6 => /lib/aarch64-linux-gnu/libc.so.6 (0x0000ffffb5650000)
        /lib/ld-linux-aarch64.so.1 (0x0000ffffb5ca7000)
```

After:
```
ldd /usr/local/v17/lib/libpq.so
        linux-vdso.so.1 (0x0000ffffbf3e8000)
        libssl.so.3 => /lib/aarch64-linux-gnu/libssl.so.3 (0x0000ffffbf260000)
        libcrypto.so.3 => /lib/aarch64-linux-gnu/libcrypto.so.3 (0x0000ffffbec00000)
        libm.so.6 => /lib/aarch64-linux-gnu/libm.so.6 (0x0000ffffbf1c0000)
        libc.so.6 => /lib/aarch64-linux-gnu/libc.so.6 (0x0000ffffbea50000)
        /lib/ld-linux-aarch64.so.1 (0x0000ffffbf3ab000)
```
2025-01-13 14:13:02 +00:00
John Spray
d1bc36f536 storage controller: fix retries of compute hook notifications while a secondary node is offline (#10352)
## Problem

We would sometimes fail to retry compute notifications:
1. Try and send, set compute_notify_failure if we can't
2. On next reconcile, reconcile() fails for some other reason (e.g.
tried to talk to an offline node), and we fail the `result.is_ok() &&
must_notify` condition around the re-sending.

Closes: https://github.com/neondatabase/cloud/issues/22612

## Summary of changes

- Clarify the meaning of the reconcile result: it should be Ok(()) if
configuring attached location worked, even if secondary or detach
locations cannot be reached.
- Skip trying to talk to secondaries if they're offline
- Even if reconcile fails and we can't send the compute notification (we
can't send it because we're not sure if it's really attached), make sure
we save the `compute_notify_failure` flag so that subsequent reconciler
runs will try again
- Add a regression test for the above
2025-01-13 13:31:57 +00:00
Erik Grinaker
0b9032065e utils: allow 60-second CPU profiles (#10367)
Taking continuous profiles every 20 seconds is likely too expensive (in
dollar terms). Let's try 60-second profiles. We can now interrupt
running profiles via `?force=true`, so this should be fine.
2025-01-13 13:14:23 +00:00
Heikki Linnakangas
09fe3b025c Add a websockets tunnel and a test for the proxy's websockets support. (#3823)
For testing the proxy's websockets support.

I wrote this to test https://github.com/neondatabase/neon/issues/3822.
Unfortunately, that bug can *not* be reproduced with this tunnel. The
bug only appears when the client pipelines the first query with the
authentication messages. The tunnel doesn't do that.

---

Update (@conradludgate 2025-01-10):

We have since added some websocket tests, but they manually implemented
a very simplistic setup of the postgres protocol. Introducing the tunnel
would make more complex testing simpler in the future.

---------

Co-authored-by: Conrad Ludgate <conradludgate@gmail.com>
2025-01-13 11:35:39 +00:00
John Spray
12053cf832 storage controller: improve consistency_check_api (#10363)
## Problem

Limitations found while using this to investigate
https://github.com/neondatabase/neon/issues/10234:
- If we hit a node consistency issue, we drop out and don't check shards
for consistency
- The messages printed after a shard consistency issue are huge, and
grafana appears to drop them.

## Summary of changes

- Defer node consistency errors until the end of the function, so that
we always proceed to check shards for consistency
- Print out smaller log lines that just point out the diffs between
expected and persistent state
2025-01-13 11:18:14 +00:00
Conrad Ludgate
de199d71e1 chore: Address lints introduced in rust 1.85.0 beta (#10340)
With a new beta build of the rust compiler, it's good to check out the
new lints. Either to find false positives, or find flaws in our code.
Additionally, it helps reduce the effort required to update to 1.85 in 6
weeks.
2025-01-13 10:34:36 +00:00
Erik Grinaker
22a6460010 libs/utils: add force parameter for /profile/cpu (#10361)
## Problem

It's only possible to take one CPU profile at a time. With Grafana
continuous profiling, a (low-frequency) CPU profile will always be
running, making it hard to take an ad hoc CPU profile at the same time.

Resolves #10072.

## Summary of changes

Add a `force` parameter for `/profile/cpu` which will end and return an
already running CPU profile, starting a new one for the current caller.
2025-01-13 10:01:18 +00:00
Erik Grinaker
cd982a82ec pageserver,safekeeper: increase heap profiling frequency to 2 MB (#10362)
## Problem

Currently, the heap profiling frequency is every 1 MB allocated. Taking
a profile stack trace takes about 1 µs, and allocating 1 MB takes about
15 µs, so the overhead is about 6.7% which is a bit high. This is a
fixed cost regardless of whether heap profiles are actually accessed.

## Summary of changes

Increase the heap profiling sample frequency from 1 MB to 2 MB, which
reduces the overhead to about 3.3%. This seems acceptable, considering
performance-sensitive code will avoid allocations as far as possible
anyway.
2025-01-13 09:44:59 +00:00
Heikki Linnakangas
8327f68043 Minor cleanup of extension build commands (#10356)
There used to be some pg version dependencies in these extensions, but
now that there isn't, follow the simpler pattern used in other
extensions. No change in the produced images.
2025-01-11 17:39:27 +00:00
Heikki Linnakangas
846e8fdce4 Remove obsolete hnsw extension (#8008)
This has been deprecated and disabled for new installations for a long
time. Let's remove it for good.
2025-01-11 14:20:50 +00:00
Heikki Linnakangas
70a3bf37a0 Stop building 'compute-tools' image (#10333)
It's been unused from time immemorial.

---------

Co-authored-by: Matthias van de Meent <matthias@neon.tech>
2025-01-11 13:09:55 +00:00
Arpad Müller
23c0748cdd Remove active column (#10335)
We don't need or want the `active` column. Remove it. Vlad pointed out
that this is safe.

Thanks to the separation of the schemata in earlier PRs, this is easy.

follow-up of #10205

Part of https://github.com/neondatabase/neon/issues/9981
2025-01-11 02:52:45 +00:00
Alex Chi Z.
b5d54ba52a refactor(pageserver): move queue logic to compaction.rs (#10330)
## Problem

close https://github.com/neondatabase/neon/issues/10031, part of
https://github.com/neondatabase/neon/issues/9114

## Summary of changes

Move the compaction job generation to `compaction.rs`, thus making the
code more readable and debuggable. We now also return running job
through the get compaction job API, versus before we only return
scheduled jobs.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-10 20:53:00 +00:00
Christian Schwarz
58332cb361 pageserver: remove unused metric pageserver_layers_visited_per_read_global (#10141)
As of commit "pageserver: remove legacy read path" (#8601) we always use
vectored get, which has a separate metric.
2025-01-10 20:35:50 +00:00
Christian Schwarz
9b43204893 fix(page_service): Timeline::gate held open while throttling (#10314)
When we moved throttling up from Timeline::get into page_service,
we stopped being sensitive to `Timeline::cancel`, even though we're
holding a Handle and thus a guard on the `Timeline::gate` open.

This PR rectifies the situation.

Refs

- Found while investigating #10309 (hung detach because gate kept open),
  but not expected to be the root cause of that issue because the
  affected tenants are not being throttled according to their metrics.
2025-01-10 19:21:01 +00:00
Christian Schwarz
cdd34dfc12 impr(utils/spsc_fold): add another test case (#10319)
Wondered about the case covered here while investigating #10309.
2025-01-10 19:19:48 +00:00
Vlad Lazar
3cd5034eac storcon: don't assume host:port format in storcon client (#10347)
## Problem

https://github.com/neondatabase/infra/pull/2725 updated the scrubber to
use a non-host
port endpoint for storcon. That breaks when unwrapping the port.

## Summary of changes

Support both `host:port` and `host` formats for the storcon api.
2025-01-10 18:35:16 +00:00
Cheng Chen
425b777840 chore(compute): pg_mooncake v0.1.0 (#10337)
## Problem
Upgrade pg_mooncake to v0.1.0

## Summary of changes
2025-01-10 16:38:13 +00:00
John Spray
4398051385 tests: smaller datasets in LFC tests (#10346)
## Problem

These two tests came up in #9537 as doing multi-gigabyte I/O, and from
inspection of the tests it doesn't seem like they need that to fulfil
their purpose.

## Summary of changes

- In test_local_file_cache_unlink, run fewer background threads with a
smaller number of rows. These background threads AFAICT exist to make
sure some I/O is going on while we unlink the LFC directory, but 5
threads should be enough for "some".
- In test_lfc_resize, tweak the test to validate that the cache size is
larger than the final size before resizing it, so that we're sure we're
writing enough data to really be doing something. Then decrease the
pgbench scale.
2025-01-10 15:53:23 +00:00
Folke Behrens
71bca6f580 poetry: Update packaging for poetry v2 (#10344)
## Problem

When poetry v2 (released Jan 5) is used it needs `packaging.metadata`
module, but we downgrade `packaging` to 23.0. `packaging==23.1`
introduced the metadata submodule.

## Summary of changes

Update `packaging` to 24.2.
2025-01-10 14:32:26 +00:00
John Spray
105f66c4ce tests: move test_parallel_copy into performance tree (#10343)
## Problem

This test writes ~5GB of data. It is not suitable to run in parallel
with all the other small tests in test_runner/regress.

via #9537 

## Summary of changes

- Move test_parallel_copy into the performance directory, so that it
does not run in parallel with other tests
2025-01-10 13:57:26 +00:00
John Spray
0d4fce2d35 tests: refine how compat snapshot is generated (#10342)
## Problem

I noticed in https://github.com/neondatabase/neon/pull/9537 that tests
which work with compat snapshots were writing several hundred MB of
data, which isn't really necessary.

Also, the snapshots are large but don't have the proper variety of
storage format features, e.g. they could just have L0 deltas.

## Summary of changes

- Use smaller scale factor and runtime to generate less data
- Configure a small layer size and use force image layer generation so
that our output contains L1 deltas and image layers, and has a decent
number of entries in the layer map
2025-01-10 13:57:23 +00:00
Erik Grinaker
2b8ea1e768 utils: add flamegraph for heap profiles (#10223)
## Problem

Unlike CPU profiles, the `/profile/heap` endpoint can't automatically
generate SVG flamegraphs. This requires the user to install and use
`pprof` tooling, which is unnecessary and annoying.

Resolves #10203.

## Summary of changes

Add `format=svg` for the `/profile/heap` route, and generate an SVG
flamegraph using the `inferno` crate, similarly to what `pprof-rs`
already does for CPU profiles.
2025-01-10 12:14:29 +00:00
Christian Schwarz
db00eb41a1 fix(spsc_fold): potentially missing wakeup when send()ing in state SenderWaitsForReceiverToConsume (#10318)
# Problem

Before this PR, there were cases where send() in state
SenderWaitsForReceiverToConsume would never be woken up
by the receiver, because it never registered with `wake_sender`.

Example Scenario 1: we stop polling a send() future A that was waiting
for the receiver to consume. We drop A and create a new send() future B.
B would return Poll::Pending and never regsister a waker.

Example Scenario 2: a send() future A transitions from HasData
to SenderWaitsForReceiverToConsume. This registers the context X
with `wake_sender`. But before the Receiver consumes the data,
we poll A from a different context Y.
The state is still SenderWaitsForReceiverToConsume, but we wouldn't
register the new context with `wake_sender`.
When the Receiver comes around to consume and `wake_sender.notify()`s,
it wakes the old context X instead of Y.

# Fix

Register the waker in the case where we're polled in
state `SenderWaitsForReceiverToConsume`.

# Relation to #10309

I found this bug while investigating #10309.
There was never proof that this bug here is the root cause for #10309.
In the meantime we found a more probably hypothesis
for the root cause than what is being fixed here.
Regardless, let's walk through my thought process about
how it might have been relevant:

There (in page_service), Scenario 1 does not apply because
we poll the send() future to completion.

Scenario 2 (`tokio::join!`) also does not apply with the
current `tokio::join!()` impl, because it will just poll each
future every time, each with the same context.
Although if we ever used something like a FuturesUnordered anywhere,
that will be using a different context, so, in that case,
the bug might materialize.

Regarding tokio & spurious poll in general:
@conradludgate is not aware of any spurious wakeup cases in current
tokio,
but within a `tokio::join!()`, any wake meant for one future will poll
all
the futures, so that can appear as a spurious wake up to the N-1 futures
of the `tokio::join!()`.
2025-01-10 11:06:03 +00:00
Conrad Ludgate
735c66dc65 fix(proxy): propagate the existing ComputeUserInfo to connect for cancellation (#10322)
## Problem

We were incorrectly constructing the ComputeUserInfo, used for
cancellation checks, based on the return parameters from postgres. This
didn't contain the correct info.

## Summary of changes

Propagate down the existing ComputeUserInfo.
2025-01-10 09:36:51 +00:00
Folke Behrens
77660f3d88 proxy: Fix parsing of UnknownTopic with payload (#10339)
## Problem

When the proxy receives a `Notification` with an unknown topic it's
supposed to use the `UnknownTopic` unit variant. Unfortunately, in
adjacently tagged enums serde will not simply ignore the configured
content if found and try to deserialize a map/object instead.

## Summary of changes

* Use a custom deserialize function to ignore variant content.
* Add a little unit test covering both cases.
2025-01-10 09:12:31 +00:00
Folke Behrens
b6205af4a5 Update tracing/otel crates (#10311)
Update the tracing(-x) and opentelemetry(-x) crates.

Some breaking changes require updating our code:
* Initialization is done via builders now

https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry-otlp/CHANGELOG.md#0270
* Errors from OTel SDK are logged via tracing crate as well.

https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry/CHANGELOG.md#0270
2025-01-10 08:48:03 +00:00
85 changed files with 1915 additions and 1862 deletions

View File

@@ -728,30 +728,6 @@ jobs:
tags: |
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{needs.tag.outputs.build-tag}}-${{ matrix.version.debian }}-${{ matrix.arch }}
- name: Build compute-tools image
# compute-tools are Postgres independent, so build it only once
# We pick 16, because that builds on debian 11 with older glibc (and is
# thus compatible with newer glibc), rather than 17 on Debian 12, as
# that isn't guaranteed to be compatible with Debian 11
if: matrix.version.pg == 'v16'
uses: docker/build-push-action@v6
with:
target: compute-tools-image
context: .
build-args: |
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
DEBIAN_VERSION=${{ matrix.version.debian }}
provenance: false
push: true
pull: true
file: compute/compute-node.Dockerfile
cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version.pg }}:cache-${{ matrix.version.debian }}-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-tools-{0}:cache-{1}-{2},mode=max', matrix.version.pg, matrix.version.debian, matrix.arch) || '' }}
tags: |
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-${{ matrix.arch }}
compute-node-image:
needs: [ compute-node-image-arch, tag ]
permissions:
@@ -794,14 +770,6 @@ jobs:
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-arm64
- name: Create multi-arch compute-tools image
if: matrix.version.pg == 'v16'
run: |
docker buildx imagetools create -t neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }} \
-t neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }} \
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-arm64
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
@@ -817,12 +785,6 @@ jobs:
docker buildx imagetools create -t 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}
- name: Push multi-arch compute-tools image to ECR
if: matrix.version.pg == 'v16'
run: |
docker buildx imagetools create -t 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{ needs.tag.outputs.build-tag }} \
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}
vm-compute-node-image:
needs: [ check-permissions, tag, compute-node-image ]
runs-on: [ self-hosted, large ]
@@ -1001,9 +963,6 @@ jobs:
docker buildx imagetools create -t $repo/neon:latest \
$repo/neon:${{ needs.tag.outputs.build-tag }}
docker buildx imagetools create -t $repo/compute-tools:latest \
$repo/compute-tools:${{ needs.tag.outputs.build-tag }}
for version in ${VERSIONS}; do
docker buildx imagetools create -t $repo/compute-node-${version}:latest \
$repo/compute-node-${version}:${{ needs.tag.outputs.build-tag }}
@@ -1032,7 +991,7 @@ jobs:
- name: Copy all images to prod ECR
if: github.ref_name == 'release' || github.ref_name == 'release-proxy' || github.ref_name == 'release-compute'
run: |
for image in neon compute-tools {vm-,}compute-node-{v14,v15,v16,v17}; do
for image in neon {vm-,}compute-node-{v14,v15,v16,v17}; do
docker buildx imagetools create -t 093970136003.dkr.ecr.eu-central-1.amazonaws.com/${image}:${{ needs.tag.outputs.build-tag }} \
369495373322.dkr.ecr.eu-central-1.amazonaws.com/${image}:${{ needs.tag.outputs.build-tag }}
done
@@ -1044,7 +1003,7 @@ jobs:
with:
client_id: ${{ vars.AZURE_DEV_CLIENT_ID }}
image_tag: ${{ needs.tag.outputs.build-tag }}
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 vm-compute-node-v17 compute-node-v14 compute-node-v15 compute-node-v16 compute-node-v17
images: neon vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 vm-compute-node-v17 compute-node-v14 compute-node-v15 compute-node-v16 compute-node-v17
registry_name: ${{ vars.AZURE_DEV_REGISTRY_NAME }}
subscription_id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
tenant_id: ${{ vars.AZURE_TENANT_ID }}
@@ -1056,7 +1015,7 @@ jobs:
with:
client_id: ${{ vars.AZURE_PROD_CLIENT_ID }}
image_tag: ${{ needs.tag.outputs.build-tag }}
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 vm-compute-node-v17 compute-node-v14 compute-node-v15 compute-node-v16 compute-node-v17
images: neon vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 vm-compute-node-v17 compute-node-v14 compute-node-v15 compute-node-v16 compute-node-v17
registry_name: ${{ vars.AZURE_PROD_REGISTRY_NAME }}
subscription_id: ${{ vars.AZURE_PROD_SUBSCRIPTION_ID }}
tenant_id: ${{ vars.AZURE_TENANT_ID }}

168
Cargo.lock generated
View File

@@ -1605,6 +1605,32 @@ dependencies = [
"typenum",
]
[[package]]
name = "curve25519-dalek"
version = "4.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be"
dependencies = [
"cfg-if",
"cpufeatures",
"curve25519-dalek-derive",
"digest",
"fiat-crypto",
"rustc_version",
"subtle",
]
[[package]]
name = "curve25519-dalek-derive"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
]
[[package]]
name = "darling"
version = "0.20.1"
@@ -1653,6 +1679,20 @@ dependencies = [
"parking_lot_core 0.9.8",
]
[[package]]
name = "dashmap"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core 0.9.8",
]
[[package]]
name = "data-encoding"
version = "2.4.0"
@@ -1861,6 +1901,28 @@ dependencies = [
"spki 0.7.3",
]
[[package]]
name = "ed25519"
version = "2.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53"
dependencies = [
"signature 2.2.0",
]
[[package]]
name = "ed25519-dalek"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871"
dependencies = [
"curve25519-dalek",
"ed25519",
"rand_core 0.6.4",
"sha2",
"subtle",
]
[[package]]
name = "either"
version = "1.8.1"
@@ -1952,6 +2014,15 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "env_filter"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0"
dependencies = [
"log",
]
[[package]]
name = "env_logger"
version = "0.10.2"
@@ -1965,6 +2036,16 @@ dependencies = [
"termcolor",
]
[[package]]
name = "env_logger"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d"
dependencies = [
"env_filter",
"log",
]
[[package]]
name = "equator"
version = "0.2.2"
@@ -2080,6 +2161,12 @@ dependencies = [
"subtle",
]
[[package]]
name = "fiat-crypto"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d"
[[package]]
name = "filetime"
version = "0.2.22"
@@ -2948,6 +3035,28 @@ dependencies = [
"str_stack",
]
[[package]]
name = "inferno"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75a5d75fee4d36809e6b021e4b96b686e763d365ffdb03af2bd00786353f84fe"
dependencies = [
"ahash",
"clap",
"crossbeam-channel",
"crossbeam-utils",
"dashmap 6.1.0",
"env_logger 0.11.2",
"indexmap 2.0.1",
"itoa",
"log",
"num-format",
"once_cell",
"quick-xml 0.37.1",
"rgb",
"str_stack",
]
[[package]]
name = "inotify"
version = "0.9.6"
@@ -3155,7 +3264,7 @@ version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4644821e1c3d7a560fe13d842d13f587c07348a1a05d3a797152d41c90c56df2"
dependencies = [
"dashmap",
"dashmap 5.5.0",
"hashbrown 0.13.2",
]
@@ -3693,23 +3802,23 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "opentelemetry"
version = "0.26.0"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17"
checksum = "ab70038c28ed37b97d8ed414b6429d343a8bbf44c9f79ec854f3a643029ba6d7"
dependencies = [
"futures-core",
"futures-sink",
"js-sys",
"once_cell",
"pin-project-lite",
"thiserror",
"tracing",
]
[[package]]
name = "opentelemetry-http"
version = "0.26.0"
version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6351496aeaa49d7c267fb480678d85d1cd30c5edb20b497c48c56f62a8c14b99"
checksum = "10a8a7f5f6ba7c1b286c2fbca0454eaba116f63bbe69ed250b642d36fbb04d80"
dependencies = [
"async-trait",
"bytes",
@@ -3720,9 +3829,9 @@ dependencies = [
[[package]]
name = "opentelemetry-otlp"
version = "0.26.0"
version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29e1f9c8b032d4f635c730c0efcf731d5e2530ea13fa8bef7939ddc8420696bd"
checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76"
dependencies = [
"async-trait",
"futures-core",
@@ -3738,9 +3847,9 @@ dependencies = [
[[package]]
name = "opentelemetry-proto"
version = "0.26.1"
version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9d3968ce3aefdcca5c27e3c4ea4391b37547726a70893aab52d3de95d5f8b34"
checksum = "a6e05acbfada5ec79023c85368af14abd0b307c015e9064d249b2a950ef459a6"
dependencies = [
"opentelemetry",
"opentelemetry_sdk",
@@ -3750,22 +3859,21 @@ dependencies = [
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.26.0"
version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db945c1eaea8ac6a9677185357480d215bb6999faa9f691d0c4d4d641eab7a09"
checksum = "bc1b6902ff63b32ef6c489e8048c5e253e2e4a803ea3ea7e783914536eb15c52"
[[package]]
name = "opentelemetry_sdk"
version = "0.26.0"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3"
checksum = "231e9d6ceef9b0b2546ddf52335785ce41252bc7474ee8ba05bfad277be13ab8"
dependencies = [
"async-trait",
"futures-channel",
"futures-executor",
"futures-util",
"glob",
"once_cell",
"opentelemetry",
"percent-encoding",
"rand 0.8.5",
@@ -3773,6 +3881,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tracing",
]
[[package]]
@@ -4421,7 +4530,7 @@ dependencies = [
"bytes",
"crc32c",
"criterion",
"env_logger",
"env_logger 0.10.2",
"log",
"memoffset 0.9.0",
"once_cell",
@@ -4462,7 +4571,7 @@ dependencies = [
"cfg-if",
"criterion",
"findshlibs",
"inferno",
"inferno 0.11.21",
"libc",
"log",
"nix 0.26.4",
@@ -4688,9 +4797,10 @@ dependencies = [
"clap",
"compute_api",
"consumption_metrics",
"dashmap",
"dashmap 5.5.0",
"ecdsa 0.16.9",
"env_logger",
"ed25519-dalek",
"env_logger 0.10.2",
"fallible-iterator",
"flate2",
"framed-websockets",
@@ -4797,6 +4907,15 @@ dependencies = [
"serde",
]
[[package]]
name = "quick-xml"
version = "0.37.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f22f29bdff3987b4d8632ef95fd6424ec7e4e0a57e2f4fc63e489e75357f6a03"
dependencies = [
"memchr",
]
[[package]]
name = "quote"
version = "1.0.37"
@@ -5181,9 +5300,9 @@ dependencies = [
[[package]]
name = "reqwest-tracing"
version = "0.5.4"
version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff82cf5730a1311fb9413b0bc2b8e743e0157cd73f010ab4ec374a923873b6a2"
checksum = "73e6153390585f6961341b50e5a1931d6be6dee4292283635903c26ef9d980d2"
dependencies = [
"anyhow",
"async-trait",
@@ -7048,9 +7167,9 @@ dependencies = [
[[package]]
name = "tracing-opentelemetry"
version = "0.27.0"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc58af5d3f6c5811462cabb3289aec0093f7338e367e5a33d28c0433b3c7360b"
checksum = "97a971f6058498b5c0f1affa23e7ea202057a7301dbff68e968b2d578bcbd053"
dependencies = [
"js-sys",
"once_cell",
@@ -7319,6 +7438,7 @@ dependencies = [
"hex-literal",
"humantime",
"hyper 0.14.30",
"inferno 0.12.0",
"itertools 0.10.5",
"jemalloc_pprof",
"jsonwebtoken",
@@ -7422,7 +7542,7 @@ dependencies = [
"anyhow",
"camino-tempfile",
"clap",
"env_logger",
"env_logger 0.10.2",
"log",
"postgres",
"postgres_ffi",

View File

@@ -110,6 +110,7 @@ hyper-util = "0.1"
tokio-tungstenite = "0.21.0"
indexmap = "2"
indoc = "2"
inferno = "0.12.0"
ipnet = "2.10.0"
itertools = "0.10"
itoa = "1.0.11"
@@ -126,10 +127,10 @@ notify = "6.0.0"
num_cpus = "1.15"
num-traits = "0.2.15"
once_cell = "1.13"
opentelemetry = "0.26"
opentelemetry_sdk = "0.26"
opentelemetry-otlp = { version = "0.26", default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.26"
opentelemetry = "0.27"
opentelemetry_sdk = "0.27"
opentelemetry-otlp = { version = "0.27", default-features = false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.27"
parking_lot = "0.12"
parquet = { version = "53", default-features = false, features = ["zstd"] }
parquet_derive = "53"
@@ -143,7 +144,7 @@ rand = "0.8"
redis = { version = "0.25.2", features = ["tokio-rustls-comp", "keep-alive"] }
regex = "1.10.2"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_26"] }
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_27"] }
reqwest-middleware = "0.4"
reqwest-retry = "0.7"
routerify = "3"
@@ -192,7 +193,7 @@ tower-http = { version = "0.6.2", features = ["request-id", "trace"] }
tower-service = "0.3.3"
tracing = "0.1"
tracing-error = "0.2"
tracing-opentelemetry = "0.27"
tracing-opentelemetry = "0.28"
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
try-lock = "0.2.5"
twox-hash = { version = "1.6.3", default-features = false }

View File

@@ -71,6 +71,7 @@ RUN set -e \
ca-certificates \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
openssl \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
&& useradd -d /data neon \
&& chown -R neon:neon /data

View File

@@ -3,7 +3,6 @@ ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
# Where to install Postgres, default is ./pg_install, maybe useful for package managers
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/
OPENSSL_PREFIX_DIR := /usr/local/openssl
ICU_PREFIX_DIR := /usr/local/icu
#
@@ -26,11 +25,9 @@ endif
ifeq ($(shell test -e /home/nonroot/.docker_build && echo -n yes),yes)
# Exclude static build openssl, icu for local build (MacOS, Linux)
# Only keep for build type release and debug
PG_CFLAGS += -I$(OPENSSL_PREFIX_DIR)/include
PG_CONFIGURE_OPTS += --with-icu
PG_CONFIGURE_OPTS += ICU_CFLAGS='-I/$(ICU_PREFIX_DIR)/include -DU_STATIC_IMPLEMENTATION'
PG_CONFIGURE_OPTS += ICU_LIBS='-L$(ICU_PREFIX_DIR)/lib -L$(ICU_PREFIX_DIR)/lib64 -licui18n -licuuc -licudata -lstdc++ -Wl,-Bdynamic -lm'
PG_CONFIGURE_OPTS += LDFLAGS='-L$(OPENSSL_PREFIX_DIR)/lib -L$(OPENSSL_PREFIX_DIR)/lib64 -L$(ICU_PREFIX_DIR)/lib -L$(ICU_PREFIX_DIR)/lib64 -Wl,-Bstatic -lssl -lcrypto -Wl,-Bdynamic -lrt -lm -ldl -lpthread'
endif
UNAME_S := $(shell uname -s)

View File

@@ -190,21 +190,6 @@ RUN for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JS
&& make install \
&& rm -rf ../lcov.tar.gz
# Compile and install the static OpenSSL library
ENV OPENSSL_VERSION=1.1.1w
ENV OPENSSL_PREFIX=/usr/local/openssl
RUN wget -O /tmp/openssl-${OPENSSL_VERSION}.tar.gz https://www.openssl.org/source/openssl-${OPENSSL_VERSION}.tar.gz && \
echo "cf3098950cb4d853ad95c0841f1f9c6d3dc102dccfcacd521d93925208b76ac8 /tmp/openssl-${OPENSSL_VERSION}.tar.gz" | sha256sum --check && \
cd /tmp && \
tar xzvf /tmp/openssl-${OPENSSL_VERSION}.tar.gz && \
rm /tmp/openssl-${OPENSSL_VERSION}.tar.gz && \
cd /tmp/openssl-${OPENSSL_VERSION} && \
./config --prefix=${OPENSSL_PREFIX} -static --static no-shared -fPIC && \
make -j "$(nproc)" && \
make install && \
cd /tmp && \
rm -rf /tmp/openssl-${OPENSSL_VERSION}
# Use the same version of libicu as the compute nodes so that
# clusters created using inidb on pageserver can be used by computes.
#

View File

@@ -170,7 +170,6 @@ RUN case "${PG_VERSION}" in \
wget https://download.osgeo.org/postgis/source/postgis-${POSTGIS_VERSION}.tar.gz -O postgis.tar.gz && \
echo "${POSTGIS_CHECKSUM} postgis.tar.gz" | sha256sum --check && \
mkdir postgis-src && cd postgis-src && tar xzf ../postgis.tar.gz --strip-components=1 -C . && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt &&\
./autogen.sh && \
./configure --with-sfcgal=/usr/local/bin/sfcgal-config && \
make -j $(getconf _NPROCESSORS_ONLN) && \
@@ -220,11 +219,7 @@ RUN case "${PG_VERSION}" in \
cmake -GNinja -DCMAKE_BUILD_TYPE=Release .. && \
ninja -j $(getconf _NPROCESSORS_ONLN) && \
ninja -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrouting.control && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /after.txt &&\
cp /usr/local/pgsql/share/extension/pgrouting.control /extensions/postgis && \
sort -o /before.txt /before.txt && sort -o /after.txt /after.txt && \
comm -13 /before.txt /after.txt | tar --directory=/usr/local/pgsql --zstd -cf /extensions/postgis.tar.zst -T -
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrouting.control
#########################################################################################
#
@@ -842,13 +837,8 @@ RUN case "${PG_VERSION}" in "v17") \
wget https://github.com/neondatabase/postgresql_anonymizer/archive/refs/tags/neon_1.1.1.tar.gz -O pg_anon.tar.gz && \
echo "321ea8d5c1648880aafde850a2c576e4a9e7b9933a34ce272efc839328999fa9 pg_anon.tar.gz" | sha256sum --check && \
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C . && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt &&\
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/anon.control && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /after.txt &&\
mkdir -p /extensions/anon && cp /usr/local/pgsql/share/extension/anon.control /extensions/anon && \
sort -o /before.txt /before.txt && sort -o /after.txt /after.txt && \
comm -13 /before.txt /after.txt | tar --directory=/usr/local/pgsql --zstd -cf /extensions/anon.tar.zst -T -
echo 'trusted = true' >> /usr/local/pgsql/share/extension/anon.control
#########################################################################################
#
@@ -976,22 +966,9 @@ RUN apt update && apt install --no-install-recommends --no-install-suggests -y p
FROM rust-extensions-build-pgrx12 AS pg-jsonschema-pg-build
ARG PG_VERSION
# version 0.3.3 supports v17
# last release v0.3.3 - Oct 16, 2024
#
# there were no breaking changes
# so we can use the same version for all postgres versions
RUN case "${PG_VERSION}" in \
"v14" | "v15" | "v16" | "v17") \
export PG_JSONSCHEMA_VERSION=0.3.3 \
export PG_JSONSCHEMA_CHECKSUM=40c2cffab4187e0233cb8c3bde013be92218c282f95f4469c5282f6b30d64eac \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
esac && \
wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v${PG_JSONSCHEMA_VERSION}.tar.gz -O pg_jsonschema.tar.gz && \
echo "${PG_JSONSCHEMA_CHECKSUM} pg_jsonschema.tar.gz" | sha256sum --check && \
RUN wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v0.3.3.tar.gz -O pg_jsonschema.tar.gz && \
echo "40c2cffab4187e0233cb8c3bde013be92218c282f95f4469c5282f6b30d64eac pg_jsonschema.tar.gz" | sha256sum --check && \
mkdir pg_jsonschema-src && cd pg_jsonschema-src && tar xzf ../pg_jsonschema.tar.gz --strip-components=1 -C . && \
# see commit 252b3685a27a0f4c31a0f91e983c6314838e89e8
# `unsafe-postgres` feature allows to build pgx extensions
@@ -1012,22 +989,9 @@ RUN case "${PG_VERSION}" in \
FROM rust-extensions-build-pgrx12 AS pg-graphql-pg-build
ARG PG_VERSION
# version 1.5.9 supports v17
# last release v1.5.9 - Oct 16, 2024
#
# there were no breaking changes
# so we can use the same version for all postgres versions
RUN case "${PG_VERSION}" in \
"v14" | "v15" | "v16" | "v17") \
export PG_GRAPHQL_VERSION=1.5.9 \
export PG_GRAPHQL_CHECKSUM=cf768385a41278be1333472204fc0328118644ae443182cf52f7b9b23277e497 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
esac && \
wget https://github.com/supabase/pg_graphql/archive/refs/tags/v${PG_GRAPHQL_VERSION}.tar.gz -O pg_graphql.tar.gz && \
echo "${PG_GRAPHQL_CHECKSUM} pg_graphql.tar.gz" | sha256sum --check && \
RUN wget https://github.com/supabase/pg_graphql/archive/refs/tags/v1.5.9.tar.gz -O pg_graphql.tar.gz && \
echo "cf768385a41278be1333472204fc0328118644ae443182cf52f7b9b23277e497 pg_graphql.tar.gz" | sha256sum --check && \
mkdir pg_graphql-src && cd pg_graphql-src && tar xzf ../pg_graphql.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "=0.12.6"/pgrx = { version = "0.12.6", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgrx install --release && \
@@ -1091,8 +1055,8 @@ ARG PG_VERSION
# NOTE: local_proxy depends on the version of pg_session_jwt
# Do not update without approve from proxy team
# Make sure the version is reflected in proxy/src/serverless/local_conn_pool.rs
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.1.2-v17.tar.gz -O pg_session_jwt.tar.gz && \
echo "c8ecbed9cb8c6441bce5134a176002b043018adf9d05a08e457dda233090a86e pg_session_jwt.tar.gz" | sha256sum --check && \
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.2.0.tar.gz -O pg_session_jwt.tar.gz && \
echo "5ace028e591f2e000ca10afa5b1ca62203ebff014c2907c0ec3b29c36f28a1bb pg_session_jwt.tar.gz" | sha256sum --check && \
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "=0.12.6", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgrx install --release
@@ -1167,22 +1131,13 @@ FROM rust-extensions-build AS pg-mooncake-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# The topmost commit in the `neon` branch at the time of writing this
# https://github.com/Mooncake-Labs/pg_mooncake/commits/neon/
# https://github.com/Mooncake-Labs/pg_mooncake/commit/077c92c452bb6896a7b7776ee95f039984f076af
ENV PG_MOONCAKE_VERSION=077c92c452bb6896a7b7776ee95f039984f076af
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in \
'v14') \
echo "pg_mooncake is not supported on Postgres ${PG_VERSION}" && exit 0;; \
esac && \
git clone --depth 1 --branch neon https://github.com/Mooncake-Labs/pg_mooncake.git pg_mooncake-src && \
cd pg_mooncake-src && \
git checkout "${PG_MOONCAKE_VERSION}" && \
git submodule update --init --depth 1 --recursive && \
make BUILD_TYPE=release -j $(getconf _NPROCESSORS_ONLN) && \
make BUILD_TYPE=release -j $(getconf _NPROCESSORS_ONLN) install && \
RUN wget https://github.com/Mooncake-Labs/pg_mooncake/releases/download/v0.1.0/pg_mooncake-0.1.0.tar.gz -O pg_mooncake.tar.gz && \
echo "eafd059b77f541f11525eb8affcd66a176968cbd8fe7c0d436e733f2aa4da59f pg_mooncake.tar.gz" | sha256sum --check && \
mkdir pg_mooncake-src && cd pg_mooncake-src && tar xzf ../pg_mooncake.tar.gz --strip-components=1 -C . && \
make release -j $(getconf _NPROCESSORS_ONLN) && \
make install -j $(getconf _NPROCESSORS_ONLN) && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_mooncake.control
#########################################################################################
@@ -1267,20 +1222,6 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
make -j $(getconf _NPROCESSORS_ONLN) \
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
-C pgxn/neon_rmgr \
-s install && \
case "${PG_VERSION}" in \
"v14" | "v15") \
;; \
"v16" | "v17") \
echo "Skipping HNSW for PostgreSQL ${PG_VERSION}" && exit 0 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
esac && \
make -j $(getconf _NPROCESSORS_ONLN) \
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
-C pgxn/hnsw \
-s install
#########################################################################################
@@ -1297,17 +1238,6 @@ USER nonroot
COPY --chown=nonroot . .
RUN mold -run cargo build --locked --profile release-line-debug-size-lto --bin compute_ctl --bin fast_import --bin local_proxy
#########################################################################################
#
# Final compute-tools image
#
#########################################################################################
FROM debian:$DEBIAN_FLAVOR AS compute-tools-image
COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl
COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/fast_import /usr/local/bin/fast_import
#########################################################################################
#
# Layer "pgbouncer"

View File

@@ -111,11 +111,6 @@ fn main() -> Result<()> {
fn init() -> Result<(String, clap::ArgMatches)> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
opentelemetry::global::set_error_handler(|err| {
tracing::info!("OpenTelemetry error: {err}");
})
.expect("global error handler lock poisoned");
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
thread::spawn(move || {
for sig in signals.forever() {

View File

@@ -17,7 +17,7 @@
//!
//! # Local Testing
//!
//! - Comment out most of the pgxns in The Dockerfile.compute-tools to speed up the build.
//! - Comment out most of the pgxns in compute-node.Dockerfile to speed up the build.
//! - Build the image with the following command:
//!
//! ```bash

View File

@@ -483,7 +483,6 @@ impl LocalEnv {
.iter()
.find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
.map(|&(_, timeline_id)| timeline_id)
.map(TimelineId::from)
}
pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {

View File

@@ -822,10 +822,7 @@ impl StorageController {
self.dispatch(
Method::PUT,
format!("control/v1/tenant/{tenant_shard_id}/migrate"),
Some(TenantShardMigrateRequest {
tenant_shard_id,
node_id,
}),
Some(TenantShardMigrateRequest { node_id }),
)
.await
}

View File

@@ -112,6 +112,13 @@ enum Command {
#[arg(long)]
node: NodeId,
},
/// Migrate the secondary location for a tenant shard to a specific pageserver.
TenantShardMigrateSecondary {
#[arg(long)]
tenant_shard_id: TenantShardId,
#[arg(long)]
node: NodeId,
},
/// Cancel any ongoing reconciliation for this shard
TenantShardCancelReconcile {
#[arg(long)]
@@ -540,10 +547,7 @@ async fn main() -> anyhow::Result<()> {
tenant_shard_id,
node,
} => {
let req = TenantShardMigrateRequest {
tenant_shard_id,
node_id: node,
};
let req = TenantShardMigrateRequest { node_id: node };
storcon_client
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
@@ -553,6 +557,20 @@ async fn main() -> anyhow::Result<()> {
)
.await?;
}
Command::TenantShardMigrateSecondary {
tenant_shard_id,
node,
} => {
let req = TenantShardMigrateRequest { node_id: node };
storcon_client
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
Method::PUT,
format!("control/v1/tenant/{tenant_shard_id}/migrate_secondary"),
Some(req),
)
.await?;
}
Command::TenantShardCancelReconcile { tenant_shard_id } => {
storcon_client
.dispatch::<(), ()>(
@@ -915,10 +933,7 @@ async fn main() -> anyhow::Result<()> {
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
Method::PUT,
format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id),
Some(TenantShardMigrateRequest {
tenant_shard_id: mv.tenant_shard_id,
node_id: mv.to,
}),
Some(TenantShardMigrateRequest { node_id: mv.to }),
)
.await
.map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e))

View File

@@ -7,15 +7,11 @@ Currently we build two main images:
- [neondatabase/neon](https://hub.docker.com/repository/docker/neondatabase/neon) — image with pre-built `pageserver`, `safekeeper` and `proxy` binaries and all the required runtime dependencies. Built from [/Dockerfile](/Dockerfile).
- [neondatabase/compute-node-v16](https://hub.docker.com/repository/docker/neondatabase/compute-node-v16) — compute node image with pre-built Postgres binaries from [neondatabase/postgres](https://github.com/neondatabase/postgres). Similar images exist for v15 and v14. Built from [/compute-node/Dockerfile](/compute/compute-node.Dockerfile).
And additional intermediate image:
- [neondatabase/compute-tools](https://hub.docker.com/repository/docker/neondatabase/compute-tools) — compute node configuration management tools.
## Build pipeline
We build all images after a successful `release` tests run and push automatically to Docker Hub with two parallel CI jobs
1. `neondatabase/compute-tools` and `neondatabase/compute-node-v16` (and -v15 and -v14)
1. `neondatabase/compute-node-v17` (and -16, -v15, -v14)
2. `neondatabase/neon`

View File

@@ -179,7 +179,6 @@ pub struct TenantDescribeResponseShard {
/// specifies some constraints, e.g. asking it to get off particular node(s)
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateRequest {
pub tenant_shard_id: TenantShardId,
pub node_id: NodeId,
}

View File

@@ -5,6 +5,7 @@ use postgres_ffi::Oid;
use postgres_ffi::RepOriginId;
use serde::{Deserialize, Serialize};
use std::{fmt, ops::Range};
use utils::const_assert;
use crate::reltag::{BlockNumber, RelTag, SlruKind};
@@ -47,6 +48,12 @@ pub const AUX_KEY_PREFIX: u8 = 0x62;
/// The key prefix of ReplOrigin keys.
pub const REPL_ORIGIN_KEY_PREFIX: u8 = 0x63;
/// The key prefix of db directory keys.
pub const DB_DIR_KEY_PREFIX: u8 = 0x64;
/// The key prefix of rel direcotry keys.
pub const REL_DIR_KEY_PREFIX: u8 = 0x65;
/// Check if the key falls in the range of metadata keys.
pub const fn is_metadata_key_slice(key: &[u8]) -> bool {
key[0] >= METADATA_KEY_BEGIN_PREFIX && key[0] < METADATA_KEY_END_PREFIX
@@ -108,6 +115,24 @@ impl Key {
}
}
pub fn rel_dir_sparse_key_range() -> Range<Self> {
Key {
field1: REL_DIR_KEY_PREFIX,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
}..Key {
field1: REL_DIR_KEY_PREFIX + 1,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
}
}
/// This function checks more extensively what keys we can take on the write path.
/// If a key beginning with 00 does not have a global/default tablespace OID, it
/// will be rejected on the write path.
@@ -438,6 +463,36 @@ pub fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
}
}
#[inline(always)]
pub fn rel_tag_sparse_key(spcnode: Oid, dbnode: Oid, relnode: Oid, forknum: u8) -> Key {
Key {
field1: REL_DIR_KEY_PREFIX,
field2: spcnode,
field3: dbnode,
field4: relnode,
field5: forknum,
field6: 1,
}
}
pub fn rel_tag_sparse_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
Key {
field1: REL_DIR_KEY_PREFIX,
field2: spcnode,
field3: dbnode,
field4: 0,
field5: 0,
field6: 0,
}..Key {
field1: REL_DIR_KEY_PREFIX,
field2: spcnode,
field3: dbnode,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
} // it's fine to exclude the last key b/c we only use field6 == 1
}
#[inline(always)]
pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
Key {
@@ -706,7 +761,7 @@ pub fn repl_origin_key_range() -> Range<Key> {
/// Non inherited range for vectored get.
pub const NON_INHERITED_RANGE: Range<Key> = AUX_FILES_KEY..AUX_FILES_KEY.next();
/// Sparse keyspace range for vectored get. Missing key error will be ignored for this range.
pub const NON_INHERITED_SPARSE_RANGE: Range<Key> = Key::metadata_key_range();
pub const SPARSE_RANGE: Range<Key> = Key::metadata_key_range();
impl Key {
// AUX_FILES currently stores only data for logical replication (slots etc), and
@@ -714,7 +769,42 @@ impl Key {
// switch (and generally it likely should be optional), so ignore these.
#[inline(always)]
pub fn is_inherited_key(self) -> bool {
!NON_INHERITED_RANGE.contains(&self) && !NON_INHERITED_SPARSE_RANGE.contains(&self)
if self.is_sparse() {
self.is_inherited_sparse_key()
} else {
!NON_INHERITED_RANGE.contains(&self)
}
}
#[inline(always)]
pub fn is_sparse(self) -> bool {
self.field1 >= METADATA_KEY_BEGIN_PREFIX && self.field1 < METADATA_KEY_END_PREFIX
}
/// Check if the key belongs to the inherited keyspace.
fn is_inherited_sparse_key(self) -> bool {
debug_assert!(self.is_sparse());
self.field1 == RELATION_SIZE_PREFIX
}
pub const fn sparse_non_inherited_keyspace() -> Range<Key> {
// The two keys are adjacent; if we will have non-adjancent keys in the future, we should return a keyspace
const_assert!(AUX_KEY_PREFIX + 1 == REPL_ORIGIN_KEY_PREFIX);
Key {
field1: AUX_KEY_PREFIX,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
}..Key {
field1: REPL_ORIGIN_KEY_PREFIX + 1,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
}
}
#[inline(always)]

View File

@@ -272,6 +272,8 @@ pub struct CompactInfoResponse {
pub compact_key_range: Option<CompactKeyRange>,
pub compact_lsn_range: Option<CompactLsnRange>,
pub sub_compaction: bool,
pub running: bool,
pub job_id: usize,
}
#[derive(Serialize, Deserialize, Clone)]

View File

@@ -44,7 +44,7 @@ pub struct ProtocolVersion(u32);
impl ProtocolVersion {
pub const fn new(major: u16, minor: u16) -> Self {
Self((major as u32) << 16 | minor as u32)
Self(((major as u32) << 16) | minor as u32)
}
pub const fn minor(self) -> u16 {
self.0 as u16

View File

@@ -38,7 +38,6 @@ pub mod http;
use opentelemetry::trace::TracerProvider;
use opentelemetry::KeyValue;
use opentelemetry_sdk::Resource;
use tracing::Subscriber;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;
@@ -121,7 +120,10 @@ where
S: Subscriber + for<'span> LookupSpan<'span>,
{
// Sets up exporter from the OTEL_EXPORTER_* environment variables.
let exporter = opentelemetry_otlp::new_exporter().http();
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.build()
.expect("could not initialize opentelemetry exporter");
// TODO: opentelemetry::global::set_error_handler() with custom handler that
// bypasses default tracing layers, but logs regular looking log
@@ -132,17 +134,13 @@ where
opentelemetry_sdk::propagation::TraceContextPropagator::new(),
);
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
.with_trace_config(opentelemetry_sdk::trace::Config::default().with_resource(
Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
service_name,
)]),
))
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("could not initialize opentelemetry exporter")
let tracer = opentelemetry_sdk::trace::TracerProvider::builder()
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
.with_resource(opentelemetry_sdk::Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
service_name,
)]))
.build()
.tracer("global");
tracing_opentelemetry::layer().with_tracer(tracer)

View File

@@ -26,6 +26,7 @@ git-version.workspace = true
hex = { workspace = true, features = ["serde"] }
humantime.workspace = true
hyper0 = { workspace = true, features = ["full"] }
inferno.workspace = true
itertools.workspace = true
fail.workspace = true
futures = { workspace = true }

View File

@@ -112,9 +112,9 @@ impl Serialize for Generation {
// We should never be asked to serialize a None. Structures
// that include an optional generation should convert None to an
// Option<Generation>::None
Err(serde::ser::Error::custom(
"Tried to serialize invalid generation ({self})",
))
Err(serde::ser::Error::custom(format!(
"Tried to serialize invalid generation ({self:?})"
)))
}
}
}

View File

@@ -15,7 +15,7 @@ use once_cell::sync::Lazy;
use regex::Regex;
use routerify::ext::RequestExt;
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{mpsc, Mutex, Notify};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::ReaderStream;
use tracing::{debug, info, info_span, warn, Instrument};
@@ -350,33 +350,53 @@ pub async fn profile_cpu_handler(req: Request<Body>) -> Result<Response<Body>, A
};
let seconds = match parse_query_param(&req, "seconds")? {
None => 5,
Some(seconds @ 1..=30) => seconds,
Some(_) => return Err(ApiError::BadRequest(anyhow!("duration must be 1-30 secs"))),
Some(seconds @ 1..=60) => seconds,
Some(_) => return Err(ApiError::BadRequest(anyhow!("duration must be 1-60 secs"))),
};
let frequency_hz = match parse_query_param(&req, "frequency")? {
None => 99,
Some(1001..) => return Err(ApiError::BadRequest(anyhow!("frequency must be <=1000 Hz"))),
Some(frequency) => frequency,
};
// Only allow one profiler at a time.
static PROFILE_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
let _lock = PROFILE_LOCK
.try_lock()
.map_err(|_| ApiError::Conflict("profiler already running".into()))?;
let force: bool = parse_query_param(&req, "force")?.unwrap_or_default();
// Take the profile.
let report = tokio::task::spawn_blocking(move || {
static PROFILE_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
static PROFILE_CANCEL: Lazy<Notify> = Lazy::new(Notify::new);
let report = {
// Only allow one profiler at a time. If force is true, cancel a running profile (e.g. a
// Grafana continuous profile). We use a try_lock() loop when cancelling instead of waiting
// for a lock(), to avoid races where the notify isn't currently awaited.
let _lock = loop {
match PROFILE_LOCK.try_lock() {
Ok(lock) => break lock,
Err(_) if force => PROFILE_CANCEL.notify_waiters(),
Err(_) => {
return Err(ApiError::Conflict(
"profiler already running (use ?force=true to cancel it)".into(),
))
}
}
tokio::time::sleep(Duration::from_millis(1)).await; // don't busy-wait
};
let guard = ProfilerGuardBuilder::default()
.frequency(frequency_hz)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()?;
std::thread::sleep(Duration::from_secs(seconds));
guard.report().build()
})
.await
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
.map_err(|pprof_err| ApiError::InternalServerError(pprof_err.into()))?;
.build()
.map_err(|err| ApiError::InternalServerError(err.into()))?;
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(seconds)) => {},
_ = PROFILE_CANCEL.notified() => {},
};
guard
.report()
.build()
.map_err(|err| ApiError::InternalServerError(err.into()))?
};
// Return the report in the requested format.
match format {
@@ -417,6 +437,7 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
enum Format {
Jemalloc,
Pprof,
Svg,
}
// Parameters.
@@ -424,9 +445,24 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
None => Format::Pprof,
Some("jemalloc") => Format::Jemalloc,
Some("pprof") => Format::Pprof,
Some("svg") => Format::Svg,
Some(format) => return Err(ApiError::BadRequest(anyhow!("invalid format {format}"))),
};
// Functions and mappings to strip when symbolizing pprof profiles. If true,
// also remove child frames.
static STRIP_FUNCTIONS: Lazy<Vec<(Regex, bool)>> = Lazy::new(|| {
vec![
(Regex::new("^__rust").unwrap(), false),
(Regex::new("^_start$").unwrap(), false),
(Regex::new("^irallocx_prof").unwrap(), true),
(Regex::new("^prof_alloc_prep").unwrap(), true),
(Regex::new("^std::rt::lang_start").unwrap(), false),
(Regex::new("^std::sys::backtrace::__rust").unwrap(), false),
]
});
const STRIP_MAPPINGS: &[&str] = &["libc", "libgcc", "pthread", "vdso"];
// Obtain profiler handle.
let mut prof_ctl = jemalloc_pprof::PROF_CTL
.as_ref()
@@ -464,24 +500,9 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
// Symbolize the profile.
// TODO: consider moving this upstream to jemalloc_pprof and avoiding the
// serialization roundtrip.
static STRIP_FUNCTIONS: Lazy<Vec<(Regex, bool)>> = Lazy::new(|| {
// Functions to strip from profiles. If true, also remove child frames.
vec![
(Regex::new("^__rust").unwrap(), false),
(Regex::new("^_start$").unwrap(), false),
(Regex::new("^irallocx_prof").unwrap(), true),
(Regex::new("^prof_alloc_prep").unwrap(), true),
(Regex::new("^std::rt::lang_start").unwrap(), false),
(Regex::new("^std::sys::backtrace::__rust").unwrap(), false),
]
});
let profile = pprof::decode(&bytes)?;
let profile = pprof::symbolize(profile)?;
let profile = pprof::strip_locations(
profile,
&["libc", "libgcc", "pthread", "vdso"],
&STRIP_FUNCTIONS,
);
let profile = pprof::strip_locations(profile, STRIP_MAPPINGS, &STRIP_FUNCTIONS);
pprof::encode(&profile)
})
.await
@@ -494,6 +515,27 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
.body(Body::from(data))
.map_err(|err| ApiError::InternalServerError(err.into()))
}
Format::Svg => {
let body = tokio::task::spawn_blocking(move || {
let bytes = prof_ctl.dump_pprof()?;
let profile = pprof::decode(&bytes)?;
let profile = pprof::symbolize(profile)?;
let profile = pprof::strip_locations(profile, STRIP_MAPPINGS, &STRIP_FUNCTIONS);
let mut opts = inferno::flamegraph::Options::default();
opts.title = "Heap inuse".to_string();
opts.count_name = "bytes".to_string();
pprof::flamegraph(profile, &mut opts)
})
.await
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
.map_err(ApiError::InternalServerError)?;
Response::builder()
.status(200)
.header(CONTENT_TYPE, "image/svg+xml")
.body(Body::from(body))
.map_err(|err| ApiError::InternalServerError(err.into()))
}
}
}

View File

@@ -260,7 +260,7 @@ impl FromStr for Lsn {
{
let left_num = u32::from_str_radix(left, 16).map_err(|_| LsnParseError)?;
let right_num = u32::from_str_radix(right, 16).map_err(|_| LsnParseError)?;
Ok(Lsn((left_num as u64) << 32 | right_num as u64))
Ok(Lsn(((left_num as u64) << 32) | right_num as u64))
} else {
Err(LsnParseError)
}

View File

@@ -1,8 +1,9 @@
use anyhow::bail;
use flate2::write::{GzDecoder, GzEncoder};
use flate2::Compression;
use itertools::Itertools as _;
use once_cell::sync::Lazy;
use pprof::protos::{Function, Line, Message as _, Profile};
use pprof::protos::{Function, Line, Location, Message as _, Profile};
use regex::Regex;
use std::borrow::Cow;
@@ -188,3 +189,59 @@ pub fn strip_locations(
profile
}
/// Generates an SVG flamegraph from a symbolized pprof profile.
pub fn flamegraph(
profile: Profile,
opts: &mut inferno::flamegraph::Options,
) -> anyhow::Result<Vec<u8>> {
if profile.mapping.iter().any(|m| !m.has_functions) {
bail!("profile not symbolized");
}
// Index locations, functions, and strings.
let locations: HashMap<u64, Location> =
profile.location.into_iter().map(|l| (l.id, l)).collect();
let functions: HashMap<u64, Function> =
profile.function.into_iter().map(|f| (f.id, f)).collect();
let strings = profile.string_table;
// Resolve stacks as function names, and sum sample values per stack. Also reverse the stack,
// since inferno expects it bottom-up.
let mut stacks: HashMap<Vec<&str>, i64> = HashMap::new();
for sample in profile.sample {
let mut stack = Vec::with_capacity(sample.location_id.len());
for location in sample.location_id.into_iter().rev() {
let Some(location) = locations.get(&location) else {
bail!("missing location {location}");
};
for line in location.line.iter().rev() {
let Some(function) = functions.get(&line.function_id) else {
bail!("missing function {}", line.function_id);
};
let Some(name) = strings.get(function.name as usize) else {
bail!("missing string {}", function.name);
};
stack.push(name.as_str());
}
}
let Some(&value) = sample.value.first() else {
bail!("missing value");
};
*stacks.entry(stack).or_default() += value;
}
// Construct stack lines for inferno.
let lines = stacks
.into_iter()
.map(|(stack, value)| (stack.into_iter().join(";"), value))
.map(|(stack, value)| format!("{stack} {value}"))
.sorted()
.collect_vec();
// Construct the flamegraph.
let mut bytes = Vec::new();
let lines = lines.iter().map(|line| line.as_str());
inferno::flamegraph::from_lines(opts, lines, &mut bytes)?;
Ok(bytes)
}

View File

@@ -96,7 +96,11 @@ impl<T: Send> Sender<T> {
}
}
State::SenderWaitsForReceiverToConsume(_data) => {
// Really, we shouldn't be polled until receiver has consumed and wakes us.
// SAFETY: send is single threaded due to `&mut self` requirement,
// therefore register is not concurrent.
unsafe {
self.state.wake_sender.register(cx.waker());
}
Poll::Pending
}
State::ReceiverGone => Poll::Ready(Err(SendError::ReceiverGone)),
@@ -449,4 +453,38 @@ mod tests {
let err = recv_task.await.unwrap().expect_err("should error");
assert!(matches!(err, RecvError::SenderGone));
}
#[tokio::test(start_paused = true)]
async fn test_receiver_drop_while_waiting_for_receiver_to_consume_unblocks_sender() {
let (mut sender, receiver) = channel();
let state = receiver.state.clone();
sender.send((), |_, _| unreachable!()).await.unwrap();
assert!(matches!(&*state.value.lock().unwrap(), &State::HasData(_)));
let unmergeable = sender.send((), |_, _| Err(()));
let mut unmergeable = std::pin::pin!(unmergeable);
tokio::select! {
_ = tokio::time::sleep(FOREVER) => {},
_ = &mut unmergeable => {
panic!("unmergeable should not complete");
},
}
assert!(matches!(
&*state.value.lock().unwrap(),
&State::SenderWaitsForReceiverToConsume(_)
));
drop(receiver);
assert!(matches!(
&*state.value.lock().unwrap(),
&State::ReceiverGone
));
unmergeable.await.unwrap_err();
}
}

View File

@@ -53,10 +53,12 @@ project_build_tag!(BUILD_TAG);
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
/// Configure jemalloc to sample allocations for profiles every 1 MB (1 << 20).
/// Configure jemalloc to profile heap allocations by sampling stack traces every 2 MB (1 << 21).
/// This adds roughly 3% overhead for allocations on average, which is acceptable considering
/// performance-sensitive code will avoid allocations as far as possible anyway.
#[allow(non_upper_case_globals)]
#[export_name = "malloc_conf"]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:20\0";
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:21\0";
const PID_FILE_NAME: &str = "pageserver.pid";

View File

@@ -97,8 +97,8 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::DEFAULT_PG_VERSION;
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
CompactInfoResponse, StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest,
TimelineGcRequest, TimelineInfo,
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
TimelineInfo,
};
use utils::{
auth::SwappableJwtAuth,
@@ -2052,15 +2052,7 @@ async fn timeline_compact_info_handler(
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let res = tenant.get_scheduled_compaction_tasks(timeline_id);
let mut resp = Vec::new();
for item in res {
resp.push(CompactInfoResponse {
compact_key_range: item.compact_key_range,
compact_lsn_range: item.compact_lsn_range,
sub_compaction: item.sub_compaction,
});
}
let resp = tenant.get_scheduled_compaction_tasks(timeline_id);
json_response(StatusCode::OK, resp)
}
.instrument(info_span!("timeline_compact_info", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))

View File

@@ -91,15 +91,6 @@ pub(crate) static STORAGE_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static READ_NUM_LAYERS_VISITED: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_layers_visited_per_read_global",
"Number of layers visited to reconstruct one key",
vec![1.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0],
)
.expect("failed to define a metric")
});
pub(crate) static VEC_READ_NUM_LAYERS_VISITED: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_layers_visited_per_vectored_read_global",
@@ -3894,7 +3885,6 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) {
// histograms
[
&READ_NUM_LAYERS_VISITED,
&VEC_READ_NUM_LAYERS_VISITED,
&WAIT_LSN_TIME,
&WAL_REDO_TIME,

View File

@@ -618,6 +618,9 @@ impl BatchedFeMessage {
};
let throttled = tokio::select! {
throttled = shard.pagestream_throttle.throttle(tokens) => { throttled }
_ = shard.cancel.cancelled() => {
return Err(QueryError::Shutdown);
}
_ = cancel.cancelled() => {
return Err(QueryError::Shutdown);
}

View File

@@ -22,13 +22,14 @@ use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use pageserver_api::key::Key;
use pageserver_api::key::{
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
rel_tag_sparse_key_range, relmap_file_key, repl_origin_key, repl_origin_key_range,
slru_block_to_key, slru_dir_to_key, slru_segment_key_range, slru_segment_size_to_key,
twophase_file_key, twophase_key_range, CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY,
CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
};
use pageserver_api::key::{rel_tag_sparse_key, Key};
use pageserver_api::keyspace::SparseKeySpace;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
@@ -55,6 +56,8 @@ pub const MAX_AUX_FILE_DELTAS: usize = 1024;
/// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
pub const REL_STORE_V2: bool = true;
#[derive(Debug)]
pub enum LsnForTimestamp {
/// Found commits both before and after the given timestamp
@@ -483,12 +486,24 @@ impl Timeline {
if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
return Ok(false);
}
// fetch directory listing
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
let buf = version.get(self, key, ctx).await?;
let dir = RelDirectory::des(&buf)?;
Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
if REL_STORE_V2 {
// fetch directory listing
let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
let buf = version.get(self, key, ctx).await;
if let Ok(buf) = buf {
Ok(!buf.is_empty())
} else {
Ok(false)
} // TODO: sparse keyspace needs a different get function
} else {
// fetch directory listing
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
let buf = version.get(self, key, ctx).await?;
let dir = RelDirectory::des(&buf)?;
Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
}
}
/// Get a list of all existing relations in given tablespace and database.
@@ -506,20 +521,44 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<HashSet<RelTag>, PageReconstructError> {
// fetch directory listing
let key = rel_dir_to_key(spcnode, dbnode);
let buf = version.get(self, key, ctx).await?;
if REL_STORE_V2 {
// scan directory listing
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
let results = self
.scan(KeySpace::single(key_range), version.get_lsn(), ctx)
.await?;
let mut rels = HashSet::new();
for (key, val) in results {
if val?.is_empty() {
continue;
}
assert_eq!(key.field6, 1);
assert_eq!(key.field2, spcnode);
assert_eq!(key.field3, dbnode);
rels.insert(RelTag {
spcnode,
dbnode,
relnode: key.field4,
forknum: key.field5,
});
}
Ok(rels)
} else {
// fetch directory listing
let key = rel_dir_to_key(spcnode, dbnode);
let buf = version.get(self, key, ctx).await?;
let dir = RelDirectory::des(&buf)?;
let rels: HashSet<RelTag> =
HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
spcnode,
dbnode,
relnode: *relnode,
forknum: *forknum,
}));
let dir = RelDirectory::des(&buf)?;
let rels: HashSet<RelTag> =
HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
spcnode,
dbnode,
relnode: *relnode,
forknum: *forknum,
}));
Ok(rels)
Ok(rels)
}
}
/// Get the whole SLRU segment
@@ -1042,7 +1081,7 @@ impl Timeline {
if has_relmap_file {
result.add_key(relmap_file_key(spcnode, dbnode));
}
result.add_key(rel_dir_to_key(spcnode, dbnode));
// result.add_key(rel_dir_to_key(spcnode, dbnode));
let mut rels: Vec<RelTag> = self
.list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
@@ -1116,7 +1155,11 @@ impl Timeline {
let dense_keyspace = result.to_keyspace();
let sparse_keyspace = SparseKeySpace(KeySpace {
ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
ranges: vec![
Key::metadata_aux_key_range(),
repl_origin_key_range(),
Key::rel_dir_sparse_key_range(),
],
});
if cfg!(debug_assertions) {
@@ -1608,7 +1651,7 @@ impl DatadirModification<'_> {
let buf = DbDirectory::ser(&dbdir)?;
self.put(DBDIR_KEY, Value::Image(buf.into()));
}
if r.is_none() {
if !REL_STORE_V2 && r.is_none() {
// Create RelDirectory
let buf = RelDirectory::ser(&RelDirectory {
rels: HashSet::new(),
@@ -1730,8 +1773,8 @@ impl DatadirModification<'_> {
// tablespace. Create the reldir entry for it if so.
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
.context("deserialize db")?;
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir =
if REL_STORE_V2 {
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
// Didn't exist. Update dbdir
e.insert(false);
@@ -1739,30 +1782,51 @@ impl DatadirModification<'_> {
self.pending_directory_entries
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
self.put(DBDIR_KEY, Value::Image(buf.into()));
}
let rel_dir_key = rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
// check if the rel_dir_key exists
if let Ok(buf) = self.get(rel_dir_key, ctx).await {
if !buf.is_empty() {
return Err(RelationError::AlreadyExists);
}
}
self.put(rel_dir_key, Value::Image(Bytes::from_static(b"1")));
// TODO: update directory_entries_count, it seems to be a metrics
} else {
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir =
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
// Didn't exist. Update dbdir
e.insert(false);
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
self.pending_directory_entries
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
self.put(DBDIR_KEY, Value::Image(buf.into()));
// and create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
};
// and create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
};
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
return Err(RelationError::AlreadyExists);
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
return Err(RelationError::AlreadyExists);
}
self.pending_directory_entries
.push((DirectoryKind::Rel, rel_dir.rels.len()));
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&rel_dir).context("serialize")?,
)),
);
}
self.pending_directory_entries
.push((DirectoryKind::Rel, rel_dir.rels.len()));
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&rel_dir).context("serialize")?,
)),
);
// Put size
let size_key = rel_size_to_key(rel);
let buf = nblocks.to_le_bytes();
@@ -1841,33 +1905,57 @@ impl DatadirModification<'_> {
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
for ((spc_node, db_node), rel_tags) in drop_relations {
let dir_key = rel_dir_to_key(spc_node, db_node);
let buf = self.get(dir_key, ctx).await?;
let mut dir = RelDirectory::des(&buf)?;
if REL_STORE_V2 {
for ((spc_node, db_node), rel_tags) in drop_relations {
for rel_tag in rel_tags {
let key =
rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
if self.get(key, ctx).await.is_ok() {
// remove the relation key
self.put(key, Value::Image(Bytes::from_static(b""))); // put tombstone
let mut dirty = false;
for rel_tag in rel_tags {
if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
dirty = true;
// update logical size
let size_key = rel_size_to_key(rel_tag);
let old_size = self.get(size_key, ctx).await?.get_u32_le();
self.pending_nblocks -= old_size as i64;
// update logical size
let size_key = rel_size_to_key(rel_tag);
let old_size = self.get(size_key, ctx).await?.get_u32_le();
self.pending_nblocks -= old_size as i64;
// Remove entry from relation size cache
self.tline.remove_cached_rel_size(&rel_tag);
// Remove entry from relation size cache
self.tline.remove_cached_rel_size(&rel_tag);
// Delete size entry, as well as all blocks
self.delete(rel_key_range(rel_tag));
// Delete size entry, as well as all blocks
self.delete(rel_key_range(rel_tag));
}
}
}
} else {
for ((spc_node, db_node), rel_tags) in drop_relations {
let dir_key = rel_dir_to_key(spc_node, db_node);
let buf = self.get(dir_key, ctx).await?;
let mut dir = RelDirectory::des(&buf)?;
if dirty {
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
self.pending_directory_entries
.push((DirectoryKind::Rel, dir.rels.len()));
let mut dirty = false;
for rel_tag in rel_tags {
if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
dirty = true;
// update logical size
let size_key = rel_size_to_key(rel_tag);
let old_size = self.get(size_key, ctx).await?.get_u32_le();
self.pending_nblocks -= old_size as i64;
// Remove entry from relation size cache
self.tline.remove_cached_rel_size(&rel_tag);
// Delete size entry, as well as all blocks
self.delete(rel_key_range(rel_tag));
}
}
if dirty {
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
self.pending_directory_entries
.push((DirectoryKind::Rel, dir.rels.len()));
}
}
}

View File

@@ -21,6 +21,7 @@ use enumset::EnumSet;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use pageserver_api::models;
use pageserver_api::models::CompactInfoResponse;
use pageserver_api::models::LsnLease;
use pageserver_api::models::TimelineArchivalState;
use pageserver_api::models::TimelineState;
@@ -37,21 +38,17 @@ use remote_timeline_client::manifest::{
};
use remote_timeline_client::UploadQueueNotReadyError;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::atomic::AtomicBool;
use std::sync::Weak;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use timeline::compaction::GcCompactJob;
use timeline::compaction::ScheduledCompactionTask;
use timeline::compaction::GcCompactionQueue;
use timeline::import_pgdata;
use timeline::offload::offload_timeline;
use timeline::offload::OffloadError;
use timeline::CompactFlags;
use timeline::CompactOptions;
use timeline::CompactionError;
use timeline::ShutdownMode;
use tokio::io::BufReader;
use tokio::sync::watch;
@@ -347,10 +344,8 @@ pub struct Tenant {
/// Overhead of mutex is acceptable because compaction is done with a multi-second period.
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,
/// Scheduled compaction tasks. Currently, this can only be populated by triggering
/// a manual gc-compaction from the manual compaction API.
scheduled_compaction_tasks:
std::sync::Mutex<HashMap<TimelineId, VecDeque<ScheduledCompactionTask>>>,
/// Scheduled gc-compaction tasks.
scheduled_compaction_tasks: std::sync::Mutex<HashMap<TimelineId, Arc<GcCompactionQueue>>>,
/// If the tenant is in Activating state, notify this to encourage it
/// to proceed to Active as soon as possible, rather than waiting for lazy
@@ -2997,104 +2992,18 @@ impl Tenant {
if has_pending_l0_compaction_task {
Some(true)
} else {
let mut has_pending_scheduled_compaction_task;
let next_scheduled_compaction_task = {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) {
if !tline_pending_tasks.is_empty() {
info!(
"{} tasks left in the compaction schedule queue",
tline_pending_tasks.len()
);
}
let next_task = tline_pending_tasks.pop_front();
has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty();
next_task
} else {
has_pending_scheduled_compaction_task = false;
None
}
let queue = {
let guard = self.scheduled_compaction_tasks.lock().unwrap();
guard.get(timeline_id).cloned()
};
if let Some(mut next_scheduled_compaction_task) = next_scheduled_compaction_task
{
if !next_scheduled_compaction_task
.options
.flags
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
{
warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options);
} else if next_scheduled_compaction_task.options.sub_compaction {
info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
let jobs: Vec<GcCompactJob> = timeline
.gc_compaction_split_jobs(
GcCompactJob::from_compact_options(
next_scheduled_compaction_task.options.clone(),
),
next_scheduled_compaction_task
.options
.sub_compaction_max_job_size_mb,
)
.await
.map_err(CompactionError::Other)?;
if jobs.is_empty() {
info!("no jobs to run, skipping scheduled compaction task");
} else {
has_pending_scheduled_compaction_task = true;
let jobs_len = jobs.len();
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(*timeline_id).or_default();
for (idx, job) in jobs.into_iter().enumerate() {
// Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
// until we do further refactors to allow directly call `compact_with_gc`.
let mut flags: EnumSet<CompactFlags> = EnumSet::default();
flags |= CompactFlags::EnhancedGcBottomMostCompaction;
if job.dry_run {
flags |= CompactFlags::DryRun;
}
let options = CompactOptions {
flags,
sub_compaction: false,
compact_key_range: Some(job.compact_key_range.into()),
compact_lsn_range: Some(job.compact_lsn_range.into()),
sub_compaction_max_job_size_mb: None,
};
tline_pending_tasks.push_back(if idx == jobs_len - 1 {
ScheduledCompactionTask {
options,
// The last job in the queue sends the signal and releases the gc guard
result_tx: next_scheduled_compaction_task
.result_tx
.take(),
gc_block: next_scheduled_compaction_task
.gc_block
.take(),
}
} else {
ScheduledCompactionTask {
options,
result_tx: None,
gc_block: None,
}
});
}
info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
}
} else {
let _ = timeline
.compact_with_options(
cancel,
next_scheduled_compaction_task.options,
ctx,
)
.instrument(info_span!("scheduled_compact_timeline", %timeline_id))
.await?;
if let Some(tx) = next_scheduled_compaction_task.result_tx.take() {
// TODO: we can send compaction statistics in the future
tx.send(()).ok();
}
}
if let Some(queue) = queue {
let has_pending_tasks = queue
.iteration(cancel, ctx, &self.gc_block, timeline)
.await?;
Some(has_pending_tasks)
} else {
Some(false)
}
Some(has_pending_scheduled_compaction_task)
}
} else {
None
@@ -3124,34 +3033,32 @@ impl Tenant {
}
/// Cancel scheduled compaction tasks
pub(crate) fn cancel_scheduled_compaction(
&self,
timeline_id: TimelineId,
) -> Vec<ScheduledCompactionTask> {
pub(crate) fn cancel_scheduled_compaction(&self, timeline_id: TimelineId) {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
if let Some(tline_pending_tasks) = guard.get_mut(&timeline_id) {
let current_tline_pending_tasks = std::mem::take(tline_pending_tasks);
current_tline_pending_tasks.into_iter().collect()
} else {
Vec::new()
if let Some(q) = guard.get_mut(&timeline_id) {
q.cancel_scheduled();
}
}
pub(crate) fn get_scheduled_compaction_tasks(
&self,
timeline_id: TimelineId,
) -> Vec<CompactOptions> {
use itertools::Itertools;
let guard = self.scheduled_compaction_tasks.lock().unwrap();
guard
.get(&timeline_id)
.map(|tline_pending_tasks| {
tline_pending_tasks
.iter()
.map(|x| x.options.clone())
.collect_vec()
})
.unwrap_or_default()
) -> Vec<CompactInfoResponse> {
let res = {
let guard = self.scheduled_compaction_tasks.lock().unwrap();
guard.get(&timeline_id).map(|q| q.remaining_jobs())
};
let Some((running, remaining)) = res else {
return Vec::new();
};
let mut result = Vec::new();
if let Some((id, running)) = running {
result.extend(running.into_compact_info_resp(id, true));
}
for (id, job) in remaining {
result.extend(job.into_compact_info_resp(id, false));
}
result
}
/// Schedule a compaction task for a timeline.
@@ -3160,20 +3067,12 @@ impl Tenant {
timeline_id: TimelineId,
options: CompactOptions,
) -> anyhow::Result<tokio::sync::oneshot::Receiver<()>> {
let gc_guard = match self.gc_block.start().await {
Ok(guard) => guard,
Err(e) => {
bail!("cannot run gc-compaction because gc is blocked: {}", e);
}
};
let (tx, rx) = tokio::sync::oneshot::channel();
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(timeline_id).or_default();
tline_pending_tasks.push_back(ScheduledCompactionTask {
options,
result_tx: Some(tx),
gc_block: Some(gc_guard),
});
let q = guard
.entry(timeline_id)
.or_insert_with(|| Arc::new(GcCompactionQueue::new()));
q.schedule_manual_compaction(options, Some(tx));
Ok(rx)
}
@@ -5783,7 +5682,7 @@ mod tests {
use bytes::{Bytes, BytesMut};
use hex_literal::hex;
use itertools::Itertools;
use pageserver_api::key::{Key, AUX_KEY_PREFIX, NON_INHERITED_RANGE};
use pageserver_api::key::{Key, AUX_KEY_PREFIX, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
use pageserver_api::value::Value;
@@ -7842,7 +7741,18 @@ mod tests {
let base_key = Key::from_hex("620000000033333333444444445500000000").unwrap();
let base_key_child = Key::from_hex("620000000033333333444444445500000001").unwrap();
let base_key_nonexist = Key::from_hex("620000000033333333444444445500000002").unwrap();
let base_key_overwrite = Key::from_hex("620000000033333333444444445500000003").unwrap();
let base_inherited_key = Key::from_hex("610000000033333333444444445500000000").unwrap();
let base_inherited_key_child =
Key::from_hex("610000000033333333444444445500000001").unwrap();
let base_inherited_key_nonexist =
Key::from_hex("610000000033333333444444445500000002").unwrap();
let base_inherited_key_overwrite =
Key::from_hex("610000000033333333444444445500000003").unwrap();
assert_eq!(base_key.field1, AUX_KEY_PREFIX); // in case someone accidentally changed the prefix...
assert_eq!(base_inherited_key.field1, RELATION_SIZE_PREFIX);
let tline = tenant
.create_test_timeline_with_layers(
@@ -7851,7 +7761,18 @@ mod tests {
DEFAULT_PG_VERSION,
&ctx,
Vec::new(), // delta layers
vec![(Lsn(0x20), vec![(base_key, test_img("metadata key 1"))])], // image layers
vec![(
Lsn(0x20),
vec![
(base_inherited_key, test_img("metadata inherited key 1")),
(
base_inherited_key_overwrite,
test_img("metadata key overwrite 1a"),
),
(base_key, test_img("metadata key 1")),
(base_key_overwrite, test_img("metadata key overwrite 1b")),
],
)], // image layers
Lsn(0x20), // it's fine to not advance LSN to 0x30 while using 0x30 to get below because `get_vectored_impl` does not wait for LSN
)
.await?;
@@ -7865,7 +7786,18 @@ mod tests {
Vec::new(), // delta layers
vec![(
Lsn(0x30),
vec![(base_key_child, test_img("metadata key 2"))],
vec![
(
base_inherited_key_child,
test_img("metadata inherited key 2"),
),
(
base_inherited_key_overwrite,
test_img("metadata key overwrite 2a"),
),
(base_key_child, test_img("metadata key 2")),
(base_key_overwrite, test_img("metadata key overwrite 2b")),
],
)], // image layers
Lsn(0x30),
)
@@ -7887,6 +7819,26 @@ mod tests {
get_vectored_impl_wrapper(&tline, base_key_nonexist, lsn, &ctx).await?,
None
);
assert_eq!(
get_vectored_impl_wrapper(&tline, base_key_overwrite, lsn, &ctx).await?,
Some(test_img("metadata key overwrite 1b"))
);
assert_eq!(
get_vectored_impl_wrapper(&tline, base_inherited_key, lsn, &ctx).await?,
Some(test_img("metadata inherited key 1"))
);
assert_eq!(
get_vectored_impl_wrapper(&tline, base_inherited_key_child, lsn, &ctx).await?,
None
);
assert_eq!(
get_vectored_impl_wrapper(&tline, base_inherited_key_nonexist, lsn, &ctx).await?,
None
);
assert_eq!(
get_vectored_impl_wrapper(&tline, base_inherited_key_overwrite, lsn, &ctx).await?,
Some(test_img("metadata key overwrite 1a"))
);
// test vectored get on child timeline
assert_eq!(
@@ -7901,6 +7853,82 @@ mod tests {
get_vectored_impl_wrapper(&child, base_key_nonexist, lsn, &ctx).await?,
None
);
assert_eq!(
get_vectored_impl_wrapper(&child, base_inherited_key, lsn, &ctx).await?,
Some(test_img("metadata inherited key 1"))
);
assert_eq!(
get_vectored_impl_wrapper(&child, base_inherited_key_child, lsn, &ctx).await?,
Some(test_img("metadata inherited key 2"))
);
assert_eq!(
get_vectored_impl_wrapper(&child, base_inherited_key_nonexist, lsn, &ctx).await?,
None
);
assert_eq!(
get_vectored_impl_wrapper(&child, base_key_overwrite, lsn, &ctx).await?,
Some(test_img("metadata key overwrite 2b"))
);
assert_eq!(
get_vectored_impl_wrapper(&child, base_inherited_key_overwrite, lsn, &ctx).await?,
Some(test_img("metadata key overwrite 2a"))
);
// test vectored scan on parent timeline
let mut reconstruct_state = ValuesReconstructState::new();
let res = tline
.get_vectored_impl(
KeySpace::single(Key::metadata_key_range()),
lsn,
&mut reconstruct_state,
&ctx,
)
.await?;
assert_eq!(
res.into_iter()
.map(|(k, v)| (k, v.unwrap()))
.collect::<Vec<_>>(),
vec![
(base_inherited_key, test_img("metadata inherited key 1")),
(
base_inherited_key_overwrite,
test_img("metadata key overwrite 1a")
),
(base_key, test_img("metadata key 1")),
(base_key_overwrite, test_img("metadata key overwrite 1b")),
]
);
// test vectored scan on child timeline
let mut reconstruct_state = ValuesReconstructState::new();
let res = child
.get_vectored_impl(
KeySpace::single(Key::metadata_key_range()),
lsn,
&mut reconstruct_state,
&ctx,
)
.await?;
assert_eq!(
res.into_iter()
.map(|(k, v)| (k, v.unwrap()))
.collect::<Vec<_>>(),
vec![
(base_inherited_key, test_img("metadata inherited key 1")),
(
base_inherited_key_child,
test_img("metadata inherited key 2")
),
(
base_inherited_key_overwrite,
test_img("metadata key overwrite 2a")
),
(base_key_child, test_img("metadata key 2")),
(base_key_overwrite, test_img("metadata key overwrite 2b")),
]
);
Ok(())
}

View File

@@ -11,7 +11,7 @@
pub(crate) use pageserver_api::config::TenantConfigToml as TenantConf;
use pageserver_api::models::CompactionAlgorithmSettings;
use pageserver_api::models::EvictionPolicy;
use pageserver_api::models::{self, TenantConfigPatch, ThrottleConfig};
use pageserver_api::models::{self, TenantConfigPatch};
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
use serde::de::IntoDeserializer;
use serde::{Deserialize, Serialize};
@@ -597,7 +597,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
.map(humantime),
heatmap_period: value.heatmap_period.map(humantime),
lazy_slru_download: value.lazy_slru_download,
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
timeline_get_throttle: value.timeline_get_throttle,
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
lsn_lease_length: value.lsn_lease_length.map(humantime),
lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime),

View File

@@ -84,17 +84,17 @@ impl Value {
fn to_u64(self) -> u64 {
let b = &self.0;
(b[0] as u64) << 32
| (b[1] as u64) << 24
| (b[2] as u64) << 16
| (b[3] as u64) << 8
((b[0] as u64) << 32)
| ((b[1] as u64) << 24)
| ((b[2] as u64) << 16)
| ((b[3] as u64) << 8)
| b[4] as u64
}
fn to_blknum(self) -> u32 {
let b = &self.0;
assert!(b[0] == 0x80);
(b[1] as u32) << 24 | (b[2] as u32) << 16 | (b[3] as u32) << 8 | b[4] as u32
((b[1] as u32) << 24) | ((b[2] as u32) << 16) | ((b[3] as u32) << 8) | b[4] as u32
}
}

View File

@@ -12,7 +12,7 @@ pub mod merge_iterator;
use crate::context::{AccessStatsBehavior, RequestContext};
use bytes::Bytes;
use pageserver_api::key::{Key, NON_INHERITED_SPARSE_RANGE};
use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::record::NeonWalRecord;
use pageserver_api::value::Value;
@@ -209,7 +209,7 @@ impl ValuesReconstructState {
.keys
.entry(*key)
.or_insert(Ok(VectoredValueReconstructState::default()));
let is_sparse_key = NON_INHERITED_SPARSE_RANGE.contains(key);
let is_sparse_key = key.is_sparse();
if let Ok(state) = state {
let key_done = match state.situation {
ValueReconstructSituation::Complete => {

View File

@@ -112,8 +112,8 @@ const MAX_SUPPORTED_BLOB_LEN_BITS: usize = {
///
/// Layout:
/// - 1 bit: `will_init`
/// - [`MAX_SUPPORTED_BLOB_LEN_BITS`]: `len`
/// - [`MAX_SUPPORTED_POS_BITS`]: `pos`
/// - [`MAX_SUPPORTED_BLOB_LEN_BITS`][]: `len`
/// - [`MAX_SUPPORTED_POS_BITS`](IndexEntry::MAX_SUPPORTED_POS_BITS): `pos`
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IndexEntry(u64);

View File

@@ -27,7 +27,7 @@ use pageserver_api::{
config::tenant_conf_defaults::DEFAULT_COMPACTION_THRESHOLD,
key::{
KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE,
NON_INHERITED_SPARSE_RANGE,
SPARSE_RANGE,
},
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
models::{
@@ -3221,7 +3221,7 @@ impl Timeline {
// We don't return a blanket [`GetVectoredError::MissingKey`] to avoid
// stalling compaction.
keyspace.remove_overlapping_with(&KeySpace {
ranges: vec![NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE],
ranges: vec![NON_INHERITED_RANGE, Key::sparse_non_inherited_keyspace()],
});
// Keyspace is fully retrieved
@@ -3242,7 +3242,13 @@ impl Timeline {
// keys from `keyspace`, we expect there to be no overlap between it and the image covered key
// space. If that's not the case, we had at least one key encounter a gap in the image layer
// and stop the search as a result of that.
let removed = keyspace.remove_overlapping_with(&image_covered_keyspace);
let mut removed = keyspace.remove_overlapping_with(&image_covered_keyspace);
// Do not fire missing key error and end early for sparse keys. Note that we hava already removed
// non-inherited keyspaces before, so we can safely do a full `SPARSE_RANGE` remove instead of
// figuring out what is the inherited key range and do a fine-grained pruning.
removed.remove_overlapping_with(&KeySpace {
ranges: vec![SPARSE_RANGE],
});
if !removed.is_empty() {
break Some(removed);
}
@@ -3257,6 +3263,21 @@ impl Timeline {
timeline = &*timeline_owned;
};
// Remove sparse keys from the keyspace so that it doesn't fire errors.
let missing_keyspace = if let Some(missing_keyspace) = missing_keyspace {
let mut missing_keyspace = missing_keyspace;
missing_keyspace.remove_overlapping_with(&KeySpace {
ranges: vec![SPARSE_RANGE],
});
if missing_keyspace.is_empty() {
None
} else {
Some(missing_keyspace)
}
} else {
None
};
if let Some(missing_keyspace) = missing_keyspace {
return Err(GetVectoredError::MissingKey(MissingKeyError {
key: missing_keyspace.start().unwrap(), /* better if we can store the full keyspace */
@@ -3762,36 +3783,30 @@ impl Timeline {
return Err(FlushLayerError::Cancelled);
}
let mut layers_to_upload = Vec::new();
layers_to_upload.extend(
self.create_image_layers(
&rel_partition,
self.initdb_lsn,
ImageLayerCreationMode::Initial,
ctx,
)
.await?,
);
let mut partitions = KeyPartitioning::default();
partitions.parts.extend(rel_partition.parts);
if !metadata_partition.parts.is_empty() {
assert_eq!(
metadata_partition.parts.len(),
1,
"currently sparse keyspace should only contain a single metadata keyspace"
);
layers_to_upload.extend(
self.create_image_layers(
// Safety: create_image_layers treat sparse keyspaces differently that it does not scan
// every single key within the keyspace, and therefore, it's safe to force converting it
// into a dense keyspace before calling this function.
&metadata_partition.into_dense(),
self.initdb_lsn,
ImageLayerCreationMode::Initial,
ctx,
)
.await?,
);
partitions
.parts
.extend(metadata_partition.into_dense().parts);
}
let mut layers_to_upload = Vec::new();
layers_to_upload.extend(
self.create_image_layers(
&partitions,
self.initdb_lsn,
ImageLayerCreationMode::Initial,
ctx,
)
.await?,
);
(layers_to_upload, None)
} else {
// Normal case, write out a L0 delta layer file.

View File

@@ -4,7 +4,7 @@
//!
//! The old legacy algorithm is implemented directly in `timeline.rs`.
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::ops::{Deref, Range};
use std::sync::Arc;
@@ -16,10 +16,12 @@ use super::{
use anyhow::{anyhow, bail, Context};
use bytes::Bytes;
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::key::KEY_SIZE;
use pageserver_api::keyspace::ShardedRange;
use pageserver_api::models::CompactInfoResponse;
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use serde::Serialize;
use tokio_util::sync::CancellationToken;
@@ -30,6 +32,7 @@ use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder}
use crate::page_cache;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::gc_block::GcBlock;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::batch_split_writer::{
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
@@ -63,16 +66,284 @@ use super::CompactionError;
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
const COMPACTION_DELTA_THRESHOLD: usize = 5;
/// A scheduled compaction task.
pub(crate) struct ScheduledCompactionTask {
/// It's unfortunate that we need to store a compact options struct here because the only outer
/// API we can call here is `compact_with_options` which does a few setup calls before starting the
/// actual compaction job... We should refactor this to store `GcCompactionJob` in the future.
pub options: CompactOptions,
/// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender.
pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
/// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard.
pub gc_block: Option<gc_block::Guard>,
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct GcCompactionJobId(pub usize);
impl std::fmt::Display for GcCompactionJobId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Debug, Clone)]
pub enum GcCompactionQueueItem {
Manual(CompactOptions),
SubCompactionJob(CompactOptions),
#[allow(dead_code)]
UpdateL2Lsn(Lsn),
Notify(GcCompactionJobId),
}
impl GcCompactionQueueItem {
pub fn into_compact_info_resp(
self,
id: GcCompactionJobId,
running: bool,
) -> Option<CompactInfoResponse> {
match self {
GcCompactionQueueItem::Manual(options) => Some(CompactInfoResponse {
compact_key_range: options.compact_key_range,
compact_lsn_range: options.compact_lsn_range,
sub_compaction: options.sub_compaction,
running,
job_id: id.0,
}),
GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
compact_key_range: options.compact_key_range,
compact_lsn_range: options.compact_lsn_range,
sub_compaction: options.sub_compaction,
running,
job_id: id.0,
}),
GcCompactionQueueItem::UpdateL2Lsn(_) => None,
GcCompactionQueueItem::Notify(_) => None,
}
}
}
struct GcCompactionQueueInner {
running: Option<(GcCompactionJobId, GcCompactionQueueItem)>,
queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
notify: HashMap<GcCompactionJobId, tokio::sync::oneshot::Sender<()>>,
gc_guards: HashMap<GcCompactionJobId, gc_block::Guard>,
last_id: GcCompactionJobId,
}
impl GcCompactionQueueInner {
fn next_id(&mut self) -> GcCompactionJobId {
let id = self.last_id;
self.last_id = GcCompactionJobId(id.0 + 1);
id
}
}
/// A structure to store gc_compaction jobs.
pub struct GcCompactionQueue {
/// All items in the queue, and the currently-running job.
inner: std::sync::Mutex<GcCompactionQueueInner>,
/// Ensure only one thread is consuming the queue.
consumer_lock: tokio::sync::Mutex<()>,
}
impl GcCompactionQueue {
pub fn new() -> Self {
GcCompactionQueue {
inner: std::sync::Mutex::new(GcCompactionQueueInner {
running: None,
queued: VecDeque::new(),
notify: HashMap::new(),
gc_guards: HashMap::new(),
last_id: GcCompactionJobId(0),
}),
consumer_lock: tokio::sync::Mutex::new(()),
}
}
pub fn cancel_scheduled(&self) {
let mut guard = self.inner.lock().unwrap();
guard.queued.clear();
guard.notify.clear();
guard.gc_guards.clear();
}
/// Schedule a manual compaction job.
pub fn schedule_manual_compaction(
&self,
options: CompactOptions,
notify: Option<tokio::sync::oneshot::Sender<()>>,
) -> GcCompactionJobId {
let mut guard = self.inner.lock().unwrap();
let id = guard.next_id();
guard
.queued
.push_back((id, GcCompactionQueueItem::Manual(options)));
if let Some(notify) = notify {
guard.notify.insert(id, notify);
}
info!("scheduled compaction job id={}", id);
id
}
/// Trigger an auto compaction.
#[allow(dead_code)]
pub fn trigger_auto_compaction(&self, _: &Arc<Timeline>) {}
/// Notify the caller the job has finished and unblock GC.
fn notify_and_unblock(&self, id: GcCompactionJobId) {
info!("compaction job id={} finished", id);
let mut guard = self.inner.lock().unwrap();
if let Some(blocking) = guard.gc_guards.remove(&id) {
drop(blocking)
}
if let Some(tx) = guard.notify.remove(&id) {
let _ = tx.send(());
}
}
async fn handle_sub_compaction(
&self,
id: GcCompactionJobId,
options: CompactOptions,
timeline: &Arc<Timeline>,
gc_block: &GcBlock,
) -> Result<(), CompactionError> {
info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
let jobs: Vec<GcCompactJob> = timeline
.gc_compaction_split_jobs(
GcCompactJob::from_compact_options(options.clone()),
options.sub_compaction_max_job_size_mb,
)
.await
.map_err(CompactionError::Other)?;
if jobs.is_empty() {
info!("no jobs to run, skipping scheduled compaction task");
self.notify_and_unblock(id);
} else {
let gc_guard = match gc_block.start().await {
Ok(guard) => guard,
Err(e) => {
return Err(CompactionError::Other(anyhow!(
"cannot run gc-compaction because gc is blocked: {}",
e
)));
}
};
let jobs_len = jobs.len();
let mut pending_tasks = Vec::new();
for job in jobs {
// Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
// until we do further refactors to allow directly call `compact_with_gc`.
let mut flags: EnumSet<CompactFlags> = EnumSet::default();
flags |= CompactFlags::EnhancedGcBottomMostCompaction;
if job.dry_run {
flags |= CompactFlags::DryRun;
}
let options = CompactOptions {
flags,
sub_compaction: false,
compact_key_range: Some(job.compact_key_range.into()),
compact_lsn_range: Some(job.compact_lsn_range.into()),
sub_compaction_max_job_size_mb: None,
};
pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
}
pending_tasks.push(GcCompactionQueueItem::Notify(id));
{
let mut guard = self.inner.lock().unwrap();
guard.gc_guards.insert(id, gc_guard);
let mut tasks = Vec::new();
for task in pending_tasks {
let id = guard.next_id();
tasks.push((id, task));
}
tasks.reverse();
for item in tasks {
guard.queued.push_front(item);
}
}
info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
}
Ok(())
}
/// Take a job from the queue and process it. Returns if there are still pending tasks.
pub async fn iteration(
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
gc_block: &GcBlock,
timeline: &Arc<Timeline>,
) -> Result<bool, CompactionError> {
let _one_op_at_a_time_guard = self.consumer_lock.lock().await;
let has_pending_tasks;
let (id, item) = {
let mut guard = self.inner.lock().unwrap();
let Some((id, item)) = guard.queued.pop_front() else {
return Ok(false);
};
guard.running = Some((id, item.clone()));
has_pending_tasks = !guard.queued.is_empty();
(id, item)
};
match item {
GcCompactionQueueItem::Manual(options) => {
if !options
.flags
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
{
warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", options);
} else if options.sub_compaction {
self.handle_sub_compaction(id, options, timeline, gc_block)
.await?;
} else {
let gc_guard = match gc_block.start().await {
Ok(guard) => guard,
Err(e) => {
return Err(CompactionError::Other(anyhow!(
"cannot run gc-compaction because gc is blocked: {}",
e
)));
}
};
{
let mut guard = self.inner.lock().unwrap();
guard.gc_guards.insert(id, gc_guard);
}
let _ = timeline
.compact_with_options(cancel, options, ctx)
.instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
.await?;
self.notify_and_unblock(id);
}
}
GcCompactionQueueItem::SubCompactionJob(options) => {
let _ = timeline
.compact_with_options(cancel, options, ctx)
.instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
.await?;
}
GcCompactionQueueItem::Notify(id) => {
self.notify_and_unblock(id);
}
GcCompactionQueueItem::UpdateL2Lsn(_) => {
unreachable!()
}
}
{
let mut guard = self.inner.lock().unwrap();
guard.running = None;
}
Ok(has_pending_tasks)
}
#[allow(clippy::type_complexity)]
pub fn remaining_jobs(
&self,
) -> (
Option<(GcCompactionJobId, GcCompactionQueueItem)>,
VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
) {
let guard = self.inner.lock().unwrap();
(guard.running.clone(), guard.queued.clone())
}
#[allow(dead_code)]
pub fn remaining_jobs_num(&self) -> usize {
let guard = self.inner.lock().unwrap();
guard.queued.len() + if guard.running.is_some() { 1 } else { 0 }
}
}
/// A job description for the gc-compaction job. This structure describes the rectangle range that the job will

View File

@@ -403,7 +403,7 @@ pub(super) async fn handle_walreceiver_connection(
// need to advance last record LSN on all shards. If we've not ingested the latest
// record, then set the LSN of the modification past it. This way all shards
// advance their last record LSN at the same time.
let needs_last_record_lsn_advance = match next_record_lsn.map(Lsn::from) {
let needs_last_record_lsn_advance = match next_record_lsn {
Some(lsn) if lsn > modification.get_lsn() => {
modification.set_lsn(lsn).unwrap();
true

View File

@@ -308,7 +308,7 @@ impl WalIngest {
epoch -= 1;
}
Ok((epoch as u64) << 32 | xid as u64)
Ok(((epoch as u64) << 32) | xid as u64)
}
async fn ingest_clear_vm_bits(

View File

@@ -1,26 +0,0 @@
EXTENSION = hnsw
EXTVERSION = 0.1.0
MODULE_big = hnsw
DATA = $(wildcard *--*.sql)
OBJS = hnsw.o hnswalg.o
TESTS = $(wildcard test/sql/*.sql)
REGRESS = $(patsubst test/sql/%.sql,%,$(TESTS))
REGRESS_OPTS = --inputdir=test --load-extension=hnsw
# For auto-vectorization:
# - GCC (needs -ftree-vectorize OR -O3) - https://gcc.gnu.org/projects/tree-ssa/vectorization.html
PG_CFLAGS += -O3
PG_CXXFLAGS += -O3 -std=c++11
PG_LDFLAGS += -lstdc++
all: $(EXTENSION)--$(EXTVERSION).sql
PG_CONFIG ?= pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
dist:
mkdir -p dist
git archive --format zip --prefix=$(EXTENSION)-$(EXTVERSION)/ --output dist/$(EXTENSION)-$(EXTVERSION).zip master

View File

@@ -1,25 +0,0 @@
# Revisiting the Inverted Indices for Billion-Scale Approximate Nearest Neighbors
This ANN extension of Postgres is based
on [ivf-hnsw](https://github.com/dbaranchuk/ivf-hnsw.git) implementation of [HNSW](https://www.pinecone.io/learn/hnsw),
the code for the current state-of-the-art billion-scale nearest neighbor search system presented in the paper:
[Revisiting the Inverted Indices for Billion-Scale Approximate Nearest Neighbors](http://openaccess.thecvf.com/content_ECCV_2018/html/Dmitry_Baranchuk_Revisiting_the_Inverted_ECCV_2018_paper.html),
<br>
Dmitry Baranchuk, Artem Babenko, Yury Malkov
# Postgres extension
HNSW index is hold in memory (built on demand) and it's maxial size is limited
by `maxelements` index parameter. Another required parameter is nubmer of dimensions (if it is not specified in column type).
Optional parameter `ef` specifies number of neighbors which are considered during index construction and search (corresponds `efConstruction` and `efSearch` parameters
described in the article).
# Example of usage:
```
create extension hnsw;
create table embeddings(id integer primary key, payload real[]);
create index on embeddings using hnsw(payload) with (maxelements=1000000, dims=100, m=32);
select id from embeddings order by payload <-> array[1.0, 2.0,...] limit 100;
```

View File

@@ -1,29 +0,0 @@
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION hnsw" to load this file. \quit
-- functions
CREATE FUNCTION l2_distance(real[], real[]) RETURNS real
AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
-- operators
CREATE OPERATOR <-> (
LEFTARG = real[], RIGHTARG = real[], PROCEDURE = l2_distance,
COMMUTATOR = '<->'
);
-- access method
CREATE FUNCTION hnsw_handler(internal) RETURNS index_am_handler
AS 'MODULE_PATHNAME' LANGUAGE C;
CREATE ACCESS METHOD hnsw TYPE INDEX HANDLER hnsw_handler;
COMMENT ON ACCESS METHOD hnsw IS 'hnsw index access method';
-- opclasses
CREATE OPERATOR CLASS knn_ops
DEFAULT FOR TYPE real[] USING hnsw AS
OPERATOR 1 <-> (real[], real[]) FOR ORDER BY float_ops;

View File

@@ -1,590 +0,0 @@
#include "postgres.h"
#include "access/amapi.h"
#include "access/generic_xlog.h"
#include "access/relation.h"
#include "access/reloptions.h"
#include "access/tableam.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "nodes/execnodes.h"
#include "storage/bufmgr.h"
#include "utils/guc.h"
#include "utils/selfuncs.h"
#include <math.h>
#include <float.h>
#include "hnsw.h"
PG_MODULE_MAGIC;
typedef struct {
int32 vl_len_; /* varlena header (do not touch directly!) */
int dims;
int maxelements;
int efConstruction;
int efSearch;
int M;
} HnswOptions;
static relopt_kind hnsw_relopt_kind;
typedef struct {
HierarchicalNSW* hnsw;
size_t curr;
size_t n_results;
ItemPointer results;
} HnswScanOpaqueData;
typedef HnswScanOpaqueData* HnswScanOpaque;
typedef struct {
Oid relid;
uint32 status;
HierarchicalNSW* hnsw;
} HnswHashEntry;
#define SH_PREFIX hnsw_index
#define SH_ELEMENT_TYPE HnswHashEntry
#define SH_KEY_TYPE Oid
#define SH_KEY relid
#define SH_STORE_HASH
#define SH_GET_HASH(tb, a) ((a)->relid)
#define SH_HASH_KEY(tb, key) (key)
#define SH_EQUAL(tb, a, b) ((a) == (b))
#define SH_SCOPE static inline
#define SH_DEFINE
#define SH_DECLARE
#include "lib/simplehash.h"
#define INDEX_HASH_SIZE 11
#define DEFAULT_EF_SEARCH 64
PGDLLEXPORT void _PG_init(void);
static hnsw_index_hash *hnsw_indexes;
/*
* Initialize index options and variables
*/
void
_PG_init(void)
{
hnsw_relopt_kind = add_reloption_kind();
add_int_reloption(hnsw_relopt_kind, "dims", "Number of dimensions",
0, 0, INT_MAX, AccessExclusiveLock);
add_int_reloption(hnsw_relopt_kind, "maxelements", "Maximal number of elements",
0, 0, INT_MAX, AccessExclusiveLock);
add_int_reloption(hnsw_relopt_kind, "m", "Number of neighbors of each vertex",
100, 0, INT_MAX, AccessExclusiveLock);
add_int_reloption(hnsw_relopt_kind, "efconstruction", "Number of inspected neighbors during index construction",
16, 1, INT_MAX, AccessExclusiveLock);
add_int_reloption(hnsw_relopt_kind, "efsearch", "Number of inspected neighbors during index search",
64, 1, INT_MAX, AccessExclusiveLock);
hnsw_indexes = hnsw_index_create(TopMemoryContext, INDEX_HASH_SIZE, NULL);
}
static void
hnsw_build_callback(Relation index, ItemPointer tid, Datum *values,
bool *isnull, bool tupleIsAlive, void *state)
{
HierarchicalNSW* hnsw = (HierarchicalNSW*) state;
ArrayType* array;
int n_items;
label_t label = 0;
/* Skip nulls */
if (isnull[0])
return;
array = DatumGetArrayTypeP(values[0]);
n_items = ArrayGetNItems(ARR_NDIM(array), ARR_DIMS(array));
if (n_items != hnsw_dimensions(hnsw))
{
elog(ERROR, "Wrong number of dimensions: %d instead of %d expected",
n_items, hnsw_dimensions(hnsw));
}
memcpy(&label, tid, sizeof(*tid));
hnsw_add_point(hnsw, (coord_t*)ARR_DATA_PTR(array), label);
}
static void
hnsw_populate(HierarchicalNSW* hnsw, Relation indexRel, Relation heapRel)
{
IndexInfo* indexInfo = BuildIndexInfo(indexRel);
Assert(indexInfo->ii_NumIndexAttrs == 1);
table_index_build_scan(heapRel, indexRel, indexInfo,
true, true, hnsw_build_callback, (void *) hnsw, NULL);
}
#ifdef __APPLE__
#include <sys/types.h>
#include <sys/sysctl.h>
static void
hnsw_check_available_memory(Size requested)
{
size_t total;
if (sysctlbyname("hw.memsize", NULL, &total, NULL, 0) < 0)
elog(ERROR, "Failed to get amount of RAM: %m");
if ((Size)NBuffers*BLCKSZ + requested >= total)
elog(ERROR, "HNSW index requeries %ld bytes while only %ld are available",
requested, total - (Size)NBuffers*BLCKSZ);
}
#else
#include <sys/sysinfo.h>
static void
hnsw_check_available_memory(Size requested)
{
struct sysinfo si;
Size total;
if (sysinfo(&si) < 0)
elog(ERROR, "Failed to get amount of RAM: %m");
total = si.totalram*si.mem_unit;
if ((Size)NBuffers*BLCKSZ + requested >= total)
elog(ERROR, "HNSW index requeries %ld bytes while only %ld are available",
requested, total - (Size)NBuffers*BLCKSZ);
}
#endif
static HierarchicalNSW*
hnsw_get_index(Relation indexRel, Relation heapRel)
{
HierarchicalNSW* hnsw;
Oid indexoid = RelationGetRelid(indexRel);
HnswHashEntry* entry = hnsw_index_lookup(hnsw_indexes, indexoid);
if (entry == NULL)
{
size_t dims, maxelements;
size_t M;
size_t maxM;
size_t size_links_level0;
size_t size_data_per_element;
size_t data_size;
dsm_handle handle = indexoid << 1; /* make it even */
void* impl_private = NULL;
void* mapped_address = NULL;
Size mapped_size = 0;
Size shmem_size;
bool exists = true;
bool found;
HnswOptions *opts = (HnswOptions *) indexRel->rd_options;
if (opts == NULL || opts->maxelements == 0 || opts->dims == 0) {
elog(ERROR, "HNSW index requires 'maxelements' and 'dims' to be specified");
}
dims = opts->dims;
maxelements = opts->maxelements;
M = opts->M;
maxM = M * 2;
data_size = dims * sizeof(coord_t);
size_links_level0 = (maxM + 1) * sizeof(idx_t);
size_data_per_element = size_links_level0 + data_size + sizeof(label_t);
shmem_size = hnsw_sizeof() + maxelements * size_data_per_element;
hnsw_check_available_memory(shmem_size);
/* first try to attach to existed index */
if (!dsm_impl_op(DSM_OP_ATTACH, handle, 0, &impl_private,
&mapped_address, &mapped_size, DEBUG1))
{
/* index doesn't exists: try to create it */
if (!dsm_impl_op(DSM_OP_CREATE, handle, shmem_size, &impl_private,
&mapped_address, &mapped_size, DEBUG1))
{
/* We can do it under shared lock, so some other backend may
* try to initialize index. If create is failed because index already
* created by somebody else, then try to attach to it once again
*/
if (!dsm_impl_op(DSM_OP_ATTACH, handle, 0, &impl_private,
&mapped_address, &mapped_size, ERROR))
{
return NULL;
}
}
else
{
exists = false;
}
}
Assert(mapped_size == shmem_size);
hnsw = (HierarchicalNSW*)mapped_address;
if (!exists)
{
hnsw_init(hnsw, dims, maxelements, M, maxM, opts->efConstruction);
hnsw_populate(hnsw, indexRel, heapRel);
}
entry = hnsw_index_insert(hnsw_indexes, indexoid, &found);
Assert(!found);
entry->hnsw = hnsw;
}
else
{
hnsw = entry->hnsw;
}
return hnsw;
}
/*
* Start or restart an index scan
*/
static IndexScanDesc
hnsw_beginscan(Relation index, int nkeys, int norderbys)
{
IndexScanDesc scan = RelationGetIndexScan(index, nkeys, norderbys);
HnswScanOpaque so = (HnswScanOpaque) palloc(sizeof(HnswScanOpaqueData));
Relation heap = relation_open(index->rd_index->indrelid, NoLock);
so->hnsw = hnsw_get_index(index, heap);
relation_close(heap, NoLock);
so->curr = 0;
so->n_results = 0;
so->results = NULL;
scan->opaque = so;
return scan;
}
/*
* Start or restart an index scan
*/
static void
hnsw_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
{
HnswScanOpaque so = (HnswScanOpaque) scan->opaque;
if (so->results)
{
pfree(so->results);
so->results = NULL;
}
so->curr = 0;
if (orderbys && scan->numberOfOrderBys > 0)
memmove(scan->orderByData, orderbys, scan->numberOfOrderBys * sizeof(ScanKeyData));
}
/*
* Fetch the next tuple in the given scan
*/
static bool
hnsw_gettuple(IndexScanDesc scan, ScanDirection dir)
{
HnswScanOpaque so = (HnswScanOpaque) scan->opaque;
/*
* Index can be used to scan backward, but Postgres doesn't support
* backward scan on operators
*/
Assert(ScanDirectionIsForward(dir));
if (so->curr == 0)
{
Datum value;
ArrayType* array;
int n_items;
size_t n_results;
label_t* results;
HnswOptions *opts = (HnswOptions *) scan->indexRelation->rd_options;
size_t efSearch = opts ? opts->efSearch : DEFAULT_EF_SEARCH;
/* Safety check */
if (scan->orderByData == NULL)
elog(ERROR, "cannot scan HNSW index without order");
/* No items will match if null */
if (scan->orderByData->sk_flags & SK_ISNULL)
return false;
value = scan->orderByData->sk_argument;
array = DatumGetArrayTypeP(value);
n_items = ArrayGetNItems(ARR_NDIM(array), ARR_DIMS(array));
if (n_items != hnsw_dimensions(so->hnsw))
{
elog(ERROR, "Wrong number of dimensions: %d instead of %d expected",
n_items, hnsw_dimensions(so->hnsw));
}
if (!hnsw_search(so->hnsw, (coord_t*)ARR_DATA_PTR(array), efSearch, &n_results, &results))
elog(ERROR, "HNSW index search failed");
so->results = (ItemPointer)palloc(n_results*sizeof(ItemPointerData));
so->n_results = n_results;
for (size_t i = 0; i < n_results; i++)
{
memcpy(&so->results[i], &results[i], sizeof(so->results[i]));
}
free(results);
}
if (so->curr >= so->n_results)
{
return false;
}
else
{
scan->xs_heaptid = so->results[so->curr++];
scan->xs_recheckorderby = false;
return true;
}
}
/*
* End a scan and release resources
*/
static void
hnsw_endscan(IndexScanDesc scan)
{
HnswScanOpaque so = (HnswScanOpaque) scan->opaque;
if (so->results)
pfree(so->results);
pfree(so);
scan->opaque = NULL;
}
/*
* Estimate the cost of an index scan
*/
static void
hnsw_costestimate(PlannerInfo *root, IndexPath *path, double loop_count,
Cost *indexStartupCost, Cost *indexTotalCost,
Selectivity *indexSelectivity, double *indexCorrelation
,double *indexPages
)
{
GenericCosts costs;
/* Never use index without order */
if (path->indexorderbys == NULL)
{
*indexStartupCost = DBL_MAX;
*indexTotalCost = DBL_MAX;
*indexSelectivity = 0;
*indexCorrelation = 0;
*indexPages = 0;
return;
}
MemSet(&costs, 0, sizeof(costs));
genericcostestimate(root, path, loop_count, &costs);
/* Startup cost and total cost are same */
*indexStartupCost = costs.indexTotalCost;
*indexTotalCost = costs.indexTotalCost;
*indexSelectivity = costs.indexSelectivity;
*indexCorrelation = costs.indexCorrelation;
*indexPages = costs.numIndexPages;
}
/*
* Parse and validate the reloptions
*/
static bytea *
hnsw_options(Datum reloptions, bool validate)
{
static const relopt_parse_elt tab[] = {
{"dims", RELOPT_TYPE_INT, offsetof(HnswOptions, dims)},
{"maxelements", RELOPT_TYPE_INT, offsetof(HnswOptions, maxelements)},
{"efconstruction", RELOPT_TYPE_INT, offsetof(HnswOptions, efConstruction)},
{"efsearch", RELOPT_TYPE_INT, offsetof(HnswOptions, efSearch)},
{"m", RELOPT_TYPE_INT, offsetof(HnswOptions, M)}
};
return (bytea *) build_reloptions(reloptions, validate,
hnsw_relopt_kind,
sizeof(HnswOptions),
tab, lengthof(tab));
}
/*
* Validate catalog entries for the specified operator class
*/
static bool
hnsw_validate(Oid opclassoid)
{
return true;
}
/*
* Build the index for a logged table
*/
static IndexBuildResult *
hnsw_build(Relation heap, Relation index, IndexInfo *indexInfo)
{
HierarchicalNSW* hnsw = hnsw_get_index(index, heap);
IndexBuildResult* result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
result->heap_tuples = result->index_tuples = hnsw_count(hnsw);
return result;
}
/*
* Insert a tuple into the index
*/
static bool
hnsw_insert(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid,
Relation heap, IndexUniqueCheck checkUnique,
bool indexUnchanged,
IndexInfo *indexInfo)
{
HierarchicalNSW* hnsw = hnsw_get_index(index, heap);
Datum value;
ArrayType* array;
int n_items;
label_t label = 0;
/* Skip nulls */
if (isnull[0])
return false;
/* Detoast value */
value = PointerGetDatum(PG_DETOAST_DATUM(values[0]));
array = DatumGetArrayTypeP(value);
n_items = ArrayGetNItems(ARR_NDIM(array), ARR_DIMS(array));
if (n_items != hnsw_dimensions(hnsw))
{
elog(ERROR, "Wrong number of dimensions: %d instead of %d expected",
n_items, hnsw_dimensions(hnsw));
}
memcpy(&label, heap_tid, sizeof(*heap_tid));
if (!hnsw_add_point(hnsw, (coord_t*)ARR_DATA_PTR(array), label))
elog(ERROR, "HNSW index insert failed");
return true;
}
/*
* Build the index for an unlogged table
*/
static void
hnsw_buildempty(Relation index)
{
/* index will be constructed on dema nd when accessed */
}
/*
* Clean up after a VACUUM operation
*/
static IndexBulkDeleteResult *
hnsw_vacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
Relation rel = info->index;
if (stats == NULL)
return NULL;
stats->num_pages = RelationGetNumberOfBlocks(rel);
return stats;
}
/*
* Bulk delete tuples from the index
*/
static IndexBulkDeleteResult *
hnsw_bulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
IndexBulkDeleteCallback callback, void *callback_state)
{
if (stats == NULL)
stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
return stats;
}
/*
* Define index handler
*
* See https://www.postgresql.org/docs/current/index-api.html
*/
PGDLLEXPORT PG_FUNCTION_INFO_V1(hnsw_handler);
Datum
hnsw_handler(PG_FUNCTION_ARGS)
{
IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
amroutine->amstrategies = 0;
amroutine->amsupport = 0;
amroutine->amoptsprocnum = 0;
amroutine->amcanorder = false;
amroutine->amcanorderbyop = true;
amroutine->amcanbackward = false; /* can change direction mid-scan */
amroutine->amcanunique = false;
amroutine->amcanmulticol = false;
amroutine->amoptionalkey = true;
amroutine->amsearcharray = false;
amroutine->amsearchnulls = false;
amroutine->amstorage = false;
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amcanparallel = false;
amroutine->amcaninclude = false;
amroutine->amusemaintenanceworkmem = false; /* not used during VACUUM */
amroutine->amparallelvacuumoptions = VACUUM_OPTION_PARALLEL_BULKDEL;
amroutine->amkeytype = InvalidOid;
/* Interface functions */
amroutine->ambuild = hnsw_build;
amroutine->ambuildempty = hnsw_buildempty;
amroutine->aminsert = hnsw_insert;
amroutine->ambulkdelete = hnsw_bulkdelete;
amroutine->amvacuumcleanup = hnsw_vacuumcleanup;
amroutine->amcanreturn = NULL; /* tuple not included in heapsort */
amroutine->amcostestimate = hnsw_costestimate;
amroutine->amoptions = hnsw_options;
amroutine->amproperty = NULL; /* TODO AMPROP_DISTANCE_ORDERABLE */
amroutine->ambuildphasename = NULL;
amroutine->amvalidate = hnsw_validate;
amroutine->amadjustmembers = NULL;
amroutine->ambeginscan = hnsw_beginscan;
amroutine->amrescan = hnsw_rescan;
amroutine->amgettuple = hnsw_gettuple;
amroutine->amgetbitmap = NULL;
amroutine->amendscan = hnsw_endscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
/* Interface functions to support parallel index scans */
amroutine->amestimateparallelscan = NULL;
amroutine->aminitparallelscan = NULL;
amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
/*
* Get the L2 distance between vectors
*/
PGDLLEXPORT PG_FUNCTION_INFO_V1(l2_distance);
Datum
l2_distance(PG_FUNCTION_ARGS)
{
ArrayType *a = PG_GETARG_ARRAYTYPE_P(0);
ArrayType *b = PG_GETARG_ARRAYTYPE_P(1);
int a_dim = ArrayGetNItems(ARR_NDIM(a), ARR_DIMS(a));
int b_dim = ArrayGetNItems(ARR_NDIM(b), ARR_DIMS(b));
dist_t distance = 0.0;
dist_t diff;
coord_t *ax = (coord_t*)ARR_DATA_PTR(a);
coord_t *bx = (coord_t*)ARR_DATA_PTR(b);
if (a_dim != b_dim)
{
ereport(ERROR,
(errcode(ERRCODE_DATA_EXCEPTION),
errmsg("different array dimensions %d and %d", a_dim, b_dim)));
}
for (int i = 0; i < a_dim; i++)
{
diff = ax[i] - bx[i];
distance += diff * diff;
}
PG_RETURN_FLOAT4((dist_t)sqrt(distance));
}

View File

@@ -1,4 +0,0 @@
comment = '** Deprecated ** Please use pg_embedding instead'
default_version = '0.1.0'
module_pathname = '$libdir/hnsw'
relocatable = true

View File

@@ -1,15 +0,0 @@
#pragma once
typedef float coord_t;
typedef float dist_t;
typedef uint32_t idx_t;
typedef uint64_t label_t;
typedef struct HierarchicalNSW HierarchicalNSW;
bool hnsw_search(HierarchicalNSW* hnsw, const coord_t *point, size_t efSearch, size_t* n_results, label_t** results);
bool hnsw_add_point(HierarchicalNSW* hnsw, const coord_t *point, label_t label);
void hnsw_init(HierarchicalNSW* hnsw, size_t dim, size_t maxelements, size_t M, size_t maxM, size_t efConstruction);
int hnsw_dimensions(HierarchicalNSW* hnsw);
size_t hnsw_count(HierarchicalNSW* hnsw);
size_t hnsw_sizeof(void);

View File

@@ -1,379 +0,0 @@
#include "hnswalg.h"
#if defined(__GNUC__)
#define PORTABLE_ALIGN32 __attribute__((aligned(32)))
#define PREFETCH(addr,hint) __builtin_prefetch(addr, 0, hint)
#else
#define PORTABLE_ALIGN32 __declspec(align(32))
#define PREFETCH(addr,hint)
#endif
HierarchicalNSW::HierarchicalNSW(size_t dim_, size_t maxelements_, size_t M_, size_t maxM_, size_t efConstruction_)
{
dim = dim_;
data_size = dim * sizeof(coord_t);
efConstruction = efConstruction_;
maxelements = maxelements_;
M = M_;
maxM = maxM_;
size_links_level0 = (maxM + 1) * sizeof(idx_t);
size_data_per_element = size_links_level0 + data_size + sizeof(label_t);
offset_data = size_links_level0;
offset_label = offset_data + data_size;
enterpoint_node = 0;
cur_element_count = 0;
#ifdef __x86_64__
use_avx2 = __builtin_cpu_supports("avx2");
#endif
}
std::priority_queue<std::pair<dist_t, idx_t>> HierarchicalNSW::searchBaseLayer(const coord_t *point, size_t ef)
{
std::vector<uint32_t> visited;
visited.resize((cur_element_count + 31) >> 5);
std::priority_queue<std::pair<dist_t, idx_t >> topResults;
std::priority_queue<std::pair<dist_t, idx_t >> candidateSet;
dist_t dist = fstdistfunc(point, getDataByInternalId(enterpoint_node));
topResults.emplace(dist, enterpoint_node);
candidateSet.emplace(-dist, enterpoint_node);
visited[enterpoint_node >> 5] = 1 << (enterpoint_node & 31);
dist_t lowerBound = dist;
while (!candidateSet.empty())
{
std::pair<dist_t, idx_t> curr_el_pair = candidateSet.top();
if (-curr_el_pair.first > lowerBound)
break;
candidateSet.pop();
idx_t curNodeNum = curr_el_pair.second;
idx_t* data = get_linklist0(curNodeNum);
size_t size = *data++;
PREFETCH(getDataByInternalId(*data), 0);
for (size_t j = 0; j < size; ++j) {
size_t tnum = *(data + j);
PREFETCH(getDataByInternalId(*(data + j + 1)), 0);
if (!(visited[tnum >> 5] & (1 << (tnum & 31)))) {
visited[tnum >> 5] |= 1 << (tnum & 31);
dist = fstdistfunc(point, getDataByInternalId(tnum));
if (topResults.top().first > dist || topResults.size() < ef) {
candidateSet.emplace(-dist, tnum);
PREFETCH(get_linklist0(candidateSet.top().second), 0);
topResults.emplace(dist, tnum);
if (topResults.size() > ef)
topResults.pop();
lowerBound = topResults.top().first;
}
}
}
}
return topResults;
}
void HierarchicalNSW::getNeighborsByHeuristic(std::priority_queue<std::pair<dist_t, idx_t>> &topResults, size_t NN)
{
if (topResults.size() < NN)
return;
std::priority_queue<std::pair<dist_t, idx_t>> resultSet;
std::vector<std::pair<dist_t, idx_t>> returnlist;
while (topResults.size() > 0) {
resultSet.emplace(-topResults.top().first, topResults.top().second);
topResults.pop();
}
while (resultSet.size()) {
if (returnlist.size() >= NN)
break;
std::pair<dist_t, idx_t> curen = resultSet.top();
dist_t dist_to_query = -curen.first;
resultSet.pop();
bool good = true;
for (std::pair<dist_t, idx_t> curen2 : returnlist) {
dist_t curdist = fstdistfunc(getDataByInternalId(curen2.second),
getDataByInternalId(curen.second));
if (curdist < dist_to_query) {
good = false;
break;
}
}
if (good) returnlist.push_back(curen);
}
for (std::pair<dist_t, idx_t> elem : returnlist)
topResults.emplace(-elem.first, elem.second);
}
void HierarchicalNSW::mutuallyConnectNewElement(const coord_t *point, idx_t cur_c,
std::priority_queue<std::pair<dist_t, idx_t>> topResults)
{
getNeighborsByHeuristic(topResults, M);
std::vector<idx_t> res;
res.reserve(M);
while (topResults.size() > 0) {
res.push_back(topResults.top().second);
topResults.pop();
}
{
idx_t* data = get_linklist0(cur_c);
if (*data)
throw std::runtime_error("Should be blank");
*data++ = res.size();
for (size_t idx = 0; idx < res.size(); idx++) {
if (data[idx])
throw std::runtime_error("Should be blank");
data[idx] = res[idx];
}
}
for (size_t idx = 0; idx < res.size(); idx++) {
if (res[idx] == cur_c)
throw std::runtime_error("Connection to the same element");
size_t resMmax = maxM;
idx_t *ll_other = get_linklist0(res[idx]);
idx_t sz_link_list_other = *ll_other;
if (sz_link_list_other > resMmax || sz_link_list_other < 0)
throw std::runtime_error("Bad sz_link_list_other");
if (sz_link_list_other < resMmax) {
idx_t *data = ll_other + 1;
data[sz_link_list_other] = cur_c;
*ll_other = sz_link_list_other + 1;
} else {
// finding the "weakest" element to replace it with the new one
idx_t *data = ll_other + 1;
dist_t d_max = fstdistfunc(getDataByInternalId(cur_c), getDataByInternalId(res[idx]));
// Heuristic:
std::priority_queue<std::pair<dist_t, idx_t>> candidates;
candidates.emplace(d_max, cur_c);
for (size_t j = 0; j < sz_link_list_other; j++)
candidates.emplace(fstdistfunc(getDataByInternalId(data[j]), getDataByInternalId(res[idx])), data[j]);
getNeighborsByHeuristic(candidates, resMmax);
size_t indx = 0;
while (!candidates.empty()) {
data[indx] = candidates.top().second;
candidates.pop();
indx++;
}
*ll_other = indx;
}
}
}
void HierarchicalNSW::addPoint(const coord_t *point, label_t label)
{
if (cur_element_count >= maxelements) {
throw std::runtime_error("The number of elements exceeds the specified limit");
}
idx_t cur_c = cur_element_count++;
memset((char *) get_linklist0(cur_c), 0, size_data_per_element);
memcpy(getDataByInternalId(cur_c), point, data_size);
memcpy(getExternalLabel(cur_c), &label, sizeof label);
// Do nothing for the first element
if (cur_c != 0) {
std::priority_queue <std::pair<dist_t, idx_t>> topResults = searchBaseLayer(point, efConstruction);
mutuallyConnectNewElement(point, cur_c, topResults);
}
};
std::priority_queue<std::pair<dist_t, label_t>> HierarchicalNSW::searchKnn(const coord_t *query, size_t k)
{
std::priority_queue<std::pair<dist_t, label_t>> topResults;
auto topCandidates = searchBaseLayer(query, k);
while (topCandidates.size() > k) {
topCandidates.pop();
}
while (!topCandidates.empty()) {
std::pair<dist_t, idx_t> rez = topCandidates.top();
label_t label;
memcpy(&label, getExternalLabel(rez.second), sizeof(label));
topResults.push(std::pair<dist_t, label_t>(rez.first, label));
topCandidates.pop();
}
return topResults;
};
dist_t fstdistfunc_scalar(const coord_t *x, const coord_t *y, size_t n)
{
dist_t distance = 0.0;
for (size_t i = 0; i < n; i++)
{
dist_t diff = x[i] - y[i];
distance += diff * diff;
}
return distance;
}
#ifdef __x86_64__
#include <immintrin.h>
__attribute__((target("avx2")))
dist_t fstdistfunc_avx2(const coord_t *x, const coord_t *y, size_t n)
{
const size_t TmpResSz = sizeof(__m256) / sizeof(float);
float PORTABLE_ALIGN32 TmpRes[TmpResSz];
size_t qty16 = n / 16;
const float *pEnd1 = x + (qty16 * 16);
__m256 diff, v1, v2;
__m256 sum = _mm256_set1_ps(0);
while (x < pEnd1) {
v1 = _mm256_loadu_ps(x);
x += 8;
v2 = _mm256_loadu_ps(y);
y += 8;
diff = _mm256_sub_ps(v1, v2);
sum = _mm256_add_ps(sum, _mm256_mul_ps(diff, diff));
v1 = _mm256_loadu_ps(x);
x += 8;
v2 = _mm256_loadu_ps(y);
y += 8;
diff = _mm256_sub_ps(v1, v2);
sum = _mm256_add_ps(sum, _mm256_mul_ps(diff, diff));
}
_mm256_store_ps(TmpRes, sum);
float res = TmpRes[0] + TmpRes[1] + TmpRes[2] + TmpRes[3] + TmpRes[4] + TmpRes[5] + TmpRes[6] + TmpRes[7];
return (res);
}
dist_t fstdistfunc_sse(const coord_t *x, const coord_t *y, size_t n)
{
const size_t TmpResSz = sizeof(__m128) / sizeof(float);
float PORTABLE_ALIGN32 TmpRes[TmpResSz];
size_t qty16 = n / 16;
const float *pEnd1 = x + (qty16 * 16);
__m128 diff, v1, v2;
__m128 sum = _mm_set1_ps(0);
while (x < pEnd1) {
v1 = _mm_loadu_ps(x);
x += 4;
v2 = _mm_loadu_ps(y);
y += 4;
diff = _mm_sub_ps(v1, v2);
sum = _mm_add_ps(sum, _mm_mul_ps(diff, diff));
v1 = _mm_loadu_ps(x);
x += 4;
v2 = _mm_loadu_ps(y);
y += 4;
diff = _mm_sub_ps(v1, v2);
sum = _mm_add_ps(sum, _mm_mul_ps(diff, diff));
v1 = _mm_loadu_ps(x);
x += 4;
v2 = _mm_loadu_ps(y);
y += 4;
diff = _mm_sub_ps(v1, v2);
sum = _mm_add_ps(sum, _mm_mul_ps(diff, diff));
v1 = _mm_loadu_ps(x);
x += 4;
v2 = _mm_loadu_ps(y);
y += 4;
diff = _mm_sub_ps(v1, v2);
sum = _mm_add_ps(sum, _mm_mul_ps(diff, diff));
}
_mm_store_ps(TmpRes, sum);
float res = TmpRes[0] + TmpRes[1] + TmpRes[2] + TmpRes[3];
return res;
}
#endif
dist_t HierarchicalNSW::fstdistfunc(const coord_t *x, const coord_t *y)
{
#ifndef __x86_64__
return fstdistfunc_scalar(x, y, dim);
#else
if(use_avx2)
return fstdistfunc_avx2(x, y, dim);
return fstdistfunc_sse(x, y, dim);
#endif
}
bool hnsw_search(HierarchicalNSW* hnsw, const coord_t *point, size_t efSearch, size_t* n_results, label_t** results)
{
try
{
auto result = hnsw->searchKnn(point, efSearch);
size_t nResults = result.size();
*results = (label_t*)malloc(nResults*sizeof(label_t));
for (size_t i = nResults; i-- != 0;)
{
(*results)[i] = result.top().second;
result.pop();
}
*n_results = nResults;
return true;
}
catch (std::exception& x)
{
return false;
}
}
bool hnsw_add_point(HierarchicalNSW* hnsw, const coord_t *point, label_t label)
{
try
{
hnsw->addPoint(point, label);
return true;
}
catch (std::exception& x)
{
fprintf(stderr, "Catch %s\n", x.what());
return false;
}
}
void hnsw_init(HierarchicalNSW* hnsw, size_t dims, size_t maxelements, size_t M, size_t maxM, size_t efConstruction)
{
new ((void*)hnsw) HierarchicalNSW(dims, maxelements, M, maxM, efConstruction);
}
int hnsw_dimensions(HierarchicalNSW* hnsw)
{
return (int)hnsw->dim;
}
size_t hnsw_count(HierarchicalNSW* hnsw)
{
return hnsw->cur_element_count;
}
size_t hnsw_sizeof(void)
{
return sizeof(HierarchicalNSW);
}

View File

@@ -1,69 +0,0 @@
#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <unordered_map>
#include <unordered_set>
#include <map>
#include <cmath>
#include <queue>
#include <stdexcept>
extern "C" {
#include "hnsw.h"
}
struct HierarchicalNSW
{
size_t maxelements;
size_t cur_element_count;
idx_t enterpoint_node;
size_t dim;
size_t data_size;
size_t offset_data;
size_t offset_label;
size_t size_data_per_element;
size_t M;
size_t maxM;
size_t size_links_level0;
size_t efConstruction;
#ifdef __x86_64__
bool use_avx2;
#endif
char data_level0_memory[0]; // varying size
public:
HierarchicalNSW(size_t dim, size_t maxelements, size_t M, size_t maxM, size_t efConstruction);
~HierarchicalNSW();
inline coord_t *getDataByInternalId(idx_t internal_id) const {
return (coord_t *)&data_level0_memory[internal_id * size_data_per_element + offset_data];
}
inline idx_t *get_linklist0(idx_t internal_id) const {
return (idx_t*)&data_level0_memory[internal_id * size_data_per_element];
}
inline label_t *getExternalLabel(idx_t internal_id) const {
return (label_t *)&data_level0_memory[internal_id * size_data_per_element + offset_label];
}
std::priority_queue<std::pair<dist_t, idx_t>> searchBaseLayer(const coord_t *x, size_t ef);
void getNeighborsByHeuristic(std::priority_queue<std::pair<dist_t, idx_t>> &topResults, size_t NN);
void mutuallyConnectNewElement(const coord_t *x, idx_t id, std::priority_queue<std::pair<dist_t, idx_t>> topResults);
void addPoint(const coord_t *point, label_t label);
std::priority_queue<std::pair<dist_t, label_t>> searchKnn(const coord_t *query_data, size_t k);
dist_t fstdistfunc(const coord_t *x, const coord_t *y);
};

View File

@@ -1,28 +0,0 @@
SET enable_seqscan = off;
CREATE TABLE t (val real[]);
INSERT INTO t (val) VALUES ('{0,0,0}'), ('{1,2,3}'), ('{1,1,1}'), (NULL);
CREATE INDEX ON t USING hnsw (val) WITH (maxelements = 10, dims=3, m=3);
INSERT INTO t (val) VALUES (array[1,2,4]);
explain SELECT * FROM t ORDER BY val <-> array[3,3,3];
QUERY PLAN
--------------------------------------------------------------------
Index Scan using t_val_idx on t (cost=4.02..8.06 rows=3 width=36)
Order By: (val <-> '{3,3,3}'::real[])
(2 rows)
SELECT * FROM t ORDER BY val <-> array[3,3,3];
val
---------
{1,2,3}
{1,2,4}
{1,1,1}
{0,0,0}
(4 rows)
SELECT COUNT(*) FROM t;
count
-------
5
(1 row)
DROP TABLE t;

View File

@@ -1,13 +0,0 @@
SET enable_seqscan = off;
CREATE TABLE t (val real[]);
INSERT INTO t (val) VALUES ('{0,0,0}'), ('{1,2,3}'), ('{1,1,1}'), (NULL);
CREATE INDEX ON t USING hnsw (val) WITH (maxelements = 10, dims=3, m=3);
INSERT INTO t (val) VALUES (array[1,2,4]);
explain SELECT * FROM t ORDER BY val <-> array[3,3,3];
SELECT * FROM t ORDER BY val <-> array[3,3,3];
SELECT COUNT(*) FROM t;
DROP TABLE t;

8
poetry.lock generated
View File

@@ -2028,13 +2028,13 @@ openapi-schema-validator = ">=0.4.2,<0.5.0"
[[package]]
name = "packaging"
version = "23.0"
version = "24.2"
description = "Core utilities for Python packages"
optional = false
python-versions = ">=3.7"
python-versions = ">=3.8"
files = [
{file = "packaging-23.0-py3-none-any.whl", hash = "sha256:714ac14496c3e68c99c29b00845f7a2b85f3bb6f1078fd9f72fd20f0570002b2"},
{file = "packaging-23.0.tar.gz", hash = "sha256:b6ad297f8907de0fa2fe1ccbd26fdaf387f5f47c7275fedf8cce89f99446cf97"},
{file = "packaging-24.2-py3-none-any.whl", hash = "sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759"},
{file = "packaging-24.2.tar.gz", hash = "sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f"},
]
[[package]]

View File

@@ -106,6 +106,7 @@ jose-jwk = { version = "0.1.2", features = ["p256", "p384", "rsa"] }
signature = "2"
ecdsa = "0.16"
p256 = { version = "0.13", features = ["jwk"] }
ed25519-dalek = { version = "2", default-features = false, features = ["rand_core"] }
rsa = "0.9"
workspace_hack.workspace = true

View File

@@ -1,7 +1,8 @@
use std::fmt;
use async_trait::async_trait;
use postgres_client::config::SslMode;
use pq_proto::BeMessage as Be;
use std::fmt;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, info_span};
@@ -12,10 +13,13 @@ use crate::auth::IpPattern;
use crate::cache::Cached;
use crate::config::AuthenticationConfig;
use crate::context::RequestContext;
use crate::control_plane::{self, client::cplane_proxy_v1, CachedNodeInfo, NodeInfo};
use crate::control_plane::client::cplane_proxy_v1;
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
use crate::error::{ReportableError, UserFacingError};
use crate::proxy::connect_compute::ComputeConnectBackend;
use crate::proxy::NeonOptions;
use crate::stream::PqStream;
use crate::types::RoleName;
use crate::{auth, compute, waiters};
#[derive(Debug, Error)]
@@ -105,10 +109,16 @@ impl ConsoleRedirectBackend {
ctx: &RequestContext,
auth_config: &'static AuthenticationConfig,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<(ConsoleRedirectNodeInfo, Option<Vec<IpPattern>>)> {
) -> auth::Result<(
ConsoleRedirectNodeInfo,
ComputeUserInfo,
Option<Vec<IpPattern>>,
)> {
authenticate(ctx, auth_config, &self.console_uri, client)
.await
.map(|(node_info, ip_allowlist)| (ConsoleRedirectNodeInfo(node_info), ip_allowlist))
.map(|(node_info, user_info, ip_allowlist)| {
(ConsoleRedirectNodeInfo(node_info), user_info, ip_allowlist)
})
}
}
@@ -133,7 +143,7 @@ async fn authenticate(
auth_config: &'static AuthenticationConfig,
link_uri: &reqwest::Url,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<(NodeInfo, Option<Vec<IpPattern>>)> {
) -> auth::Result<(NodeInfo, ComputeUserInfo, Option<Vec<IpPattern>>)> {
ctx.set_auth_method(crate::context::AuthMethod::ConsoleRedirect);
// registering waiter can fail if we get unlucky with rng.
@@ -188,8 +198,15 @@ async fn authenticate(
let mut config = compute::ConnCfg::new(db_info.host.to_string(), db_info.port);
config.dbname(&db_info.dbname).user(&db_info.user);
let user: RoleName = db_info.user.into();
let user_info = ComputeUserInfo {
endpoint: db_info.aux.endpoint_id.as_str().into(),
user: user.clone(),
options: NeonOptions::default(),
};
ctx.set_dbname(db_info.dbname.into());
ctx.set_user(db_info.user.into());
ctx.set_user(user);
ctx.set_project(db_info.aux.clone());
info!("woken up a compute node");
@@ -212,6 +229,7 @@ async fn authenticate(
config,
aux: db_info.aux,
},
user_info,
db_info.allowed_ips,
))
}

View File

@@ -24,10 +24,8 @@ use crate::control_plane::messages::MetricsAuxInfo;
use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, NumDbConnectionsGuard};
use crate::proxy::neon_option;
use crate::proxy::NeonOptions;
use crate::tls::postgres_rustls::MakeRustlsConnect;
use crate::types::Host;
use crate::types::{EndpointId, RoleName};
pub const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
@@ -253,6 +251,7 @@ impl ConnCfg {
ctx: &RequestContext,
aux: MetricsAuxInfo,
config: &ComputeConfig,
user_info: ComputeUserInfo,
) -> Result<PostgresConnection, ConnectionError> {
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let (socket_addr, stream, host) = self.connect_raw(config.timeout).await?;
@@ -287,28 +286,6 @@ impl ConnCfg {
self.0.get_ssl_mode()
);
let compute_info = match parameters.get("user") {
Some(user) => {
match parameters.get("database") {
Some(database) => {
ComputeUserInfo {
user: RoleName::from(user),
options: NeonOptions::default(), // just a shim, we don't need options
endpoint: EndpointId::from(database),
}
}
None => {
warn!("compute node didn't return database name");
ComputeUserInfo::default()
}
}
}
None => {
warn!("compute node didn't return user name");
ComputeUserInfo::default()
}
};
// NB: CancelToken is supposed to hold socket_addr, but we use connect_raw.
// Yet another reason to rework the connection establishing code.
let cancel_closure = CancelClosure::new(
@@ -321,7 +298,7 @@ impl ConnCfg {
},
vec![], // TODO: deprecated, will be removed
host.to_string(),
compute_info,
user_info,
);
let connection = PostgresConnection {

View File

@@ -195,7 +195,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
ctx.set_db_options(params.clone());
let (user_info, ip_allowlist) = match backend
let (node_info, user_info, ip_allowlist) = match backend
.authenticate(ctx, &config.authentication_config, &mut stream)
.await
{
@@ -208,11 +208,12 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let mut node = connect_to_compute(
ctx,
&TcpMechanism {
user_info,
params_compat: true,
params: &params,
locks: &config.connect_compute_locks,
},
&user_info,
&node_info,
config.wake_compute_retry_config,
&config.connect_to_compute,
)

View File

@@ -74,8 +74,11 @@ impl NodeInfo {
&self,
ctx: &RequestContext,
config: &ComputeConfig,
user_info: ComputeUserInfo,
) -> Result<compute::PostgresConnection, compute::ConnectionError> {
self.config.connect(ctx, self.aux.clone(), config).await
self.config
.connect(ctx, self.aux.clone(), config, user_info)
.await
}
pub(crate) fn reuse_settings(&mut self, other: Self) {

View File

@@ -4,7 +4,7 @@ use tokio::time;
use tracing::{debug, info, warn};
use super::retry::ShouldRetryWakeCompute;
use crate::auth::backend::ComputeCredentialKeys;
use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
use crate::compute::{self, PostgresConnection, COULD_NOT_CONNECT};
use crate::config::{ComputeConfig, RetryConfig};
use crate::context::RequestContext;
@@ -71,6 +71,8 @@ pub(crate) struct TcpMechanism<'a> {
/// connect_to_compute concurrency lock
pub(crate) locks: &'static ApiLocks<Host>,
pub(crate) user_info: ComputeUserInfo,
}
#[async_trait]
@@ -88,7 +90,7 @@ impl ConnectMechanism for TcpMechanism<'_> {
) -> Result<PostgresConnection, Self::Error> {
let host = node_info.config.get_host();
let permit = self.locks.get_permit(&host).await?;
permit.release_result(node_info.connect(ctx, config).await)
permit.release_result(node_info.connect(ctx, config, self.user_info.clone()).await)
}
fn update_connect_config(&self, config: &mut compute::ConnCfg) {

View File

@@ -332,16 +332,19 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
}
};
let params_compat = match &user_info {
auth::Backend::ControlPlane(_, info) => {
info.info.options.get(NeonOptions::PARAMS_COMPAT).is_some()
}
auth::Backend::Local(_) => false,
let compute_user_info = match &user_info {
auth::Backend::ControlPlane(_, info) => &info.info,
auth::Backend::Local(_) => unreachable!("local proxy does not run tcp proxy service"),
};
let params_compat = compute_user_info
.options
.get(NeonOptions::PARAMS_COMPAT)
.is_some();
let mut node = connect_to_compute(
ctx,
&TcpMechanism {
user_info: compute_user_info.clone(),
params_compat,
params: &params,
locks: &config.connect_compute_locks,

View File

@@ -74,7 +74,11 @@ pub(crate) enum Notification {
#[serde(rename = "/cancel_session")]
Cancel(CancelSession),
#[serde(other, skip_serializing)]
#[serde(
other,
deserialize_with = "deserialize_unknown_topic",
skip_serializing
)]
UnknownTopic,
}
@@ -123,6 +127,15 @@ where
serde_json::from_str(&s).map_err(<D::Error as serde::de::Error>::custom)
}
// https://github.com/serde-rs/serde/issues/1714
fn deserialize_unknown_topic<'de, D>(deserializer: D) -> Result<(), D::Error>
where
D: serde::Deserializer<'de>,
{
deserializer.deserialize_any(serde::de::IgnoredAny)?;
Ok(())
}
struct MessageHandler<C: ProjectInfoCache + Send + Sync + 'static> {
cache: Arc<C>,
cancellation_handler: Arc<CancellationHandler<()>>,
@@ -458,4 +471,30 @@ mod tests {
Ok(())
}
#[test]
fn parse_unknown_topic() -> anyhow::Result<()> {
let with_data = json!({
"type": "message",
"topic": "/doesnotexist",
"data": {
"payload": "ignored"
},
"extra_fields": "something"
})
.to_string();
let result: Notification = serde_json::from_str(&with_data)?;
assert_eq!(result, Notification::UnknownTopic);
let without_data = json!({
"type": "message",
"topic": "/doesnotexist",
"extra_fields": "something"
})
.to_string();
let result: Notification = serde_json::from_str(&without_data)?;
assert_eq!(result, Notification::UnknownTopic);
Ok(())
}
}

View File

@@ -3,9 +3,9 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use ed25519_dalek::SigningKey;
use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
use p256::ecdsa::SigningKey;
use p256::elliptic_curve::JwkEcKey;
use jose_jwk::jose_b64;
use rand::rngs::OsRng;
use tokio::net::{lookup_host, TcpStream};
use tracing::field::display;
@@ -354,9 +354,15 @@ impl PoolingBackend {
}
}
fn create_random_jwk() -> (SigningKey, JwkEcKey) {
let key = SigningKey::random(&mut OsRng);
let jwk = p256::PublicKey::from(key.verifying_key()).to_jwk();
fn create_random_jwk() -> (SigningKey, jose_jwk::Key) {
let key = SigningKey::generate(&mut OsRng);
let jwk = jose_jwk::Key::Okp(jose_jwk::Okp {
crv: jose_jwk::OkpCurves::Ed25519,
x: jose_b64::serde::Bytes::from(key.verifying_key().to_bytes().to_vec()),
d: None,
});
(key, jwk)
}

View File

@@ -16,17 +16,16 @@ use std::sync::Arc;
use std::task::{ready, Poll};
use std::time::Duration;
use ed25519_dalek::{Signature, Signer, SigningKey};
use futures::future::poll_fn;
use futures::Future;
use indexmap::IndexMap;
use jose_jwk::jose_b64::base64ct::{Base64UrlUnpadded, Encoding};
use p256::ecdsa::{Signature, SigningKey};
use parking_lot::RwLock;
use postgres_client::tls::NoTlsStream;
use postgres_client::types::ToSql;
use postgres_client::AsyncMessage;
use serde_json::value::RawValue;
use signature::Signer;
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
@@ -42,7 +41,7 @@ use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::Metrics;
pub(crate) const EXT_NAME: &str = "pg_session_jwt";
pub(crate) const EXT_VERSION: &str = "0.1.2";
pub(crate) const EXT_VERSION: &str = "0.2.0";
pub(crate) const EXT_SCHEMA: &str = "auth";
#[derive(Clone)]
@@ -339,8 +338,8 @@ fn sign_jwt(sk: &SigningKey, payload: &[u8]) -> String {
let cap = jwt.capacity();
// we only need an empty header with the alg specified.
// base64url(r#"{"alg":"ES256"}"#) == "eyJhbGciOiJFUzI1NiJ9"
jwt.push_str("eyJhbGciOiJFUzI1NiJ9.");
// base64url(r#"{"alg":"EdDSA"}"#) == "eyJhbGciOiJFZERTQSJ9"
jwt.push_str("eyJhbGciOiJFZERTQSJ9.");
// encode the jwt payload in-place
base64::encode_config_buf(payload, base64::URL_SAFE_NO_PAD, &mut jwt);
@@ -366,14 +365,14 @@ fn sign_jwt(sk: &SigningKey, payload: &[u8]) -> String {
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use p256::ecdsa::SigningKey;
use ed25519_dalek::SigningKey;
use typed_json::json;
use super::resign_jwt;
#[test]
fn jwt_token_snapshot() {
let key = SigningKey::from_bytes(&[1; 32].into()).unwrap();
let key = SigningKey::from_bytes(&[1; 32]);
let data =
json!({"foo":"bar","jti":"foo\nbar","nested":{"jti":"tricky nesting"}}).to_string();
@@ -381,12 +380,17 @@ mod tests {
// To validate the JWT, copy the JWT string and paste it into https://jwt.io/.
// In the public-key box, paste the following jwk public key
// `{"kty":"EC","crv":"P-256","x":"b_A7lJJBzh2t1DUZ5pYOCoW0GmmgXDKBA6orzhWUyhY","y":"PE91OlW_AdxT9sCwx-7ni0DG_30lqW4igrmJzvccFEo"}`
// `{"kty":"OKP","crv":"Ed25519","x":"iojj3XQJ8ZX9UtstPLpdcspnCb8dlBIb83SIAbQPb1w"}`
// Note - jwt.io doesn't support EdDSA :(
// https://github.com/jsonwebtoken/jsonwebtoken.github.io/issues/509
// let pub_key = p256::ecdsa::VerifyingKey::from(&key);
// let pub_key = p256::PublicKey::from(pub_key);
// println!("{}", pub_key.to_jwk_string());
// let jwk = jose_jwk::Key::Okp(jose_jwk::Okp {
// crv: jose_jwk::OkpCurves::Ed25519,
// x: jose_jwk::jose_b64::serde::Bytes::from(key.verifying_key().to_bytes().to_vec()),
// d: None,
// });
// println!("{}", serde_json::to_string(&jwk).unwrap());
assert_eq!(jwt, "eyJhbGciOiJFUzI1NiJ9.eyJmb28iOiJiYXIiLCJqdGkiOjIsIm5lc3RlZCI6eyJqdGkiOiJ0cmlja3kgbmVzdGluZyJ9fQ.pYf0LxoJ8sDgpmsYOgrbNecOSipnPBEGwnZzB-JhW2cONrKlqRsgXwK8_cOsyolGy-hTTe8GXbWTl_UdpF5RyA");
assert_eq!(jwt, "eyJhbGciOiJFZERTQSJ9.eyJmb28iOiJiYXIiLCJqdGkiOjIsIm5lc3RlZCI6eyJqdGkiOiJ0cmlja3kgbmVzdGluZyJ9fQ.Cvyc2By33KI0f0obystwdy8PN111L3Sc9_Mr2CU3XshtSqSdxuRxNEZGbb_RvyJf2IzheC_s7aBZ-jLeQ9N0Bg");
}
}

View File

@@ -21,14 +21,13 @@ const KB: usize = 1024;
const MB: usize = 1024 * KB;
const GB: usize = 1024 * MB;
/// Use jemalloc, and configure it to sample allocations for profiles every 1 MB.
/// This mirrors the configuration in bin/safekeeper.rs.
/// Use jemalloc and enable profiling, to mirror bin/safekeeper.rs.
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[allow(non_upper_case_globals)]
#[export_name = "malloc_conf"]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:20\0";
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:21\0";
// Register benchmarks with Criterion.
criterion_group!(

View File

@@ -51,10 +51,12 @@ use utils::{
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
/// Configure jemalloc to sample allocations for profiles every 1 MB (1 << 20).
/// Configure jemalloc to profile heap allocations by sampling stack traces every 2 MB (1 << 21).
/// This adds roughly 3% overhead for allocations on average, which is acceptable considering
/// performance-sensitive code will avoid allocations as far as possible anyway.
#[allow(non_upper_case_globals)]
#[export_name = "malloc_conf"]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:20\0";
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:21\0";
const PID_FILE_NAME: &str = "safekeeper.pid";
const ID_FILE_NAME: &str = "safekeeper.id";

View File

@@ -1,7 +1,6 @@
use pageserver_client::mgmt_api::{self, ResponseErrorMessageExt};
use reqwest::{Method, Url};
use serde::{de::DeserializeOwned, Serialize};
use std::str::FromStr;
pub struct Client {
base_url: Url,
@@ -31,16 +30,11 @@ impl Client {
RQ: Serialize + Sized,
RS: DeserializeOwned + Sized,
{
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
// for general purpose API access.
let url = Url::from_str(&format!(
"http://{}:{}/{path}",
self.base_url.host_str().unwrap(),
self.base_url.port().unwrap()
))
.unwrap();
let mut builder = self.client.request(method, url);
let request_path = self
.base_url
.join(&path)
.expect("Failed to build request path");
let mut builder = self.client.request(method, request_path);
if let Some(body) = body {
builder = builder.json(&body)
}

View File

@@ -0,0 +1,4 @@
-- this sadly isn't a "true" revert of the migration, as the column is now at the end of the table.
-- But preserving order is not a trivial operation.
-- https://wiki.postgresql.org/wiki/Alter_column_position
ALTER TABLE safekeepers ADD active BOOLEAN NOT NULL DEFAULT false;

View File

@@ -0,0 +1 @@
ALTER TABLE safekeepers DROP active;

View File

@@ -124,7 +124,10 @@ impl ComputeHookTenant {
if let Some(shard_idx) = shard_idx {
sharded.shards.remove(shard_idx);
} else {
tracing::warn!("Shard not found while handling detach")
// This is a valid but niche case, where the tenant was previously attached
// as a Secondary location and then detached, so has no previously notified
// state.
tracing::info!("Shard not found while handling detach")
}
}
ComputeHookTenant::Unsharded(_) => {
@@ -761,7 +764,10 @@ impl ComputeHook {
let mut state_locked = self.state.lock().unwrap();
match state_locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(_) => {
tracing::warn!("Compute hook tenant not found for detach");
// This is a valid but niche case, where the tenant was previously attached
// as a Secondary location and then detached, so has no previously notified
// state.
tracing::info!("Compute hook tenant not found for detach");
}
Entry::Occupied(mut e) => {
let sharded = e.get().is_sharded();

View File

@@ -690,7 +690,8 @@ async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError
};
let state = get_state(&req);
let nodes = state.service.node_list().await?;
let mut nodes = state.service.node_list().await?;
nodes.sort_by_key(|n| n.get_id());
let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
json_response(StatusCode::OK, api_nodes)
@@ -1005,6 +1006,29 @@ async fn handle_tenant_shard_migrate(
)
}
async fn handle_tenant_shard_migrate_secondary(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let mut req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
json_response(
StatusCode::OK,
service
.tenant_shard_migrate_secondary(tenant_shard_id, migrate_req)
.await?,
)
}
async fn handle_tenant_shard_cancel_reconcile(
service: Arc<Service>,
req: Request<Body>,
@@ -1855,6 +1879,16 @@ pub fn make_router(
RequestName("control_v1_tenant_migrate"),
)
})
.put(
"/control/v1/tenant/:tenant_shard_id/migrate_secondary",
|r| {
tenant_service_handler(
r,
handle_tenant_shard_migrate_secondary,
RequestName("control_v1_tenant_migrate_secondary"),
)
},
)
.put(
"/control/v1/tenant/:tenant_shard_id/cancel_reconcile",
|r| {

View File

@@ -1258,7 +1258,6 @@ pub(crate) struct SafekeeperPersistence {
pub(crate) version: i64,
pub(crate) host: String,
pub(crate) port: i32,
pub(crate) active: bool,
pub(crate) http_port: i32,
pub(crate) availability_zone_id: String,
pub(crate) scheduling_policy: String,
@@ -1270,7 +1269,6 @@ impl SafekeeperPersistence {
SkSchedulingPolicy::from_str(&self.scheduling_policy).map_err(|e| {
DatabaseError::Logical(format!("can't construct SkSchedulingPolicy: {e:?}"))
})?;
// omit the `active` flag on purpose: it is deprecated.
Ok(SafekeeperDescribeResponse {
id: NodeId(self.id as u64),
region_id: self.region_id.clone(),
@@ -1295,7 +1293,8 @@ pub(crate) struct SafekeeperUpsert {
pub(crate) version: i64,
pub(crate) host: String,
pub(crate) port: i32,
pub(crate) active: bool,
/// The active flag will not be stored in the database and will be ignored.
pub(crate) active: Option<bool>,
pub(crate) http_port: i32,
pub(crate) availability_zone_id: String,
}
@@ -1311,7 +1310,6 @@ impl SafekeeperUpsert {
version: self.version,
host: &self.host,
port: self.port,
active: self.active,
http_port: self.http_port,
availability_zone_id: &self.availability_zone_id,
// None means a wish to not update this column. We expose abilities to update it via other means.
@@ -1328,7 +1326,6 @@ struct InsertUpdateSafekeeper<'a> {
version: i64,
host: &'a str,
port: i32,
active: bool,
http_port: i32,
availability_zone_id: &'a str,
scheduling_policy: Option<&'a str>,

View File

@@ -696,6 +696,11 @@ impl Reconciler {
/// First we apply special case handling (e.g. for live migrations), and then a
/// general case reconciliation where we walk through the intent by pageserver
/// and call out to the pageserver to apply the desired state.
///
/// An Ok(()) result indicates that we successfully attached the tenant, but _not_ that
/// all locations for the tenant are in the expected state. When nodes that are to be detached
/// or configured as secondary are unavailable, we may return Ok(()) but leave the shard in a
/// state where it still requires later reconciliation.
pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
// Prepare: if we have uncertain `observed` state for our would-be attachement location, then refresh it
self.maybe_refresh_observed().await?;
@@ -784,10 +789,18 @@ impl Reconciler {
tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
}
_ => {
// In all cases other than a matching observed configuration, we will
// reconcile this location.
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
changes.push((node.clone(), wanted_conf))
// Only try and configure secondary locations on nodes that are available. This
// allows the reconciler to "succeed" while some secondaries are offline (e.g. after
// a node failure, where the failed node will have a secondary intent)
if node.is_available() {
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
changes.push((node.clone(), wanted_conf))
} else {
tracing::info!(node_id=%node.get_id(), "Skipping configuration as secondary, node is unavailable");
self.observed
.locations
.insert(node.get_id(), ObservedStateLocation { conf: None });
}
}
}
}

View File

@@ -36,7 +36,6 @@ diesel::table! {
version -> Int8,
host -> Text,
port -> Int4,
active -> Bool,
http_port -> Int4,
availability_zone_id -> Text,
scheduling_policy -> Varchar,

View File

@@ -5055,6 +5055,69 @@ impl Service {
Ok(TenantShardMigrateResponse {})
}
pub(crate) async fn tenant_shard_migrate_secondary(
&self,
tenant_shard_id: TenantShardId,
migrate_req: TenantShardMigrateRequest,
) -> Result<TenantShardMigrateResponse, ApiError> {
let waiter = {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
let Some(node) = nodes.get(&migrate_req.node_id) else {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Node {} not found",
migrate_req.node_id
)));
};
if !node.is_available() {
// Warn but proceed: the caller may intend to manually adjust the placement of
// a shard even if the node is down, e.g. if intervening during an incident.
tracing::warn!("Migrating to unavailable node {node}");
}
let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant shard not found").into(),
));
};
if shard.intent.get_secondary().len() == 1
&& shard.intent.get_secondary()[0] == migrate_req.node_id
{
tracing::info!(
"Migrating secondary to {node}: intent is unchanged {:?}",
shard.intent
);
} else if shard.intent.get_attached() == &Some(migrate_req.node_id) {
tracing::info!("Migrating secondary to {node}: already attached where we were asked to create a secondary");
} else {
let old_secondaries = shard.intent.get_secondary().clone();
for secondary in old_secondaries {
shard.intent.remove_secondary(scheduler, secondary);
}
shard.intent.push_secondary(scheduler, migrate_req.node_id);
shard.sequence = shard.sequence.next();
tracing::info!(
"Migrating secondary to {node}: new intent {:?}",
shard.intent
);
}
self.maybe_reconcile_shard(shard, nodes)
};
if let Some(waiter) = waiter {
waiter.wait_timeout(RECONCILE_TIMEOUT).await?;
} else {
tracing::info!("Migration is a no-op");
}
Ok(TenantShardMigrateResponse {})
}
/// 'cancel' in this context means cancel any ongoing reconcile
pub(crate) async fn tenant_shard_cancel_reconcile(
&self,
@@ -5256,7 +5319,8 @@ impl Service {
expect_nodes.sort_by_key(|n| n.node_id);
nodes.sort_by_key(|n| n.node_id);
if nodes != expect_nodes {
// Errors relating to nodes are deferred so that we don't skip the shard checks below if we have a node error
let node_result = if nodes != expect_nodes {
tracing::error!("Consistency check failed on nodes.");
tracing::error!(
"Nodes in memory: {}",
@@ -5268,10 +5332,12 @@ impl Service {
serde_json::to_string(&nodes)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
return Err(ApiError::InternalServerError(anyhow::anyhow!(
Err(ApiError::InternalServerError(anyhow::anyhow!(
"Node consistency failure"
)));
}
)))
} else {
Ok(())
};
let mut persistent_shards = self.persistence.load_active_tenant_shards().await?;
persistent_shards
@@ -5281,6 +5347,7 @@ impl Service {
if persistent_shards != expect_shards {
tracing::error!("Consistency check failed on shards.");
tracing::error!(
"Shards in memory: {}",
serde_json::to_string(&expect_shards)
@@ -5291,12 +5358,57 @@ impl Service {
serde_json::to_string(&persistent_shards)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
// The total dump log lines above are useful in testing but in the field grafana will
// usually just drop them because they're so large. So we also do some explicit logging
// of just the diffs.
let persistent_shards = persistent_shards
.into_iter()
.map(|tsp| (tsp.get_tenant_shard_id().unwrap(), tsp))
.collect::<HashMap<_, _>>();
let expect_shards = expect_shards
.into_iter()
.map(|tsp| (tsp.get_tenant_shard_id().unwrap(), tsp))
.collect::<HashMap<_, _>>();
for (tenant_shard_id, persistent_tsp) in &persistent_shards {
match expect_shards.get(tenant_shard_id) {
None => {
tracing::error!(
"Shard {} found in database but not in memory",
tenant_shard_id
);
}
Some(expect_tsp) => {
if expect_tsp != persistent_tsp {
tracing::error!(
"Shard {} is inconsistent. In memory: {}, database has: {}",
tenant_shard_id,
serde_json::to_string(expect_tsp).unwrap(),
serde_json::to_string(&persistent_tsp).unwrap()
);
}
}
}
}
// Having already logged any differences, log any shards that simply aren't present in the database
for (tenant_shard_id, memory_tsp) in &expect_shards {
if !persistent_shards.contains_key(tenant_shard_id) {
tracing::error!(
"Shard {} found in memory but not in database: {}",
tenant_shard_id,
serde_json::to_string(memory_tsp)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
}
}
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Shard consistency failure"
)));
}
Ok(())
node_result
}
/// For debug/support: a JSON dump of the [`Scheduler`]. Returns a response so that

View File

@@ -1122,10 +1122,15 @@ impl TenantShard {
let result = reconciler.reconcile().await;
// If we know we had a pending compute notification from some previous action, send a notification irrespective
// of whether the above reconcile() did any work
// of whether the above reconcile() did any work. It has to be Ok() though, because otherwise we might be
// sending a notification of a location that isn't really attached.
if result.is_ok() && must_notify {
// If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
reconciler.compute_notify().await.ok();
} else if must_notify {
// Carry this flag so that the reconciler's result will indicate that it still needs to retry
// the compute hook notification eventually.
reconciler.compute_notify_failure = true;
}
// Update result counter

View File

@@ -131,7 +131,6 @@ PAGESERVER_GLOBAL_METRICS: tuple[str, ...] = (
"pageserver_getpage_reconstruct_seconds_sum",
*[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
*histogram("pageserver_smgr_query_seconds_global"),
*histogram("pageserver_layers_visited_per_read_global"),
*histogram("pageserver_getpage_get_reconstruct_data_seconds"),
*histogram("pageserver_wait_lsn_seconds"),
*histogram("pageserver_remote_operation_seconds"),

View File

@@ -84,9 +84,6 @@ page_cache_size=10
log.info("Checking layer access metrics ...")
layer_access_metric_names = [
"pageserver_layers_visited_per_read_global_sum",
"pageserver_layers_visited_per_read_global_count",
"pageserver_layers_visited_per_read_global_bucket",
"pageserver_layers_visited_per_vectored_read_global_sum",
"pageserver_layers_visited_per_vectored_read_global_count",
"pageserver_layers_visited_per_vectored_read_global_bucket",
@@ -97,12 +94,6 @@ page_cache_size=10
layer_access_metrics = metrics.query_all(name)
log.info(f"Got metrics: {layer_access_metrics}")
non_vectored_sum = metrics.query_one("pageserver_layers_visited_per_read_global_sum")
non_vectored_count = metrics.query_one("pageserver_layers_visited_per_read_global_count")
if non_vectored_count.value != 0:
non_vectored_average = non_vectored_sum.value / non_vectored_count.value
else:
non_vectored_average = 0
vectored_sum = metrics.query_one("pageserver_layers_visited_per_vectored_read_global_sum")
vectored_count = metrics.query_one("pageserver_layers_visited_per_vectored_read_global_count")
if vectored_count.value > 0:
@@ -113,11 +104,10 @@ page_cache_size=10
assert vectored_sum.value == 0
vectored_average = 0
log.info(f"{non_vectored_average=} {vectored_average=}")
log.info(f"{vectored_average=}")
# The upper bound for average number of layer visits below (8)
# was chosen empirically for this workload.
assert non_vectored_average < 8
assert vectored_average < 8

View File

@@ -141,11 +141,18 @@ def test_create_snapshot(
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
env = neon_env_builder.init_start(
initial_tenant_conf={
# Miniature layers to enable generating non-trivial layer map without writing lots of data
"checkpoint_distance": f"{128 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{128 * 1024}",
}
)
endpoint = env.endpoints.create_start("main")
pg_bin.run_capture(["pgbench", "--initialize", "--scale=10", endpoint.connstr()])
pg_bin.run_capture(["pgbench", "--time=60", "--progress=2", endpoint.connstr()])
pg_bin.run_capture(["pgbench", "--initialize", "--scale=1", endpoint.connstr()])
pg_bin.run_capture(["pgbench", "--time=30", "--progress=2", endpoint.connstr()])
pg_bin.run_capture(
["pg_dumpall", f"--dbname={endpoint.connstr()}", f"--file={test_output_dir / 'dump.sql'}"]
)
@@ -157,7 +164,9 @@ def test_create_snapshot(
pageserver_http = env.pageserver.http_client()
flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
pageserver_http.timeline_checkpoint(
tenant_id, timeline_id, wait_until_uploaded=True, force_image_layer_creation=True
)
env.endpoints.stop_all()
for sk in env.safekeepers:

View File

@@ -30,7 +30,7 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
],
)
n_resize = 10
scale = 100
scale = 20
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
@@ -46,17 +46,36 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
conn = endpoint.connect()
cur = conn.cursor()
def get_lfc_size() -> tuple[int, int]:
lfc_file_path = endpoint.lfc_path()
lfc_file_size = lfc_file_path.stat().st_size
res = subprocess.run(
["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True
)
lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0]
log.info(f"Size of LFC file {lfc_file_size}, blocks {lfc_file_blocks}")
return (lfc_file_size, lfc_file_blocks)
# For as long as pgbench is running, twiddle the LFC size once a second.
# Note that we launch this immediately, already while the "pgbench -i"
# initialization step is still running. That's quite a different workload
# than the actual pgbench benchamark run, so this gives us coverage of both.
while thread.is_alive():
size = random.randint(1, 512)
# Vary the LFC size randomly within a range above what we will later
# decrease it to. This should ensure that the final size decrease
# is really doing something.
size = random.randint(192, 512)
cur.execute(f"alter system set neon.file_cache_size_limit='{size}MB'")
cur.execute("select pg_reload_conf()")
time.sleep(1)
thread.join()
# Before shrinking the cache, check that it really is large now
(lfc_file_size, lfc_file_blocks) = get_lfc_size()
assert int(lfc_file_blocks) > 128 * 1024
# At the end, set it at 100 MB, and perform a final check that the disk usage
# of the file is in that ballbark.
#
@@ -66,13 +85,7 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
cur.execute("select pg_reload_conf()")
nretries = 10
while True:
lfc_file_path = endpoint.lfc_path()
lfc_file_size = lfc_file_path.stat().st_size
res = subprocess.run(
["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True
)
lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0]
log.info(f"Size of LFC file {lfc_file_size}, blocks {lfc_file_blocks}")
(lfc_file_size, lfc_file_blocks) = get_lfc_size()
assert lfc_file_size <= 512 * 1024 * 1024
if int(lfc_file_blocks) <= 128 * 1024 or nretries == 0:

View File

@@ -29,8 +29,8 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
cur = endpoint.connect().cursor()
stop = threading.Event()
n_rows = 100000
n_threads = 20
n_rows = 10000
n_threads = 5
n_updates_per_connection = 1000
cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")

View File

@@ -1,10 +1,15 @@
from __future__ import annotations
import asyncio
import ssl
import asyncpg
import pytest
import websocket_tunnel
import websockets
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonProxy
from fixtures.port_distributor import PortDistributor
@pytest.mark.asyncio
@@ -196,3 +201,53 @@ async def test_websockets_pipelined(static_proxy: NeonProxy):
# close
await websocket.send(b"X\x00\x00\x00\x04")
await websocket.wait_closed()
@pytest.mark.asyncio
async def test_websockets_tunneled(static_proxy: NeonProxy, port_distributor: PortDistributor):
static_proxy.safe_psql("create user ws_auth with password 'ws' superuser")
user = "ws_auth"
password = "ws"
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.load_verify_locations(str(static_proxy.test_output_dir / "proxy.crt"))
# Launch a tunnel service so that we can speak the websockets protocol to
# the proxy
tunnel_port = port_distributor.get_port()
tunnel_server = await websocket_tunnel.start_server(
"127.0.0.1",
tunnel_port,
f"wss://{static_proxy.domain}:{static_proxy.external_http_port}/sql",
ssl_context,
)
log.info(f"websockets tunnel listening for connections on port {tunnel_port}")
async with tunnel_server:
async def run_tunnel():
try:
async with tunnel_server:
await tunnel_server.serve_forever()
except Exception as e:
log.error(f"Error in tunnel task: {e}")
tunnel_task = asyncio.create_task(run_tunnel())
# Ok, the tunnel is now running. Check that we can connect to the proxy's
# websocket interface, through the tunnel
tunnel_connstring = f"postgres://{user}:{password}@127.0.0.1:{tunnel_port}/postgres"
log.info(f"connecting to {tunnel_connstring}")
conn = await asyncpg.connect(tunnel_connstring)
res = await conn.fetchval("SELECT 123")
assert res == 123
await conn.close()
log.info("Ran a query successfully through the tunnel")
tunnel_server.close()
try:
await tunnel_task
except asyncio.CancelledError:
pass

View File

@@ -822,6 +822,122 @@ def test_storage_controller_stuck_compute_hook(
env.storage_controller.consistency_check()
@run_only_on_default_postgres("postgres behavior is not relevant")
def test_storage_controller_compute_hook_retry(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address: ListenAddress,
):
"""
Test that when a reconciler can't do its compute hook notification, it will keep
trying until it succeeds.
Reproducer for https://github.com/neondatabase/cloud/issues/22612
"""
neon_env_builder.num_pageservers = 2
(host, port) = httpserver_listen_address
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify"
handle_params = {"status": 200}
notifications = []
def handler(request: Request):
status = handle_params["status"]
log.info(f"Notify request[{status}]: {request}")
notifications.append(request.json)
return Response(status=status)
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
# Start running
env = neon_env_builder.init_configs()
env.start()
tenant_id = TenantId.generate()
env.create_tenant(tenant_id, placement_policy='{"Attached": 1}')
# Initial notification from tenant creation
assert len(notifications) == 1
expect: dict[str, list[dict[str, int]] | str | None | int] = {
"tenant_id": str(tenant_id),
"stripe_size": None,
"shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}],
"preferred_az": DEFAULT_AZ_ID,
}
assert notifications[0] == expect
# Block notifications, and fail a node
handle_params["status"] = 423
env.pageservers[0].stop()
env.storage_controller.allowed_errors.append(NOTIFY_BLOCKED_LOG)
env.storage_controller.allowed_errors.extend(NOTIFY_FAILURE_LOGS)
# Avoid waiting for heartbeats
env.storage_controller.node_configure(env.pageservers[0].id, {"availability": "Offline"})
# Make reconciler run and fail: it should leave itself in a state where the shard will retry notification later,
# and we will check that that happens
notifications = []
try:
assert env.storage_controller.reconcile_all() == 1
except StorageControllerApiException as e:
assert "Control plane tenant busy" in str(e)
assert len(notifications) == 1
assert (
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
"is_pending_compute_notification"
]
is True
)
# Try reconciling again, it should try notifying again
notifications = []
try:
assert env.storage_controller.reconcile_all() == 1
except StorageControllerApiException as e:
assert "Control plane tenant busy" in str(e)
assert len(notifications) == 1
assert (
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
"is_pending_compute_notification"
]
is True
)
# The describe API should indicate that a notification is pending
assert (
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
"is_pending_compute_notification"
]
is True
)
# Unblock notifications: reconcile should work now
handle_params["status"] = 200
notifications = []
assert env.storage_controller.reconcile_all() == 1
assert len(notifications) == 1
assert (
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
"is_pending_compute_notification"
]
is False
)
# Reconciler should be idle now that it succeeded in its compute notification
notifications = []
assert env.storage_controller.reconcile_all() == 0
assert len(notifications) == 0
assert (
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
"is_pending_compute_notification"
]
is False
)
@run_only_on_default_postgres("this test doesn't start an endpoint")
def test_storage_controller_compute_hook_revert(
httpserver: HTTPServer,
@@ -936,7 +1052,7 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
that just hits the endpoints to check that they don't bitrot.
"""
neon_env_builder.num_pageservers = 2
neon_env_builder.num_pageservers = 3
env = neon_env_builder.init_start()
tenant_id = TenantId.generate()
@@ -961,7 +1077,7 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
"GET", f"{env.storage_controller_api}/debug/v1/scheduler"
)
# Two nodes, in a dict of node_id->node
assert len(response.json()["nodes"]) == 2
assert len(response.json()["nodes"]) == 3
assert sum(v["shard_count"] for v in response.json()["nodes"].values()) == 3
assert all(v["may_schedule"] for v in response.json()["nodes"].values())
@@ -972,13 +1088,25 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
# Secondary migration API: superficial check that it migrates
secondary_dest = env.pageservers[2].id
env.storage_controller.request(
"PUT",
f"{env.storage_controller_api}/control/v1/tenant/{tenant_id}-0002/migrate_secondary",
headers=env.storage_controller.headers(TokenScope.ADMIN),
json={"tenant_shard_id": f"{tenant_id}-0002", "node_id": secondary_dest},
)
assert env.storage_controller.tenant_describe(tenant_id)["shards"][0]["node_secondary"] == [
secondary_dest
]
# Node unclean drop API
response = env.storage_controller.request(
"POST",
f"{env.storage_controller_api}/debug/v1/node/{env.pageservers[1].id}/drop",
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
assert len(env.storage_controller.node_list()) == 1
assert len(env.storage_controller.node_list()) == 2
# Tenant unclean drop API
response = env.storage_controller.request(
@@ -1696,7 +1824,13 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
"""
output_dir = neon_env_builder.test_output_dir
shard_count = 4
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
tenant_id = TenantId.generate()
env.create_tenant(tenant_id, placement_policy='{"Attached":1}', shard_count=shard_count)
base_args = [env.neon_binpath / "storcon_cli", "--api", env.storage_controller_api]
def storcon_cli(args):
@@ -1725,7 +1859,7 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
# List nodes
node_lines = storcon_cli(["nodes"])
# Table header, footer, and one line of data
assert len(node_lines) == 5
assert len(node_lines) == 7
assert "localhost" in node_lines[3]
# Pause scheduling onto a node
@@ -1743,10 +1877,21 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
storcon_cli(["node-configure", "--node-id", "1", "--availability", "offline"])
assert "Offline" in storcon_cli(["nodes"])[3]
# Restore node, verify status changes in CLI output
env.pageservers[0].start()
def is_online():
assert "Offline" not in storcon_cli(["nodes"])
wait_until(is_online)
# Let everything stabilize after node failure to avoid interfering with subsequent steps
env.storage_controller.reconcile_until_idle(timeout_secs=10)
# List tenants
tenant_lines = storcon_cli(["tenants"])
assert len(tenant_lines) == 5
assert str(env.initial_tenant) in tenant_lines[3]
assert str(tenant_id) in tenant_lines[3]
# Setting scheduling policies intentionally result in warnings, they're for rare use.
env.storage_controller.allowed_errors.extend(
@@ -1754,23 +1899,58 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
)
# Describe a tenant
tenant_lines = storcon_cli(["tenant-describe", "--tenant-id", str(env.initial_tenant)])
tenant_lines = storcon_cli(["tenant-describe", "--tenant-id", str(tenant_id)])
assert len(tenant_lines) >= 3 + shard_count * 2
assert str(env.initial_tenant) in tenant_lines[0]
assert str(tenant_id) in tenant_lines[0]
# Migrate an attached location
def other_ps_id(current_ps_id):
return (
env.pageservers[0].id
if current_ps_id == env.pageservers[1].id
else env.pageservers[1].id
)
storcon_cli(
[
"tenant-shard-migrate",
"--tenant-shard-id",
f"{tenant_id}-0004",
"--node",
str(
other_ps_id(
env.storage_controller.tenant_describe(tenant_id)["shards"][0]["node_attached"]
)
),
]
)
# Migrate a secondary location
storcon_cli(
[
"tenant-shard-migrate-secondary",
"--tenant-shard-id",
f"{tenant_id}-0004",
"--node",
str(
other_ps_id(
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
"node_secondary"
][0]
)
),
]
)
# Pause changes on a tenant
storcon_cli(["tenant-policy", "--tenant-id", str(env.initial_tenant), "--scheduling", "stop"])
storcon_cli(["tenant-policy", "--tenant-id", str(tenant_id), "--scheduling", "stop"])
assert "Stop" in storcon_cli(["tenants"])[3]
# Cancel ongoing reconcile on a tenant
storcon_cli(
["tenant-shard-cancel-reconcile", "--tenant-shard-id", f"{env.initial_tenant}-0104"]
)
storcon_cli(["tenant-shard-cancel-reconcile", "--tenant-shard-id", f"{tenant_id}-0104"])
# Change a tenant's placement
storcon_cli(
["tenant-policy", "--tenant-id", str(env.initial_tenant), "--placement", "secondary"]
)
storcon_cli(["tenant-policy", "--tenant-id", str(tenant_id), "--placement", "secondary"])
assert "Secondary" in storcon_cli(["tenants"])[3]
# Modify a tenant's config
@@ -1778,7 +1958,7 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
[
"patch-tenant-config",
"--tenant-id",
str(env.initial_tenant),
str(tenant_id),
"--config",
json.dumps({"pitr_interval": "1m"}),
]

154
test_runner/websocket_tunnel.py Executable file
View File

@@ -0,0 +1,154 @@
#!/usr/bin/env python3
#
# This program helps to test the WebSocket tunneling in proxy. It listens for a TCP
# connection on a port, and when you connect to it, it opens a websocket connection,
# and forwards all the traffic to the websocket connection, wrapped in WebSocket binary
# frames.
#
# This is used in the test_proxy::test_websockets test, but it is handy for manual testing too.
#
# Usage for manual testing:
#
# ## Launch Posgres on port 3000:
# postgres -D data -p3000
#
# ## Launch proxy with WSS enabled:
# openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj '/CN=*.neon.localtest.me'
# ./target/debug/proxy --wss 127.0.0.1:40433 --http 127.0.0.1:28080 --mgmt 127.0.0.1:9099 --proxy 127.0.0.1:4433 --tls-key server.key --tls-cert server.crt --auth-backend postgres
#
# ## Launch the tunnel:
#
# poetry run ./test_runner/websocket_tunnel.py --ws-port 40433 --ws-url "wss://ep-test.neon.localtest.me"
#
# ## Now you can connect with psql:
# psql "postgresql://heikki@localhost:40433/postgres"
#
import argparse
import asyncio
import logging
import ssl
from ssl import Purpose
import websockets
from fixtures.log_helper import log
# Enable verbose logging of all the traffic
def enable_verbose_logging():
logger = logging.getLogger("websockets")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
async def start_server(tcp_listen_host, tcp_listen_port, ws_url, ctx):
server = await asyncio.start_server(
lambda r, w: handle_client(r, w, ws_url, ctx), tcp_listen_host, tcp_listen_port
)
return server
async def handle_tcp_to_websocket(tcp_reader, ws):
try:
while not tcp_reader.at_eof():
data = await tcp_reader.read(1024)
await ws.send(data)
except websockets.exceptions.ConnectionClosedError as e:
log.debug(f"connection closed: {e}")
except websockets.exceptions.ConnectionClosedOK:
log.debug("connection closed")
except Exception as e:
log.error(e)
async def handle_websocket_to_tcp(ws, tcp_writer):
try:
async for message in ws:
tcp_writer.write(message)
await tcp_writer.drain()
except websockets.exceptions.ConnectionClosedError as e:
log.debug(f"connection closed: {e}")
except websockets.exceptions.ConnectionClosedOK:
log.debug("connection closed")
except Exception as e:
log.error(e)
async def handle_client(tcp_reader, tcp_writer, ws_url: str, ctx: ssl.SSLContext):
try:
log.info("Received TCP connection. Connecting to websockets proxy.")
async with websockets.connect(ws_url, ssl=ctx) as ws:
try:
log.info("Connected to websockets proxy")
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(handle_tcp_to_websocket(tcp_reader, ws))
task2 = tg.create_task(handle_websocket_to_tcp(ws, tcp_writer))
done, pending = await asyncio.wait(
[task1, task2], return_when=asyncio.FIRST_COMPLETED
)
tcp_writer.close()
await ws.close()
except* Exception as ex:
log.error(ex.exceptions)
except Exception as e:
log.error(e)
async def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--tcp-listen-addr",
default="localhost",
help="TCP addr to listen on",
)
parser.add_argument(
"--tcp-listen-port",
default="40444",
help="TCP port to listen on",
)
parser.add_argument(
"--ws-url",
default="wss://localhost/",
help="websocket URL to connect to. This determines the Host header sent to the server",
)
parser.add_argument(
"--ws-host",
default="127.0.0.1",
help="websockets host to connect to",
)
parser.add_argument(
"--ws-port",
type=int,
default=443,
help="websockets port to connect to",
)
parser.add_argument(
"--verbose",
action="store_true",
help="enable verbose logging",
)
args = parser.parse_args()
if args.verbose:
enable_verbose_logging()
ctx = ssl.create_default_context(Purpose.SERVER_AUTH)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
server = await start_server(args.tcp_listen_addr, args.tcp_listen_port, args.ws_url, ctx)
print(
f"Listening for connections at {args.tcp_listen_addr}:{args.tcp_listen_port}, forwarding them to {args.ws_host}:{args.ws_port}"
)
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.2",
"7e3f3974bc8895938308f94d0e96879ffae638cd"
"9c9e9a78a93aebec2f6a2f54644442d35ffa245c"
],
"v16": [
"16.6",
"97f9fde349c6de6d573f5ce96db07eca60ce6185"
"f63b141cfb0c813725a6b2574049565bff643018"
],
"v15": [
"15.10",
"f262d631ad477a1819e84a183e5a7ef561830085"
"d3141e17a7155e3d07c8deba4a10c748a29ba1e6"
],
"v14": [
"14.15",
"c2f65b3201591e02ce45b66731392f98d3388e73"
"210a0ba3afd8134ea910b203f274b165bd4f05d7"
]
}