Compare commits

..

28 Commits

Author SHA1 Message Date
Vlad Lazar
b573979780 wip 2024-02-22 11:53:49 +00:00
Vlad Lazar
df6909302b test: add benchmark for tenants with large slrus
The new bencmark generates tenants with around 300 SLRU blocks
of 8KiB each. Both vectored get implementations are benchmarked.
For the vectored implementation validation is disabled.
2024-02-21 18:38:47 +00:00
Vlad Lazar
6331bc157f pagebench: accept connstring for basebackup bench
(cherry picked from commit 49f4feee403eca3ddce3a468c98872767c4fee9c)
2024-02-21 18:38:47 +00:00
Vlad Lazar
66a9479c6b pageserver: add config for vectored get validation 2024-02-21 18:38:47 +00:00
Vlad Lazar
2816c1611d pagserver/delta_layer: use vectored blob read
This commit does a few things:
* update the delta layer to use vectored blob reads
* thread the start Lsn for the read down to the delta layer
* thread the max size of the vectored read to the delta layer
2024-02-21 18:38:47 +00:00
Vlad Lazar
a237beaebc pageserver/image_layer: use vectored blob read
This commit updates the image layer to use the vectored
disk blob read. The max size config is also threaded to
the image layer.
2024-02-21 18:38:47 +00:00
Vlad Lazar
fde5bf485f pageserver/blob_io: add vectored blob read utilities 2024-02-21 18:38:43 +00:00
Vlad Lazar
d1809e9b7f pageserver/config: add a config for max size of vectored read 2024-02-21 18:31:33 +00:00
Vlad Lazar
aed4e84da2 pagserver/virtual_file: add read variant with byte count 2024-02-21 18:31:33 +00:00
Vlad Lazar
8987b11dbc pageserver: count SLRU blocks in basebackup 2024-02-21 14:36:27 +00:00
John Spray
84f027357d pageserver: adjust checkpoint distance for sharded tenants (#6852)
## Problem

Where the stripe size is the same order of magnitude as the checkpoint
distance (such as with default settings), tenant shards can easily pass
through `checkpoint_distance` bytes of LSN without actually ingesting
anything. This results in emitting many tiny L0 delta layers.

## Summary of changes

- Multiply checkpoint distance by shard count before comparing with LSN
distance. This is a heuristic and does not guarantee that we won't emit
small layers, but it fixes the issue for typical cases where the writes
in a (checkpoint_distance * shard_count) range of LSN bytes are somewhat
distributed across shards.
- Add a test that checks the size of layers after ingesting to a sharded
tenant; this fails before the fix.

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2024-02-21 14:12:35 +00:00
Heikki Linnakangas
428d9fe69e tests: Make test_vm_bit_clear_on_heap_lock more robust again. (#6714)
When checking that the contents of the VM page in cache and in
pageserver match, ignore the LSN on the page. It could be different, if
the page was flushed from cache by a checkpoint, for example.

Here's one such failure from the CI that this hopefully fixes:
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-6687/7847132649/index.html#suites/8545ca7650e609b2963d4035816a356b/5f9018db15ef4408/

In the passing, also remove some log.infos from the loop. I added them
while developing the tests, but now they're just noise.
2024-02-21 12:36:57 +00:00
Conrad Ludgate
e0af945f8f proxy: improve error classification (#6841)
## Problem

## Summary of changes

1. Classify further cplane API errors
2. add 'serviceratelimit' and make a few of the timeout errors return
that.
3. a few additional minor changes
2024-02-21 10:04:09 +00:00
John Spray
e7452d3756 storage controller: concurrency + deadlines during startup reconcile (#6823)
## Problem

During startup_reconcile we do a couple of potentially-slow things:
- Calling out to all nodes to read their locations
- Calling out to the cloud control plane to notify it of all tenants'
attached nodes

The read of node locations was not being done concurrently across nodes,
and neither operation was bounded by a well defined deadline.

## Summary of changes

- Refactor the async parts of startup_reconcile into separate functions
- Add concurrency and deadline to `scan_node_locations`
- Add deadline to `compute_notify_many`
- Run `cleanup_locations` in the background: there's no need for
startup_reconcile to wait for this to complete.
2024-02-21 09:54:25 +00:00
Vlad Lazar
5d6083bfc6 pageserver: add vectored get implementation (#6576)
This PR introduces a new vectored implementation of the read path.

The search is basically a DFS if you squint at it long enough.
LayerFringe tracks the next layers to visit and acts as our stack.
Vertices are tuples of (layer, keyspace, lsn range). Continuously
pop the top of the stack (most recent layer) and do all the reads
for one layer at once.

The search maintains a fringe (`LayerFringe`) which tracks all the
layers that intersect the current keyspace being searched. Continuously
pop the top of the fringe (layer with highest LSN) and get all the data
required from the layer in one go.

Said search is done on one timeline at a time. If data is still required for
some keys, then search the ancestor timeline.

Apart from the high level layer traversal, vectored variants have been
introduced for grabbing data from each layer type. They still suffer from
read amplification issues and that will be addressed in a different PR.

You might notice that in some places we duplicate the code for the
existing read path. All of that code will be removed when we switch
the non-vectored read path to proxy into the vectored read path.
In the meantime, we'll have to contend with the extra cruft for the sake
of testing and gentle releasing.
2024-02-21 09:49:46 +00:00
Alex Chi Z
3882f57001 neon_local: add flag to create test user and database (#6848)
This pull request adds two flags: `--update-catalog true` for `endpoint
create`, and `--create-test-user true` for `endpoint start`. The former
enables catalog updates for neon_superuser permission and many other
things, while the latter adds the user `test` and the database `neondb`
when setting up the database. A combination of these two flags will
create a Postgres similar to the production environment so that it would
be easier for us to test if extensions behave correctly when added to
Neon Postgres.

Example output:

```
❯ cargo neon endpoint start main --create-test-user true
    Finished dev [unoptimized + debuginfo] target(s) in 0.22s
     Running `target/debug/neon_local endpoint start main --create-test-user true`
Starting existing endpoint main...
Starting postgres node at 'postgresql://cloud_admin@127.0.0.1:55432/postgres'
Also at 'postgresql://user@127.0.0.1:55432/neondb'
```

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-02-21 00:20:42 +00:00
Alexander Bayandin
04190a1fea CI(test_runner): misc small changes (#6801)
## Problem

A set of small changes that are too small to open a separate for each.

A notable change is adding `pytest-repeat` plugin, which can help to
ensure that a flaky test is fixed by running such a test several times.

## Summary of changes
- Update Allure from 2.24.0 to 2.27.0
- Update Ruff from 0.1.11 to 0.2.2 (update `[tool.ruff]` section of
`pyproject.toml` for it)
- Install pytest-repeat plugin
2024-02-20 20:45:00 +00:00
Vlad Lazar
fcbe9fb184 test: adjust checkpoint distance in test_layer_map (#6842)
587cb705b8
changed the layer rolling logic to more closely obey the
`checkpoint_distance` config. Previously, this test was getting
layers significantly larger than the 8K it was asking for. Now the
payload in the layers is closer to 8K (which means more layers in
total).

Tweak the `checkpoint_distance` to get a number of layers more
reasonable for this test. Note that we still get more layers than
before (~8K vs ~5K).
2024-02-20 19:42:54 +00:00
Nikita Kalyanov
cbb599f353 Add /terminate API (#6745)
this is to speed up suspends, see
https://github.com/neondatabase/cloud/issues/10284

## Problem

## Summary of changes

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist
2024-02-20 19:42:36 +02:00
Christian Schwarz
e49602ecf5 feat(metrics): per-timeline metric for on-demand downloads, remove calls_started histogram (#6834)
refs #6737 

# Problem

Before this PR, on-demand downloads weren't  measured per tenant_id.
This makes root-cause analysis of latency spikes harder, requiring us to
resort to log scraping for

```
{neon_service="pageserver"} |= `downloading on-demand` |= `$tenant_id`
```

which can be expensive when zooming out in Grafana.

Context: https://neondb.slack.com/archives/C033RQ5SPDH/p1707809037868189

# Solution / Changes

- Remove the calls_started histogram
- I did the dilegence, there are only 2 dashboards using this histogram,
    and in fact only one uses it as a histogram, the other just as a
    a counter.
- [Link
1](8115b54d9f/neonprod/dashboards/hkXNF7oVz/dashboard-Z31XmM24k.yaml (L1454)):
`Pageserver Thrashing` dashboard, linked from playbook, will fix.
- [Link
2](8115b54d9f/neonprod/dashboards/CEllzAO4z/dashboard-sJqfNFL4k.yaml (L599)):
one of my personal dashboards, unused for a long time, already broken in
other ways, no need to fix.
- replace `pageserver_remote_timeline_client_calls_unfinished` gauge
with a counter pair
- Required `Clone`-able `IntCounterPair`, made the necessary changes in
the `libs/metrics` crate
-  fix tests to deal with the fallout

A subsequent PR will remove a timeline-scoped metric to compensate.

Note that we don't need additional global counters for the per-timeline
counters affected by this PR; we can use the `remote_storage` histogram
for those, which, conveniently, also include the secondary-mode
downloads, which aren't covered by the remote timeline client metrics
(should they?).
2024-02-20 17:52:23 +01:00
John Spray
eb02f4619e tests: add a shutdown log noise case to test_location_conf_churn (#6828)
This test does lots of shutdowns, and we may emit this layer warning during shutdown.

Saw a spurious failure here:
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-6820/7964134049/index.html#/testresult/784218040583d963
2024-02-20 17:34:12 +01:00
Arthur Petukhovsky
9b8df2634f Fix active_timelines_count metric (#6839) 2024-02-20 15:55:51 +00:00
John Spray
d152d4f16f pageserver: fix treating all download errors as 'Other' (#6836)
## Problem

`download_retry` correctly uses a fatal check to avoid retrying forever
on cancellations and NotFound cases. However, `download_layer_file` was
casting all download errors to "Other" in order to attach an
anyhow::Context.

Noticed this issue in the context of secondary downloads, where requests
to download layers that might not exist are issued intentionally, and
this resulted in lots of error spam from retries that shouldn't have
happened.

## Summary of changes

- Remove the `.context()` so that the original DownloadError is visible
to backoff::retry
2024-02-20 13:40:46 +00:00
Christian Schwarz
b467d8067b fix(test_ondemand_download_timetravel): occasionally fails with WAL timeout during layer creation (#6818)
refs https://github.com/neondatabase/neon/issues/4112
amends https://github.com/neondatabase/neon/pull/6687

Since my last PR #6687 regarding this test, the type of flakiness that
has been observed has shifted to the beginning of the test, where we
create the layers:

```
timed out while waiting for remote_consistent_lsn to reach 0/411A5D8, was 0/411A5A0
```

[Example Allure
Report](https://neon-github-public-dev.s3.amazonaws.com/reports/pr-6789/7932503173/index.html#/testresult/ddb877cfa4062f7d)

Analysis
--------

I suspect there was the following race condition:
- endpoints push out some tiny piece of WAL during their
  endpoints.stop_all()
- that WAL reaches the SK (it's just one SK according to logs)
- the SKs send it into the walreceiver connection
- the SK gets shut down
- the checkpoint is taken, with last_record_lsn = 0/411A5A0
- the PS's walreceiver_connection_handler processes the WAL that was
  sent into the connection by the SKs; this advances
  last_record_lsn to 0/411A5D8
- we get current_lsn = 0/411A5D8
- nothing flushes a layer

Changes
-------

There's no testing / debug interface to shut down / server all
walreceiver connections.
So, this PR restarts pageserver to achieve it.

Also, it lifts the "wait for image layer uploads" further up, so that
after this first
restart, the pageserver really does _nothing_ by itself, and so, the
origianl physical size mismatch issue quoted in #6687 should be fixed.
(My initial suspicion hasn't changed that it was due to the tiny chunk
of endpoint.stop_all() WAL being ingested after the second PS restart.)
2024-02-20 14:09:15 +01:00
Christian Schwarz
a48b23d777 fix(startup + remote_timeline_client): no-op deletion ops scheduled during startup (#6825)
Before this PR, if remote storage is configured, `load_layer_map`'s call
to `RemoteTimelineClient::schedule_layer_file_deletion` would schedule
an empty UploadOp::Delete for each timeline.

It's jsut CPU overhead, no actual interaction with deletion queue
on-disk state or S3, as far as I can tell.

However, it shows up in the "RemoteTimelineClient calls started
metrics", which I'm refining in an orthogonal PR.
2024-02-20 14:06:25 +01:00
Conrad Ludgate
21a86487a2 proxy: fix #6529 (#6807)
## Problem

`application_name` for HTTP is not being recorded

## Summary of changes

get `application_name` query param
2024-02-20 11:58:01 +01:00
Conrad Ludgate
686b3c79c8 http2 alpn (#6815)
## Problem

Proxy already supported HTTP2, but I expect no one is using it because
we don't advertise it in the TLS handshake.

## Summary of changes

#6335 without the websocket changes.
2024-02-20 10:44:46 +00:00
John Spray
02a8b7fbe0 storage controller: issue timeline create/delete calls concurrently (#6827)
## Problem

Timeline creation is meant to be very fast: it should only take
approximately on S3 PUT latency. When we have many shards in a tenant,
we should preserve that responsiveness.

## Summary of changes

- Issue create/delete pageserver API calls concurrently across all >0
shards
- During tenant deletion, delete shard zero last, separately, to avoid
confusing anything using GETs on the timeline.
- Return 201 instead of 200 on creations to make cloud control plane
happy

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2024-02-20 10:13:21 +00:00
65 changed files with 3713 additions and 1030 deletions

View File

@@ -76,8 +76,8 @@ runs:
rm -f ${ALLURE_ZIP}
fi
env:
ALLURE_VERSION: 2.24.0
ALLURE_ZIP_SHA256: 60b1d6ce65d9ef24b23cf9c2c19fd736a123487c38e54759f1ed1a7a77353c90
ALLURE_VERSION: 2.27.0
ALLURE_ZIP_SHA256: b071858fb2fa542c65d8f152c5c40d26267b2dfb74df1f1608a589ecca38e777
# Potentially we could have several running build for the same key (for example, for the main branch), so we use improvised lock for this
- name: Acquire lock

View File

@@ -472,6 +472,7 @@ jobs:
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: std-fs
PAGESERVER_GET_VECTORED_IMPL: vectored
# Temporary disable this step until we figure out why it's so flaky
# Ref https://github.com/neondatabase/neon/issues/4540
@@ -1204,80 +1205,3 @@ jobs:
time aws s3 cp --only-show-errors s3://${BUCKET}/${S3_KEY} s3://${BUCKET}/${PREFIX}/${FILENAME}
done
compute-node-image-merged-base:
needs: [ check-permissions, build-buildtools-image, tag ]
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
version: [ v14, v15, v16 ]
defaults:
run:
shell: sh -eu {0}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
- name: Configure Docker Hub login
run: |
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
echo "::add-mask::${DOCKERHUB_AUTH}"
cat <<-EOF > ~/.docker/config.json
{
"auths": {
"https://index.docker.io/v1/": {
"auth": "${DOCKERHUB_AUTH}"
}
}
}
EOF
- name: Build merged image base
run: |
docker image build . -f Dockerfile.compute-node-simple -t neondatabase/tmp-compute-node-merged-base-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} \
--build-arg PG_VERSION=${{ matrix.version }} \
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}} \
--build-arg TAG=${{needs.build-buildtools-image.outputs.build-tools-tag}}
docker image push neondatabase/tmp-compute-node-merged-base-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
compute-node-image-merged:
needs: [ tag, compute-node-image-merged-base ]
runs-on: ubuntu-latest
defaults:
run:
shell: sh -eu {0}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
- name: Configure Docker Hub login
run: |
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
echo "::add-mask::${DOCKERHUB_AUTH}"
echo ~
cat <<-EOF > ~/.docker/config.json
{
"auths": {
"https://index.docker.io/v1/": {
"auth": "${DOCKERHUB_AUTH}"
}
}
}
EOF
- name: Build merged image
run: |
docker image build . -f Dockerfile.compute-node-merged -t neondatabase/tmp-compute-node-merged:${{needs.tag.outputs.build-tag}} \
--build-arg TAG=${{needs.tag.outputs.build-tag}}
docker image push neondatabase/tmp-compute-node-merged:${{needs.tag.outputs.build-tag}}

1
Cargo.lock generated
View File

@@ -3552,6 +3552,7 @@ dependencies = [
"enum-map",
"hex",
"humantime-serde",
"itertools",
"postgres_ffi",
"rand 0.8.5",
"serde",

View File

@@ -1,83 +0,0 @@
ARG TAG
FROM neondatabase/tmp-compute-node-merged-base-v14:$TAG as pg14
FROM neondatabase/tmp-compute-node-merged-base-v15:$TAG as pg15
FROM neondatabase/tmp-compute-node-merged-base-v16:$TAG as pg16
#########################################################################################
#
# Compile and run the Neon-specific `compute_ctl` binary
#
#########################################################################################
FROM neondatabase/build-tools:pinned AS compute-tools
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG
USER nonroot
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
RUN cd compute_tools && cargo build --locked --profile release-line-debug-size-lto
#########################################################################################
#
# Final layer
# Put it all together into the final image
#
#########################################################################################
FROM debian:bullseye-slim
ARG TAG
# Add user postgres
RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
echo "postgres:test_console_pass" | chpasswd && \
mkdir /var/db/postgres/compute && mkdir /var/db/postgres/specs && \
mkdir /var/db/postgres/pgbouncer && \
chown -R postgres:postgres /var/db/postgres && \
chmod 0750 /var/db/postgres/compute && \
chmod 0750 /var/db/postgres/pgbouncer && \
echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig && \
# create folder for file cache
mkdir -p -m 777 /neon/cache
COPY --from=pg14 --chown=postgres /usr/local/pgsql /usr/local/pgsql-v14
COPY --from=pg15 --chown=postgres /usr/local/pgsql /usr/local/pgsql-v15
COPY --from=pg16 --chown=postgres /usr/local/pgsql /usr/local/pgsql-v16
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl
# Install:
# libreadline8 for psql
# libicu67, locales for collations (including ICU and plpgsql_check)
# liblz4-1 for lz4
# libossp-uuid16 for extension ossp-uuid
# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS
# libxml2, libxslt1.1 for xml2
# libzstd1 for zstd
# libboost* for rdkit
# ca-certificates for communicating with s3 by compute_ctl
RUN apt update && \
apt install --no-install-recommends -y \
gdb \
libicu67 \
liblz4-1 \
libreadline8 \
libboost-iostreams1.74.0 \
libboost-regex1.74.0 \
libboost-serialization1.74.0 \
libboost-system1.74.0 \
libossp-uuid16 \
libgeos-c1v5 \
libgdal28 \
libproj19 \
libprotobuf-c1 \
libsfcgal1 \
libxml2 \
libxslt1.1 \
libzstd1 \
libcurl4-openssl-dev \
locales \
procps \
ca-certificates && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
ENV LANG en_US.utf8
USER postgres
ENTRYPOINT ["/usr/local/bin/compute_ctl"]

View File

@@ -1,159 +0,0 @@
ARG PG_VERSION
ARG REPOSITORY=neondatabase
ARG IMAGE=build-tools
ARG TAG=pinned
ARG BUILD_TAG
#########################################################################################
#
# Layer "build-deps"
#
#########################################################################################
FROM debian:bullseye-slim AS build-deps
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libssl-dev \
libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd
#########################################################################################
#
# Layer "pg-build"
# Build Postgres from the neon postgres repository.
#
#########################################################################################
FROM build-deps AS pg-build
ARG PG_VERSION
COPY vendor/postgres-${PG_VERSION} postgres
RUN cd postgres && \
export CONFIGURE_CMD="./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp \
--with-icu --with-libxml --with-libxslt --with-lz4" && \
if [ "${PG_VERSION}" != "v14" ]; then \
# zstd is available only from PG15
export CONFIGURE_CMD="${CONFIGURE_CMD} --with-zstd"; \
fi && \
eval $CONFIGURE_CMD && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C contrib/ install && \
# Install headers
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/include install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/interfaces/libpq install && \
# Enable some of contrib extensions
echo 'trusted = true' >> /usr/local/pgsql/share/extension/autoinc.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/bloom.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/earthdistance.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/insert_username.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/intagg.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/moddatetime.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_stat_statements.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrowlocks.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgstattuple.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/refint.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/xml2.control && \
# We need to grant EXECUTE on pg_stat_statements_reset() to neon_superuser.
# In vanilla postgres this function is limited to Postgres role superuser.
# In neon we have neon_superuser role that is not a superuser but replaces superuser in some cases.
# We could add the additional grant statements to the postgres repository but it would be hard to maintain,
# whenever we need to pick up a new postgres version and we want to limit the changes in our postgres fork,
# so we do it here.
old_list="pg_stat_statements--1.0--1.1.sql pg_stat_statements--1.1--1.2.sql pg_stat_statements--1.2--1.3.sql pg_stat_statements--1.3--1.4.sql pg_stat_statements--1.4--1.5.sql pg_stat_statements--1.4.sql pg_stat_statements--1.5--1.6.sql"; \
# the first loop is for pg_stat_statement extension version <= 1.6
for file in /usr/local/pgsql/share/extension/pg_stat_statements--*.sql; do \
filename=$(basename "$file"); \
if echo "$old_list" | grep -q -F "$filename"; then \
echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset() TO neon_superuser;' >> $file; \
fi; \
done; \
# the second loop is for pg_stat_statement extension versions >= 1.7,
# where pg_stat_statement_reset() got 3 additional arguments
for file in /usr/local/pgsql/share/extension/pg_stat_statements--*.sql; do \
filename=$(basename "$file"); \
if ! echo "$old_list" | grep -q -F "$filename"; then \
echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) TO neon_superuser;' >> $file; \
fi; \
done
#########################################################################################
#
# Layer "neon-pg-ext-build"
# compile neon extensions
#
#########################################################################################
FROM build-deps AS neon-pg-ext-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
-C pgxn/neon \
-s install && \
make -j $(getconf _NPROCESSORS_ONLN) \
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
-C pgxn/neon_utils \
-s install && \
make -j $(getconf _NPROCESSORS_ONLN) \
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
-C pgxn/neon_test_utils \
-s install && \
make -j $(getconf _NPROCESSORS_ONLN) \
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
-C pgxn/neon_rmgr \
-s install && \
case "${PG_VERSION}" in \
"v14" | "v15") \
;; \
"v16") \
echo "Skipping HNSW for PostgreSQL 16" && exit 0 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
esac && \
make -j $(getconf _NPROCESSORS_ONLN) \
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
-C pgxn/hnsw \
-s install
#########################################################################################
#
# Compile and run the Neon-specific `compute_ctl` binary
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG
USER nonroot
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
RUN cd compute_tools && cargo build --locked --profile release-line-debug-size-lto
#########################################################################################
#
# Clean up postgres folder before inclusion
#
#########################################################################################
FROM neon-pg-ext-build AS postgres-cleanup-layer
COPY --from=neon-pg-ext-build /usr/local/pgsql /usr/local/pgsql
# Remove binaries from /bin/ that we won't use (or would manually copy & install otherwise)
RUN cd /usr/local/pgsql/bin && rm ecpg
# Remove headers that we won't need anymore - we've completed installation of all extensions
RUN rm -r /usr/local/pgsql/include
# Remove static postgresql libraries - all compilation is finished, so we
# can now remove these files - they must be included in other binaries by now
# if they were to be used by other libraries.
RUN rm /usr/local/pgsql/lib/lib*.a
#########################################################################################
#
# Final layer
# Put it all together into the final image
#
#########################################################################################
FROM debian:bullseye-slim
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local/pgsql

View File

@@ -45,7 +45,6 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use nix::sys::signal::{kill, Signal};
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info};
@@ -53,7 +52,9 @@ use url::Url;
use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec, PG_PID, SYNC_SAFEKEEPERS_PID};
use compute_tools::compute::{
forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::get_pg_version;
use compute_tools::http::api::launch_http_server;
@@ -394,6 +395,15 @@ fn main() -> Result<()> {
info!("synced safekeepers at lsn {lsn}");
}
let mut state = compute.state.lock().unwrap();
if state.status == ComputeStatus::TerminationPending {
state.status = ComputeStatus::Terminated;
compute.state_changed.notify_all();
// we were asked to terminate gracefully, don't exit to avoid restart
delay_exit = true
}
drop(state);
if let Err(err) = compute.check_for_core_dumps() {
error!("error while checking for core dumps: {err:?}");
}
@@ -523,16 +533,7 @@ fn cli() -> clap::Command {
/// wait for termination which would be easy then.
fn handle_exit_signal(sig: i32) {
info!("received {sig} termination signal");
let ss_pid = SYNC_SAFEKEEPERS_PID.load(Ordering::SeqCst);
if ss_pid != 0 {
let ss_pid = nix::unistd::Pid::from_raw(ss_pid as i32);
kill(ss_pid, Signal::SIGTERM).ok();
}
let pg_pid = PG_PID.load(Ordering::SeqCst);
if pg_pid != 0 {
let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32);
kill(pg_pid, Signal::SIGTERM).ok();
}
forward_termination_signal();
exit(1);
}

View File

@@ -28,6 +28,8 @@ use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec};
use utils::measured_stream::MeasuredReader;
use nix::sys::signal::{kill, Signal};
use remote_storage::{DownloadError, RemotePath};
use crate::checker::create_availability_check_data;
@@ -1322,3 +1324,17 @@ LIMIT 100",
Ok(remote_ext_metrics)
}
}
pub fn forward_termination_signal() {
let ss_pid = SYNC_SAFEKEEPERS_PID.load(Ordering::SeqCst);
if ss_pid != 0 {
let ss_pid = nix::unistd::Pid::from_raw(ss_pid as i32);
kill(ss_pid, Signal::SIGTERM).ok();
}
let pg_pid = PG_PID.load(Ordering::SeqCst);
if pg_pid != 0 {
let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32);
// use 'immediate' shutdown (SIGQUIT): https://www.postgresql.org/docs/current/server-shutdown.html
kill(pg_pid, Signal::SIGQUIT).ok();
}
}

View File

@@ -5,6 +5,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use crate::compute::forward_termination_signal;
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
@@ -123,6 +124,17 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
(&Method::POST, "/terminate") => {
info!("serving /terminate POST request");
match handle_terminate_request(compute).await {
Ok(()) => Response::new(Body::empty()),
Err((msg, code)) => {
error!("error handling /terminate request: {msg}");
render_json_error(&msg, code)
}
}
}
// download extension files from remote extension storage on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
@@ -297,6 +309,49 @@ fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
.unwrap()
}
async fn handle_terminate_request(compute: &Arc<ComputeNode>) -> Result<(), (String, StatusCode)> {
{
let mut state = compute.state.lock().unwrap();
if state.status == ComputeStatus::Terminated {
return Ok(());
}
if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
let msg = format!(
"invalid compute status for termination request: {:?}",
state.status.clone()
);
return Err((msg, StatusCode::PRECONDITION_FAILED));
}
state.status = ComputeStatus::TerminationPending;
compute.state_changed.notify_all();
drop(state);
}
forward_termination_signal();
info!("sent signal and notified waiters");
// Spawn a blocking thread to wait for compute to become Terminated.
// This is needed to do not block the main pool of workers and
// be able to serve other requests while some particular request
// is waiting for compute to finish configuration.
let c = compute.clone();
task::spawn_blocking(move || {
let mut state = c.state.lock().unwrap();
while state.status != ComputeStatus::Terminated {
state = c.state_changed.wait(state).unwrap();
info!(
"waiting for compute to become Terminated, current status: {:?}",
state.status
);
}
Ok(())
})
.await
.unwrap()?;
info!("terminated Postgres");
Ok(())
}
// Main Hyper HTTP server function that runs it and blocks waiting on it forever.
#[tokio::main]
async fn serve(port: u16, state: Arc<ComputeNode>) {

View File

@@ -168,6 +168,29 @@ paths:
schema:
$ref: "#/components/schemas/GenericError"
/terminate:
post:
tags:
- Terminate
summary: Terminate Postgres and wait for it to exit
description: ""
operationId: terminate
responses:
200:
description: Result
412:
description: "wrong state"
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
500:
description: "Unexpected error"
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
components:
securitySchemes:
JWT:

View File

@@ -114,7 +114,10 @@ async fn handle_tenant_create(
mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
json_response(StatusCode::OK, service.tenant_create(create_req).await?)
json_response(
StatusCode::CREATED,
service.tenant_create(create_req).await?,
)
}
// For tenant and timeline deletions, which both implement an "initially return 202, then 404 once
@@ -196,7 +199,7 @@ async fn handle_tenant_timeline_create(
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
json_response(
StatusCode::OK,
StatusCode::CREATED,
service
.tenant_timeline_create(tenant_id, create_req)
.await?,

View File

@@ -14,18 +14,18 @@ use control_plane::attachment_service::{
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use diesel::result::DatabaseErrorKind;
use futures::StreamExt;
use futures::{stream::FuturesUnordered, StreamExt};
use hyper::StatusCode;
use pageserver_api::{
control_api::{
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
ValidateResponse, ValidateResponseTenant,
},
models,
models::{
LocationConfig, LocationConfigMode, ShardParameters, TenantConfig, TenantCreateRequest,
TenantLocationConfigRequest, TenantLocationConfigResponse, TenantShardLocation,
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
self, LocationConfig, LocationConfigListResponse, LocationConfigMode, ShardParameters,
TenantConfig, TenantCreateRequest, TenantLocationConfigRequest,
TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
},
shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
};
@@ -167,84 +167,53 @@ impl Service {
/// Called once on startup, this function attempts to contact all pageservers to build an up-to-date
/// view of the world, and determine which pageservers are responsive.
#[instrument(skip_all)]
async fn startup_reconcile(&self) {
async fn startup_reconcile(self: &Arc<Service>) {
// For all tenant shards, a vector of observed states on nodes (where None means
// indeterminate, same as in [`ObservedStateLocation`])
let mut observed = HashMap::new();
let mut nodes_online = HashSet::new();
// TODO: issue these requests concurrently
{
let nodes = {
let locked = self.inner.read().unwrap();
locked.nodes.clone()
};
for node in nodes.values() {
let http_client = reqwest::ClientBuilder::new()
.timeout(Duration::from_secs(5))
.build()
.expect("Failed to construct HTTP client");
let client = mgmt_api::Client::from_client(
http_client,
node.base_url(),
self.config.jwt_token.as_deref(),
);
// Startup reconciliation does I/O to other services: whether they
// are responsive or not, we should aim to finish within our deadline, because:
// - If we don't, a k8s readiness hook watching /ready will kill us.
// - While we're waiting for startup reconciliation, we are not fully
// available for end user operations like creating/deleting tenants and timelines.
//
// We set multiple deadlines to break up the time available between the phases of work: this is
// arbitrary, but avoids a situation where the first phase could burn our entire timeout period.
let start_at = Instant::now();
let node_scan_deadline = start_at
.checked_add(STARTUP_RECONCILE_TIMEOUT / 2)
.expect("Reconcile timeout is a modest constant");
fn is_fatal(e: &mgmt_api::Error) -> bool {
use mgmt_api::Error::*;
match e {
ReceiveBody(_) | ReceiveErrorBody(_) => false,
ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
| ApiError(StatusCode::GATEWAY_TIMEOUT, _)
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
}
}
let compute_notify_deadline = start_at
.checked_add((STARTUP_RECONCILE_TIMEOUT / 4) * 3)
.expect("Reconcile timeout is a modest constant");
let list_response = backoff::retry(
|| client.list_location_config(),
is_fatal,
1,
5,
"Location config listing",
&self.cancel,
)
.await;
let Some(list_response) = list_response else {
tracing::info!("Shutdown during startup_reconcile");
return;
};
// Accumulate a list of any tenant locations that ought to be detached
let mut cleanup = Vec::new();
tracing::info!("Scanning shards on node {}...", node.id);
match list_response {
Err(e) => {
tracing::warn!("Could not contact pageserver {} ({e})", node.id);
// TODO: be more tolerant, do some retries, in case
// pageserver is being restarted at the same time as we are
}
Ok(listing) => {
tracing::info!(
"Received {} shard statuses from pageserver {}, setting it to Active",
listing.tenant_shards.len(),
node.id
);
nodes_online.insert(node.id);
let node_listings = self.scan_node_locations(node_scan_deadline).await;
for (node_id, list_response) in node_listings {
let tenant_shards = list_response.tenant_shards;
tracing::info!(
"Received {} shard statuses from pageserver {}, setting it to Active",
tenant_shards.len(),
node_id
);
nodes_online.insert(node_id);
for (tenant_shard_id, conf_opt) in listing.tenant_shards {
observed.insert(tenant_shard_id, (node.id, conf_opt));
}
}
}
for (tenant_shard_id, conf_opt) in tenant_shards {
observed.insert(tenant_shard_id, (node_id, conf_opt));
}
}
let mut cleanup = Vec::new();
// List of tenants for which we will attempt to notify compute of their location at startup
let mut compute_notifications = Vec::new();
// Populate intent and observed states for all tenants, based on reported state on pageservers
let (shard_count, nodes) = {
let shard_count = {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
@@ -288,18 +257,171 @@ impl Service {
}
}
(tenants.len(), nodes.clone())
tenants.len()
};
// TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
// generation_pageserver in the database.
// Clean up any tenants that were found on pageservers but are not known to us.
// Emit compute hook notifications for all tenants which are already stably attached. Other tenants
// will emit compute hook notifications when they reconcile.
//
// Ordering: we must complete these notification attempts before doing any other reconciliation for the
// tenants named here, because otherwise our calls to notify() might race with more recent values
// generated by reconciliation.
let notify_failures = self
.compute_notify_many(compute_notifications, compute_notify_deadline)
.await;
// Compute notify is fallible. If it fails here, do not delay overall startup: set the
// flag on these shards that they have a pending notification.
// Update tenant state for any that failed to do their initial compute notify, so that they'll retry later.
{
let mut locked = self.inner.write().unwrap();
for tenant_shard_id in notify_failures.into_iter() {
if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) {
shard.pending_compute_notification = true;
}
}
}
// Finally, now that the service is up and running, launch reconcile operations for any tenants
// which require it: under normal circumstances this should only include tenants that were in some
// transient state before we restarted, or any tenants whose compute hooks failed above.
let reconcile_tasks = self.reconcile_all();
// We will not wait for these reconciliation tasks to run here: we're now done with startup and
// normal operations may proceed.
// Clean up any tenants that were found on pageservers but are not known to us. Do this in the
// background because it does not need to complete in order to proceed with other work.
if !cleanup.is_empty() {
tracing::info!("Cleaning up {} locations in the background", cleanup.len());
tokio::task::spawn({
let cleanup_self = self.clone();
async move { cleanup_self.cleanup_locations(cleanup).await }
});
}
tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
}
/// Used during [`Self::startup_reconcile`]: issue GETs to all nodes concurrently, with a deadline.
///
/// The result includes only nodes which responded within the deadline
async fn scan_node_locations(
&self,
deadline: Instant,
) -> HashMap<NodeId, LocationConfigListResponse> {
let nodes = {
let locked = self.inner.read().unwrap();
locked.nodes.clone()
};
let mut node_results = HashMap::new();
let mut node_list_futs = FuturesUnordered::new();
for node in nodes.values() {
node_list_futs.push({
async move {
let http_client = reqwest::ClientBuilder::new()
.timeout(Duration::from_secs(5))
.build()
.expect("Failed to construct HTTP client");
let client = mgmt_api::Client::from_client(
http_client,
node.base_url(),
self.config.jwt_token.as_deref(),
);
fn is_fatal(e: &mgmt_api::Error) -> bool {
use mgmt_api::Error::*;
match e {
ReceiveBody(_) | ReceiveErrorBody(_) => false,
ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
| ApiError(StatusCode::GATEWAY_TIMEOUT, _)
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
}
}
tracing::info!("Scanning shards on node {}...", node.id);
let description = format!("List locations on {}", node.id);
let response = backoff::retry(
|| client.list_location_config(),
is_fatal,
1,
5,
&description,
&self.cancel,
)
.await;
(node.id, response)
}
});
}
loop {
let (node_id, result) = tokio::select! {
next = node_list_futs.next() => {
match next {
Some(result) => result,
None =>{
// We got results for all our nodes
break;
}
}
},
_ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
// Give up waiting for anyone who hasn't responded: we will yield the results that we have
tracing::info!("Reached deadline while waiting for nodes to respond to location listing requests");
break;
}
};
let Some(list_response) = result else {
tracing::info!("Shutdown during startup_reconcile");
break;
};
match list_response {
Err(e) => {
tracing::warn!("Could not scan node {} ({e})", node_id);
}
Ok(listing) => {
node_results.insert(node_id, listing);
}
}
}
node_results
}
/// Used during [`Self::startup_reconcile`]: detach a list of unknown-to-us tenants from pageservers.
///
/// This is safe to run in the background, because if we don't have this TenantShardId in our map of
/// tenants, then it is probably something incompletely deleted before: we will not fight with any
/// other task trying to attach it.
#[instrument(skip_all)]
async fn cleanup_locations(&self, cleanup: Vec<(TenantShardId, NodeId)>) {
let nodes = self.inner.read().unwrap().nodes.clone();
for (tenant_shard_id, node_id) in cleanup {
// A node reported a tenant_shard_id which is unknown to us: detach it.
let node = nodes
.get(&node_id)
.expect("Always exists: only known nodes are scanned");
let Some(node) = nodes.get(&node_id) else {
// This is legitimate; we run in the background and [`Self::startup_reconcile`] might have identified
// a location to clean up on a node that has since been removed.
tracing::info!(
"Not cleaning up location {node_id}/{tenant_shard_id}: node not found"
);
continue;
};
if self.cancel.is_cancelled() {
break;
}
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
match client
@@ -332,21 +454,24 @@ impl Service {
}
}
}
}
// Emit compute hook notifications for all tenants which are already stably attached. Other tenants
// will emit compute hook notifications when they reconcile.
//
// Ordering: we must complete these notification attempts before doing any other reconciliation for the
// tenants named here, because otherwise our calls to notify() might race with more recent values
// generated by reconciliation.
// Compute notify is fallible. If it fails here, do not delay overall startup: set the
// flag on these shards that they have a pending notification.
/// Used during [`Self::startup_reconcile`]: issue many concurrent compute notifications.
///
/// Returns a set of any shards for which notifications where not acked within the deadline.
async fn compute_notify_many(
&self,
notifications: Vec<(TenantShardId, NodeId)>,
deadline: Instant,
) -> HashSet<TenantShardId> {
let compute_hook = self.inner.read().unwrap().compute_hook.clone();
let attempt_shards = notifications.iter().map(|i| i.0).collect::<HashSet<_>>();
let mut success_shards = HashSet::new();
// Construct an async stream of futures to invoke the compute notify function: we do this
// in order to subsequently use .buffered() on the stream to execute with bounded parallelism.
let stream = futures::stream::iter(compute_notifications.into_iter())
let mut stream = futures::stream::iter(notifications.into_iter())
.map(|(tenant_shard_id, node_id)| {
let compute_hook = compute_hook.clone();
let cancel = self.cancel.clone();
@@ -357,33 +482,43 @@ impl Service {
node_id=%node_id,
"Failed to notify compute on startup for shard: {e}"
);
Some(tenant_shard_id)
} else {
None
} else {
Some(tenant_shard_id)
}
}
})
.buffered(compute_hook::API_CONCURRENCY);
let notify_results = stream.collect::<Vec<_>>().await;
// Update tenant state for any that failed to do their initial compute notify, so that they'll retry later.
{
let mut locked = self.inner.write().unwrap();
for tenant_shard_id in notify_results.into_iter().flatten() {
if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) {
shard.pending_compute_notification = true;
loop {
tokio::select! {
next = stream.next() => {
match next {
Some(Some(success_shard)) => {
// A notification succeeded
success_shards.insert(success_shard);
},
Some(None) => {
// A notification that failed
},
None => {
tracing::info!("Successfully sent all compute notifications");
break;
}
}
},
_ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
// Give up sending any that didn't succeed yet
tracing::info!("Reached deadline while sending compute notifications");
break;
}
}
};
}
// Finally, now that the service is up and running, launch reconcile operations for any tenants
// which require it: under normal circumstances this should only include tenants that were in some
// transient state before we restarted, or any tenants whose compute hooks failed above.
let reconcile_tasks = self.reconcile_all();
// We will not wait for these reconciliation tasks to run here: we're now done with startup and
// normal operations may proceed.
tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
attempt_shards
.difference(&success_shards)
.cloned()
.collect()
}
/// Long running background task that periodically wakes up and looks for shards that need
@@ -1287,8 +1422,6 @@ impl Service {
tenant_id: TenantId,
mut create_req: TimelineCreateRequest,
) -> Result<TimelineInfo, ApiError> {
let mut timeline_info = None;
tracing::info!(
"Creating timeline {}/{}",
tenant_id,
@@ -1299,7 +1432,7 @@ impl Service {
// TODO: refuse to do this if shard splitting is in progress
// (https://github.com/neondatabase/neon/issues/6676)
let targets = {
let mut targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
@@ -1323,21 +1456,24 @@ impl Service {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant not found").into(),
));
}
for (tenant_shard_id, node) in targets {
// TODO: issue shard timeline creates in parallel, once the 0th is done.
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
};
let shard_zero = targets.remove(0);
async fn create_one(
tenant_shard_id: TenantShardId,
node: Node,
jwt: Option<String>,
create_req: TimelineCreateRequest,
) -> Result<TimelineInfo, ApiError> {
tracing::info!(
"Creating timeline on shard {}/{}, attached to node {}",
tenant_shard_id,
create_req.new_timeline_id,
node.id
);
let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
let shard_timeline_info = client
client
.timeline_create(tenant_shard_id, &create_req)
.await
.map_err(|e| match e {
@@ -1350,23 +1486,66 @@ impl Service {
ApiError::InternalServerError(anyhow::anyhow!(msg))
}
_ => ApiError::Conflict(format!("Failed to create timeline: {e}")),
})?;
if timeline_info.is_none() {
// If the caller specified an ancestor but no ancestor LSN, we are responsible for
// propagating the LSN chosen by the first shard to the other shards: it is important
// that all shards end up with the same ancestor_start_lsn.
if create_req.ancestor_timeline_id.is_some()
&& create_req.ancestor_start_lsn.is_none()
{
create_req.ancestor_start_lsn = shard_timeline_info.ancestor_lsn;
}
// We will return the TimelineInfo from the first shard
timeline_info = Some(shard_timeline_info);
}
})
}
Ok(timeline_info.expect("targets cannot be empty"))
// Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
// use whatever LSN that shard picked when creating on subsequent shards. We arbitrarily use shard zero as the shard
// that will get the first creation request, and propagate the LSN to all the >0 shards.
let timeline_info = create_one(
shard_zero.0,
shard_zero.1,
self.config.jwt_token.clone(),
create_req.clone(),
)
.await?;
// Propagate the LSN that shard zero picked, if caller didn't provide one
if create_req.ancestor_timeline_id.is_some() && create_req.ancestor_start_lsn.is_none() {
create_req.ancestor_start_lsn = timeline_info.ancestor_lsn;
}
// Create timeline on remaining shards with number >0
if !targets.is_empty() {
// If we had multiple shards, issue requests for the remainder now.
let jwt = self.config.jwt_token.clone();
self.tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
let create_req = create_req.clone();
Box::pin(create_one(tenant_shard_id, node, jwt.clone(), create_req))
})
.await?;
}
Ok(timeline_info)
}
/// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
///
/// On success, the returned vector contains exactly the same number of elements as the input `locations`.
async fn tenant_for_shards<F, R>(
&self,
locations: Vec<(TenantShardId, Node)>,
mut req_fn: F,
) -> Result<Vec<R>, ApiError>
where
F: FnMut(
TenantShardId,
Node,
)
-> std::pin::Pin<Box<dyn futures::Future<Output = Result<R, ApiError>> + Send>>,
{
let mut futs = FuturesUnordered::new();
let mut results = Vec::with_capacity(locations.len());
for (tenant_shard_id, node) in locations {
futs.push(req_fn(tenant_shard_id, node));
}
while let Some(r) = futs.next().await {
results.push(r?);
}
Ok(results)
}
pub(crate) async fn tenant_timeline_delete(
@@ -1380,7 +1559,7 @@ impl Service {
// TODO: refuse to do this if shard splitting is in progress
// (https://github.com/neondatabase/neon/issues/6676)
let targets = {
let mut targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
@@ -1405,12 +1584,14 @@ impl Service {
anyhow::anyhow!("Tenant not found").into(),
));
}
let shard_zero = targets.remove(0);
// TODO: call into shards concurrently
let mut any_pending = false;
for (tenant_shard_id, node) in targets {
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
async fn delete_one(
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
node: Node,
jwt: Option<String>,
) -> Result<StatusCode, ApiError> {
tracing::info!(
"Deleting timeline on shard {}/{}, attached to node {}",
tenant_shard_id,
@@ -1418,7 +1599,8 @@ impl Service {
node.id
);
let status = client
let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
client
.timeline_delete(tenant_shard_id, timeline_id)
.await
.map_err(|e| {
@@ -1426,18 +1608,36 @@ impl Service {
"Error deleting timeline {timeline_id} on {tenant_shard_id} on node {}: {e}",
node.id
))
})?;
if status == StatusCode::ACCEPTED {
any_pending = true;
}
})
}
if any_pending {
Ok(StatusCode::ACCEPTED)
} else {
Ok(StatusCode::NOT_FOUND)
let statuses = self
.tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
Box::pin(delete_one(
tenant_shard_id,
timeline_id,
node,
self.config.jwt_token.clone(),
))
})
.await?;
// If any shards >0 haven't finished deletion yet, don't start deletion on shard zero
if statuses.iter().any(|s| s != &StatusCode::NOT_FOUND) {
return Ok(StatusCode::ACCEPTED);
}
// Delete shard zero last: this is not strictly necessary, but since a caller's GET on a timeline will be routed
// to shard zero, it gives a more obvious behavior that a GET returns 404 once the deletion is done.
let shard_zero_status = delete_one(
shard_zero.0,
timeline_id,
shard_zero.1,
self.config.jwt_token.clone(),
)
.await?;
Ok(shard_zero_status)
}
/// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this

View File

@@ -652,6 +652,10 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
let name = import_match
.get_one::<String>("node-name")
.ok_or_else(|| anyhow!("No node name provided"))?;
let update_catalog = import_match
.get_one::<bool>("update-catalog")
.cloned()
.unwrap_or_default();
// Parse base inputs
let base_tarfile = import_match
@@ -694,6 +698,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
None,
pg_version,
ComputeMode::Primary,
!update_catalog,
)?;
println!("Done");
}
@@ -831,6 +836,10 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
.get_one::<String>("endpoint_id")
.map(String::to_string)
.unwrap_or_else(|| format!("ep-{branch_name}"));
let update_catalog = sub_args
.get_one::<bool>("update-catalog")
.cloned()
.unwrap_or_default();
let lsn = sub_args
.get_one::<String>("lsn")
@@ -880,6 +889,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
http_port,
pg_version,
mode,
!update_catalog,
)?;
}
"start" => {
@@ -918,6 +928,11 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
.get(endpoint_id.as_str())
.ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?;
let create_test_user = sub_args
.get_one::<bool>("create-test-user")
.cloned()
.unwrap_or_default();
cplane.check_conflicting_endpoints(
endpoint.mode,
endpoint.tenant_id,
@@ -972,6 +987,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
pageservers,
remote_ext_config,
stripe_size.0 as usize,
create_test_user,
)
.await?;
}
@@ -1457,6 +1473,18 @@ fn cli() -> Command {
.required(false)
.default_value("1");
let update_catalog = Arg::new("update-catalog")
.value_parser(value_parser!(bool))
.long("update-catalog")
.help("If set, will set up the catalog for neon_superuser")
.required(false);
let create_test_user = Arg::new("create-test-user")
.value_parser(value_parser!(bool))
.long("create-test-user")
.help("If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`")
.required(false);
Command::new("Neon CLI")
.arg_required_else_help(true)
.version(GIT_VERSION)
@@ -1517,6 +1545,7 @@ fn cli() -> Command {
.arg(Arg::new("end-lsn").long("end-lsn")
.help("Lsn the basebackup ends at"))
.arg(pg_version_arg.clone())
.arg(update_catalog.clone())
)
).subcommand(
Command::new("tenant")
@@ -1630,6 +1659,7 @@ fn cli() -> Command {
.required(false))
.arg(pg_version_arg.clone())
.arg(hot_standby_arg.clone())
.arg(update_catalog)
)
.subcommand(Command::new("start")
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
@@ -1637,6 +1667,7 @@ fn cli() -> Command {
.arg(endpoint_pageserver_id_arg.clone())
.arg(safekeepers_arg)
.arg(remote_ext_config_args)
.arg(create_test_user)
)
.subcommand(Command::new("reconfigure")
.about("Reconfigure the endpoint")

View File

@@ -41,11 +41,15 @@ use std::net::SocketAddr;
use std::net::TcpStream;
use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result};
use compute_api::spec::Database;
use compute_api::spec::PgIdent;
use compute_api::spec::RemoteExtSpec;
use compute_api::spec::Role;
use nix::sys::signal::kill;
use nix::sys::signal::Signal;
use serde::{Deserialize, Serialize};
@@ -122,6 +126,7 @@ impl ComputeControlPlane {
http_port: Option<u16>,
pg_version: u32,
mode: ComputeMode,
skip_pg_catalog_updates: bool,
) -> Result<Arc<Endpoint>> {
let pg_port = pg_port.unwrap_or_else(|| self.get_port());
let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
@@ -140,7 +145,7 @@ impl ComputeControlPlane {
// before and after start are the same. So, skip catalog updates,
// with this we basically test a case of waking up an idle compute, where
// we also skip catalog updates in the cloud.
skip_pg_catalog_updates: true,
skip_pg_catalog_updates,
features: vec![],
});
@@ -155,7 +160,7 @@ impl ComputeControlPlane {
http_port,
pg_port,
pg_version,
skip_pg_catalog_updates: true,
skip_pg_catalog_updates,
features: vec![],
})?,
)?;
@@ -500,6 +505,7 @@ impl Endpoint {
pageservers: Vec<(Host, u16)>,
remote_ext_config: Option<&String>,
shard_stripe_size: usize,
create_test_user: bool,
) -> Result<()> {
if self.status() == EndpointStatus::Running {
anyhow::bail!("The endpoint is already running");
@@ -551,8 +557,26 @@ impl Endpoint {
cluster_id: None, // project ID: not used
name: None, // project name: not used
state: None,
roles: vec![],
databases: vec![],
roles: if create_test_user {
vec![Role {
name: PgIdent::from_str("test").unwrap(),
encrypted_password: None,
options: None,
}]
} else {
Vec::new()
},
databases: if create_test_user {
vec![Database {
name: PgIdent::from_str("neondb").unwrap(),
owner: PgIdent::from_str("test").unwrap(),
options: None,
restrict_conn: false,
invalid: false,
}]
} else {
Vec::new()
},
settings: None,
postgresql_conf: Some(postgresql_conf),
},
@@ -577,11 +601,16 @@ impl Endpoint {
.open(self.endpoint_path().join("compute.log"))?;
// Launch compute_ctl
println!("Starting postgres node at '{}'", self.connstr());
let conn_str = self.connstr("cloud_admin", "postgres");
println!("Starting postgres node at '{}'", conn_str);
if create_test_user {
let conn_str = self.connstr("user", "neondb");
println!("Also at '{}'", conn_str);
}
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
cmd.args(["--http-port", &self.http_address.port().to_string()])
.args(["--pgdata", self.pgdata().to_str().unwrap()])
.args(["--connstr", &self.connstr()])
.args(["--connstr", &conn_str])
.args([
"--spec-path",
self.endpoint_path().join("spec.json").to_str().unwrap(),
@@ -652,7 +681,9 @@ impl Endpoint {
}
ComputeStatus::Empty
| ComputeStatus::ConfigurationPending
| ComputeStatus::Configuration => {
| ComputeStatus::Configuration
| ComputeStatus::TerminationPending
| ComputeStatus::Terminated => {
bail!("unexpected compute status: {:?}", state.status)
}
}
@@ -783,13 +814,13 @@ impl Endpoint {
Ok(())
}
pub fn connstr(&self) -> String {
pub fn connstr(&self, user: &str, db_name: &str) -> String {
format!(
"postgresql://{}@{}:{}/{}",
"cloud_admin",
user,
self.pg_address.ip(),
self.pg_address.port(),
"postgres"
db_name
)
}
}

View File

@@ -52,6 +52,10 @@ pub enum ComputeStatus {
// compute will exit soon or is waiting for
// control-plane to terminate it.
Failed,
// Termination requested
TerminationPending,
// Terminated Postgres
Terminated,
}
fn rfc3339_serialize<S>(x: &Option<DateTime<Utc>>, s: S) -> Result<S::Ok, S::Error>

View File

@@ -201,6 +201,11 @@ impl<P: Atomic> GenericCounterPairVec<P> {
pub fn with_label_values(&self, vals: &[&str]) -> GenericCounterPair<P> {
self.get_metric_with_label_values(vals).unwrap()
}
pub fn remove_label_values(&self, res: &mut [Result<()>; 2], vals: &[&str]) {
res[0] = self.inc.remove_label_values(vals);
res[1] = self.dec.remove_label_values(vals);
}
}
impl<P: Atomic> GenericCounterPair<P> {
@@ -247,6 +252,15 @@ impl<P: Atomic> GenericCounterPair<P> {
}
}
impl<P: Atomic> Clone for GenericCounterPair<P> {
fn clone(&self) -> Self {
Self {
inc: self.inc.clone(),
dec: self.dec.clone(),
}
}
}
/// Guard returned by [`GenericCounterPair::guard`]
pub struct GenericCounterPairGuard<P: Atomic>(GenericCounter<P>);

View File

@@ -21,6 +21,7 @@ hex.workspace = true
thiserror.workspace = true
humantime-serde.workspace = true
chrono.workspace = true
itertools.workspace = true
workspace_hack.workspace = true

View File

@@ -2,6 +2,7 @@ use postgres_ffi::BLCKSZ;
use std::ops::Range;
use crate::key::Key;
use itertools::Itertools;
///
/// Represents a set of Keys, in a compact form.
@@ -63,9 +64,36 @@ impl KeySpace {
KeyPartitioning { parts }
}
/// Update the keyspace such that it doesn't contain any range
/// that is overlapping with `other`. This can involve splitting or
/// removing of existing ranges.
/// Merge another keyspace into the current one.
/// Note: the keyspaces must not ovelap (enforced via assertions)
pub fn merge(&mut self, other: &KeySpace) {
let all_ranges = self
.ranges
.iter()
.merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);
let mut accum = KeySpaceAccum::new();
let mut prev: Option<&Range<Key>> = None;
for range in all_ranges {
if let Some(prev) = prev {
let overlap =
std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
assert!(
!overlap,
"Attempt to merge ovelapping keyspaces: {:?} overlaps {:?}",
prev, range
);
}
accum.add_range(range.clone());
prev = Some(range);
}
self.ranges = accum.to_keyspace().ranges;
}
/// Remove all keys in `other` from `self`.
/// This can involve splitting or removing of existing ranges.
pub fn remove_overlapping_with(&mut self, other: &KeySpace) {
let (self_start, self_end) = match (self.start(), self.end()) {
(Some(start), Some(end)) => (start, end),
@@ -220,16 +248,7 @@ impl KeySpaceAccum {
}
pub fn consume_keyspace(&mut self) -> KeySpace {
if let Some(accum) = self.accum.take() {
self.ranges.push(accum);
}
let mut prev_accum = KeySpaceAccum::new();
std::mem::swap(self, &mut prev_accum);
KeySpace {
ranges: prev_accum.ranges,
}
std::mem::take(self).to_keyspace()
}
pub fn size(&self) -> u64 {
@@ -279,6 +298,13 @@ impl KeySpaceRandomAccum {
}
KeySpace { ranges }
}
pub fn consume_keyspace(&mut self) -> KeySpace {
let mut prev_accum = KeySpaceRandomAccum::new();
std::mem::swap(self, &mut prev_accum);
prev_accum.to_keyspace()
}
}
pub fn key_range_size(key_range: &Range<Key>) -> u32 {

View File

@@ -180,7 +180,7 @@ pub enum TimelineState {
Broken { reason: String, backtrace: String },
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateRequest {
pub new_timeline_id: TimelineId,
#[serde(default)]
@@ -720,6 +720,7 @@ pub enum PagestreamFeMessage {
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
GetVectoredPages(PagestreamGetVectoredPagesRequest),
}
// Wrapped in libpq CopyData
@@ -731,6 +732,7 @@ pub enum PagestreamBeMessage {
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
GetVectoredPages(PagestreamGetVectoredPagesResponse),
}
// Keep in sync with `pagestore_client.h`
@@ -742,6 +744,7 @@ enum PagestreamBeMessageTag {
Error = 103,
DbSize = 104,
GetSlruSegment = 105,
GetVectoredPages = 106,
}
impl TryFrom<u8> for PagestreamBeMessageTag {
type Error = u8;
@@ -753,6 +756,7 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
103 => Ok(PagestreamBeMessageTag::Error),
104 => Ok(PagestreamBeMessageTag::DbSize),
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
106 => Ok(PagestreamBeMessageTag::GetVectoredPages),
_ => Err(value),
}
}
@@ -795,6 +799,15 @@ pub struct PagestreamGetSlruSegmentRequest {
pub segno: u32,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetVectoredPagesRequest {
pub latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
pub blkno: u32,
pub count: u8,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
pub exists: bool,
@@ -815,6 +828,12 @@ pub struct PagestreamGetSlruSegmentResponse {
pub segment: Bytes,
}
#[derive(Debug)]
pub struct PagestreamGetVectoredPagesResponse {
pub page_count: u8,
pub pages: Bytes,
}
#[derive(Debug)]
pub struct PagestreamErrorResponse {
pub message: String,
@@ -886,6 +905,18 @@ impl PagestreamFeMessage {
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
Self::GetVectoredPages(req) => {
bytes.put_u8(5);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
bytes.put_u8(req.count);
}
}
bytes.into()
@@ -944,6 +975,20 @@ impl PagestreamFeMessage {
segno: body.read_u32::<BigEndian>()?,
},
)),
5 => Ok(PagestreamFeMessage::GetVectoredPages(
PagestreamGetVectoredPagesRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
count: body.read_u8()?,
},
)),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}
@@ -985,6 +1030,12 @@ impl PagestreamBeMessage {
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
Self::GetVectoredPages(resp) => {
bytes.put_u8(Tag::GetVectoredPages as u8);
bytes.put_u8(resp.page_count);
bytes.put(&resp.pages[..]);
}
}
bytes.into()
@@ -1033,6 +1084,15 @@ impl PagestreamBeMessage {
segment: segment.into(),
})
}
Tag::GetVectoredPages => {
let page_count = buf.read_u8()?;
let mut pages = vec![0; page_count as usize * 8192];
buf.read_exact(&mut pages)?;
Self::GetVectoredPages(PagestreamGetVectoredPagesResponse {
page_count,
pages: pages.into(),
})
}
};
let remaining = buf.into_inner();
if !remaining.is_empty() {
@@ -1052,6 +1112,7 @@ impl PagestreamBeMessage {
Self::Error(_) => "Error",
Self::DbSize(_) => "DbSize",
Self::GetSlruSegment(_) => "GetSlruSegment",
Self::GetVectoredPages(_) => "GetVectoredPages",
}
}
}

View File

@@ -4,7 +4,8 @@ use futures::SinkExt;
use pageserver_api::{
models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
PagestreamGetPageResponse,
PagestreamGetPageResponse, PagestreamGetVectoredPagesRequest,
PagestreamGetVectoredPagesResponse,
},
reltag::RelTag,
};
@@ -157,7 +158,39 @@ impl PagestreamClient {
PagestreamBeMessage::Exists(_)
| PagestreamBeMessage::Nblocks(_)
| PagestreamBeMessage::DbSize(_)
| PagestreamBeMessage::GetSlruSegment(_) => {
| PagestreamBeMessage::GetSlruSegment(_)
| PagestreamBeMessage::GetVectoredPages(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",
msg.kind()
)
}
}
}
pub async fn getpages(
&mut self,
req: PagestreamGetVectoredPagesRequest,
) -> anyhow::Result<PagestreamGetVectoredPagesResponse> {
let req = PagestreamFeMessage::GetVectoredPages(req);
let req: bytes::Bytes = req.serialize();
// let mut req = tokio_util::io::ReaderStream::new(&req);
let mut req = tokio_stream::once(Ok(req));
self.copy_both.send_all(&mut req).await?;
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next: bytes::Bytes = next.unwrap()?;
let msg = PagestreamBeMessage::deserialize(next)?;
match msg {
PagestreamBeMessage::GetVectoredPages(p) => Ok(p),
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
PagestreamBeMessage::Exists(_)
| PagestreamBeMessage::Nblocks(_)
| PagestreamBeMessage::DbSize(_)
| PagestreamBeMessage::GetSlruSegment(_)
| PagestreamBeMessage::GetPage(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",
msg.kind()

View File

@@ -8,7 +8,7 @@ use utils::lsn::Lsn;
use rand::prelude::*;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use tracing::{debug, info, instrument};
use tracing::{info, instrument};
use std::collections::HashMap;
use std::num::NonZeroUsize;
@@ -28,6 +28,8 @@ pub(crate) struct Args {
#[clap(long, default_value = "localhost:64000")]
page_service_host_port: String,
#[clap(long)]
page_service_connstring: Option<String>,
#[clap(long)]
pageserver_jwt: Option<String>,
#[clap(long, default_value = "1")]
num_clients: NonZeroUsize,
@@ -230,12 +232,17 @@ async fn client(
) {
start_work_barrier.wait().await;
let client = pageserver_client::page_service::Client::new(crate::util::connstring::connstring(
&args.page_service_host_port,
args.pageserver_jwt.as_deref(),
))
.await
.unwrap();
let connstr = match &args.page_service_connstring {
Some(connstr) => connstr.clone(),
None => crate::util::connstring::connstring(
&args.page_service_host_port,
args.pageserver_jwt.as_deref(),
),
};
let client = pageserver_client::page_service::Client::new(connstr)
.await
.unwrap();
while let Some(Work { lsn, gzip }) = work.recv().await {
let start = Instant::now();
@@ -263,7 +270,7 @@ async fn client(
}
})
.await;
debug!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
info!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
let elapsed = start.elapsed();
live_stats.inc();
STATS.with(|stats| {

View File

@@ -2,7 +2,7 @@ use anyhow::Context;
use camino::Utf8PathBuf;
use pageserver_api::key::{is_rel_block_key, key_to_rel_block, Key};
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::models::PagestreamGetPageRequest;
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamGetVectoredPagesRequest};
use tokio_util::sync::CancellationToken;
use utils::id::TenantTimelineId;
@@ -57,6 +57,8 @@ pub(crate) struct Args {
/// [`pageserver_api::models::virtual_file::IoEngineKind`].
#[clap(long)]
set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
#[clap(long)]
vectored_read_size: Option<u8>,
targets: Option<Vec<TenantTimelineId>>,
}
@@ -299,22 +301,45 @@ async fn main_impl(
}
let start = Instant::now();
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(is_rel_block_key(&key));
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
latest: rng.gen_bool(args.req_latest_probability),
lsn: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,
}
};
client.getpage(req).await.unwrap();
if let Some(size) = args.vectored_read_size {
assert!(size > 0);
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(is_rel_block_key(&key));
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
PagestreamGetVectoredPagesRequest {
latest: rng.gen_bool(args.req_latest_probability),
lsn: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,
count: size
}
};
client.getpages(req).await.unwrap();
} else {
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(is_rel_block_key(&key));
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
latest: rng.gen_bool(args.req_latest_probability),
lsn: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,
}
};
client.getpage(req).await.unwrap();
}
let end = Instant::now();
live_stats.request_done();
ticks_processed += 1;

View File

@@ -143,6 +143,7 @@ where
ar: &'a mut Builder<&'b mut W>,
buf: Vec<u8>,
current_segment: Option<(SlruKind, u32)>,
total_blocks: usize,
}
impl<'a, 'b, W> SlruSegmentsBuilder<'a, 'b, W>
@@ -154,6 +155,7 @@ where
ar,
buf: Vec::new(),
current_segment: None,
total_blocks: 0,
}
}
@@ -199,7 +201,8 @@ where
let header = new_tar_header(&segname, self.buf.len() as u64)?;
self.ar.append(&header, self.buf.as_slice()).await?;
trace!("Added to basebackup slru {} relsize {}", segname, nblocks);
self.total_blocks += nblocks;
debug!("Added to basebackup slru {} relsize {}", segname, nblocks);
self.buf.clear();
@@ -207,11 +210,15 @@ where
}
async fn finish(mut self) -> anyhow::Result<()> {
if self.current_segment.is_none() || self.buf.is_empty() {
return Ok(());
}
let res = if self.current_segment.is_none() || self.buf.is_empty() {
Ok(())
} else {
self.flush().await
};
self.flush().await
info!("Collected {} SLRU blocks", self.total_blocks);
res
}
}
@@ -261,10 +268,7 @@ where
let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
for part in slru_partitions.parts {
let blocks = self
.timeline
.get_vectored(&part.ranges, self.lsn, self.ctx)
.await?;
let blocks = self.timeline.get_vectored(part, self.lsn, self.ctx).await?;
for (key, block) in blocks {
slru_builder.add_block(&key, block?).await?;

View File

@@ -33,6 +33,7 @@ use utils::{
use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
use crate::tenant::config::TenantConf;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::timeline::GetVectoredImpl;
use crate::tenant::{
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
};
@@ -84,6 +85,12 @@ pub mod defaults {
pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs";
pub const DEFAULT_GET_VECTORED_IMPL: &str = "sequential";
pub const DEFAULT_MAX_VECTORED_READ_SIZE: usize = 128 * 1024; // 128 KiB
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
///
/// Default built-in configuration file.
///
@@ -121,6 +128,12 @@ pub mod defaults {
#virtual_file_io_engine = '{DEFAULT_VIRTUAL_FILE_IO_ENGINE}'
#get_vectored_impl = '{DEFAULT_GET_VECTORED_IMPL}'
#max_vectored_read_size = '{DEFAULT_MAX_VECTORED_READ_SIZE}'
#validate_vectored_get = '{DEFAULT_VALIDATE_VECTORED_GET}'
[tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
@@ -256,6 +269,12 @@ pub struct PageServerConf {
pub ingest_batch_size: u64,
pub virtual_file_io_engine: virtual_file::IoEngineKind,
pub get_vectored_impl: GetVectoredImpl,
pub max_vectored_read_size: usize,
pub validate_vectored_get: bool,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -342,6 +361,12 @@ struct PageServerConfigBuilder {
ingest_batch_size: BuilderValue<u64>,
virtual_file_io_engine: BuilderValue<virtual_file::IoEngineKind>,
get_vectored_impl: BuilderValue<GetVectoredImpl>,
max_vectored_read_size: BuilderValue<usize>,
validate_vectored_get: BuilderValue<bool>,
}
impl Default for PageServerConfigBuilder {
@@ -419,6 +444,10 @@ impl Default for PageServerConfigBuilder {
ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE),
virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
get_vectored_impl: Set(DEFAULT_GET_VECTORED_IMPL.parse().unwrap()),
max_vectored_read_size: Set(DEFAULT_MAX_VECTORED_READ_SIZE),
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
}
}
}
@@ -579,6 +608,18 @@ impl PageServerConfigBuilder {
self.virtual_file_io_engine = BuilderValue::Set(value);
}
pub fn get_vectored_impl(&mut self, value: GetVectoredImpl) {
self.get_vectored_impl = BuilderValue::Set(value);
}
pub fn get_max_vectored_read_size(&mut self, value: usize) {
self.max_vectored_read_size = BuilderValue::Set(value);
}
pub fn get_validate_vectored_get(&mut self, value: bool) {
self.validate_vectored_get = BuilderValue::Set(value);
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let concurrent_tenant_warmup = self
.concurrent_tenant_warmup
@@ -689,6 +730,15 @@ impl PageServerConfigBuilder {
virtual_file_io_engine: self
.virtual_file_io_engine
.ok_or(anyhow!("missing virtual_file_io_engine"))?,
get_vectored_impl: self
.get_vectored_impl
.ok_or(anyhow!("missing get_vectored_impl"))?,
max_vectored_read_size: self
.max_vectored_read_size
.ok_or(anyhow!("missing max_vectored_read_size"))?,
validate_vectored_get: self
.validate_vectored_get
.ok_or(anyhow!("missing validate_vectored_get"))?,
})
}
}
@@ -943,6 +993,15 @@ impl PageServerConf {
"virtual_file_io_engine" => {
builder.virtual_file_io_engine(parse_toml_from_str("virtual_file_io_engine", item)?)
}
"get_vectored_impl" => {
builder.get_vectored_impl(parse_toml_from_str("get_vectored_impl", item)?)
}
"max_vectored_read_size" => {
builder.get_max_vectored_read_size(parse_toml_u64("max_vectored_read_size", item)? as usize)
}
"validate_vectored_get" => {
builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -1017,6 +1076,9 @@ impl PageServerConf {
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
max_vectored_read_size: defaults::DEFAULT_MAX_VECTORED_READ_SIZE,
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
}
}
}
@@ -1250,6 +1312,9 @@ background_task_maximum_delay = '334 s'
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
max_vectored_read_size: defaults::DEFAULT_MAX_VECTORED_READ_SIZE,
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
},
"Correct defaults should be used when no config values are provided"
);
@@ -1314,6 +1379,9 @@ background_task_maximum_delay = '334 s'
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
ingest_batch_size: 100,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
max_vectored_read_size: defaults::DEFAULT_MAX_VECTORED_READ_SIZE,
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -4,8 +4,8 @@ use metrics::{
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,
register_int_gauge, register_int_gauge_vec, register_uint_gauge, register_uint_gauge_vec,
Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPairVec,
IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
};
use once_cell::sync::Lazy;
use pageserver_api::shard::TenantShardId;
@@ -1266,13 +1266,12 @@ pub(crate) static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
// remote storage metrics
/// NB: increment _after_ recording the current value into [`REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST`].
static REMOTE_TIMELINE_CLIENT_CALLS_UNFINISHED_GAUGE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_remote_timeline_client_calls_unfinished",
"Number of ongoing calls to remote timeline client. \
Used to populate pageserver_remote_timeline_client_calls_started. \
This metric is not useful for sampling from Prometheus, but useful in tests.",
static REMOTE_TIMELINE_CLIENT_CALLS: Lazy<IntCounterPairVec> = Lazy::new(|| {
register_int_counter_pair_vec!(
"pageserver_remote_timeline_client_calls_started",
"Number of started calls to remote timeline client.",
"pageserver_remote_timeline_client_calls_finished",
"Number of finshed calls to remote timeline client.",
&[
"tenant_id",
"shard_id",
@@ -1281,23 +1280,7 @@ static REMOTE_TIMELINE_CLIENT_CALLS_UNFINISHED_GAUGE: Lazy<IntGaugeVec> = Lazy::
"op_kind"
],
)
.expect("failed to define a metric")
});
static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_remote_timeline_client_calls_started",
"When calling a remote timeline client method, we record the current value \
of the calls_unfinished gauge in this histogram. Plot the histogram \
over time in a heatmap to visualize how many operations were ongoing \
at a given instant. It gives you a better idea of the queue depth \
than plotting the gauge directly, since operations may complete faster \
than the sampling interval.",
&["file_kind", "op_kind"],
// The calls_unfinished gauge is an integer gauge, hence we have integer buckets.
vec![0.0, 1.0, 2.0, 4.0, 6.0, 8.0, 10.0, 15.0, 20.0, 40.0, 60.0, 80.0, 100.0, 500.0],
)
.expect("failed to define a metric")
.unwrap()
});
static REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER: Lazy<IntCounterVec> =
@@ -2078,7 +2061,7 @@ pub(crate) struct RemoteTimelineClientMetrics {
shard_id: String,
timeline_id: String,
remote_physical_size_gauge: Mutex<Option<PerTimelineRemotePhysicalSizeGauge>>,
calls_unfinished_gauge: Mutex<HashMap<(&'static str, &'static str), IntGauge>>,
calls: Mutex<HashMap<(&'static str, &'static str), IntCounterPair>>,
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
}
@@ -2089,7 +2072,7 @@ impl RemoteTimelineClientMetrics {
tenant_id: tenant_shard_id.tenant_id.to_string(),
shard_id: format!("{}", tenant_shard_id.shard_slug()),
timeline_id: timeline_id.to_string(),
calls_unfinished_gauge: Mutex::new(HashMap::default()),
calls: Mutex::new(HashMap::default()),
bytes_started_counter: Mutex::new(HashMap::default()),
bytes_finished_counter: Mutex::new(HashMap::default()),
remote_physical_size_gauge: Mutex::new(None),
@@ -2129,15 +2112,15 @@ impl RemoteTimelineClientMetrics {
.unwrap()
}
fn calls_unfinished_gauge(
fn calls_counter_pair(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> IntGauge {
let mut guard = self.calls_unfinished_gauge.lock().unwrap();
) -> IntCounterPair {
let mut guard = self.calls.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
let metric = guard.entry(key).or_insert_with(move || {
REMOTE_TIMELINE_CLIENT_CALLS_UNFINISHED_GAUGE
REMOTE_TIMELINE_CLIENT_CALLS
.get_metric_with_label_values(&[
&self.tenant_id,
&self.shard_id,
@@ -2150,17 +2133,6 @@ impl RemoteTimelineClientMetrics {
metric.clone()
}
fn calls_started_hist(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> Histogram {
let key = (file_kind.as_str(), op_kind.as_str());
REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST
.get_metric_with_label_values(&[key.0, key.1])
.unwrap()
}
fn bytes_started_counter(
&self,
file_kind: &RemoteOpFileKind,
@@ -2231,7 +2203,7 @@ impl RemoteTimelineClientMetrics {
#[must_use]
pub(crate) struct RemoteTimelineClientCallMetricGuard {
/// Decremented on drop.
calls_unfinished_metric: Option<IntGauge>,
calls_counter_pair: Option<IntCounterPair>,
/// If Some(), this references the bytes_finished metric, and we increment it by the given `u64` on drop.
bytes_finished: Option<(IntCounter, u64)>,
}
@@ -2241,10 +2213,10 @@ impl RemoteTimelineClientCallMetricGuard {
/// The caller vouches to do the metric updates manually.
pub fn will_decrement_manually(mut self) {
let RemoteTimelineClientCallMetricGuard {
calls_unfinished_metric,
calls_counter_pair,
bytes_finished,
} = &mut self;
calls_unfinished_metric.take();
calls_counter_pair.take();
bytes_finished.take();
}
}
@@ -2252,10 +2224,10 @@ impl RemoteTimelineClientCallMetricGuard {
impl Drop for RemoteTimelineClientCallMetricGuard {
fn drop(&mut self) {
let RemoteTimelineClientCallMetricGuard {
calls_unfinished_metric,
calls_counter_pair,
bytes_finished,
} = self;
if let Some(guard) = calls_unfinished_metric.take() {
if let Some(guard) = calls_counter_pair.take() {
guard.dec();
}
if let Some((bytes_finished_metric, value)) = bytes_finished {
@@ -2288,10 +2260,8 @@ impl RemoteTimelineClientMetrics {
op_kind: &RemoteOpKind,
size: RemoteTimelineClientMetricsCallTrackSize,
) -> RemoteTimelineClientCallMetricGuard {
let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
self.calls_started_hist(file_kind, op_kind)
.observe(calls_unfinished_metric.get() as f64);
calls_unfinished_metric.inc(); // NB: inc after the histogram, see comment on underlying metric
let calls_counter_pair = self.calls_counter_pair(file_kind, op_kind);
calls_counter_pair.inc();
let bytes_finished = match size {
RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {
@@ -2305,7 +2275,7 @@ impl RemoteTimelineClientMetrics {
}
};
RemoteTimelineClientCallMetricGuard {
calls_unfinished_metric: Some(calls_unfinished_metric),
calls_counter_pair: Some(calls_counter_pair),
bytes_finished,
}
}
@@ -2319,12 +2289,8 @@ impl RemoteTimelineClientMetrics {
op_kind: &RemoteOpKind,
size: RemoteTimelineClientMetricsCallTrackSize,
) {
let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
debug_assert!(
calls_unfinished_metric.get() > 0,
"begin and end should cancel out"
);
calls_unfinished_metric.dec();
let calls_counter_pair = self.calls_counter_pair(file_kind, op_kind);
calls_counter_pair.dec();
match size {
RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {}
RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => {
@@ -2341,18 +2307,15 @@ impl Drop for RemoteTimelineClientMetrics {
shard_id,
timeline_id,
remote_physical_size_gauge,
calls_unfinished_gauge,
calls,
bytes_started_counter,
bytes_finished_counter,
} = self;
for ((a, b), _) in calls_unfinished_gauge.get_mut().unwrap().drain() {
let _ = REMOTE_TIMELINE_CLIENT_CALLS_UNFINISHED_GAUGE.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
a,
b,
]);
for ((a, b), _) in calls.get_mut().unwrap().drain() {
let mut res = [Ok(()), Ok(())];
REMOTE_TIMELINE_CLIENT_CALLS
.remove_label_values(&mut res, &[tenant_id, shard_id, timeline_id, a, b]);
// don't care about results
}
for ((a, b), _) in bytes_started_counter.get_mut().unwrap().drain() {
let _ = REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER.remove_label_values(&[

View File

@@ -17,6 +17,8 @@ use futures::stream::FuturesUnordered;
use futures::Stream;
use futures::StreamExt;
use pageserver_api::key::Key;
use pageserver_api::models::PagestreamGetVectoredPagesRequest;
use pageserver_api::models::PagestreamGetVectoredPagesResponse;
use pageserver_api::models::TenantState;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -71,6 +73,7 @@ use crate::tenant::mgr;
use crate::tenant::mgr::get_active_tenant_with_timeout;
use crate::tenant::mgr::GetActiveTenantError;
use crate::tenant::mgr::ShardSelector;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::timeline::WaitLsnError;
use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
@@ -334,6 +337,10 @@ enum PageStreamError {
#[error("Read error")]
Read(#[source] PageReconstructError),
/// Something went wrong reading a page: this likely indicates a pageserver bug
#[error("Vectored read error")]
VectoredRead(#[source] GetVectoredError),
/// Ran out of time waiting for an LSN
#[error("LSN timeout: {0}")]
LsnTimeout(WaitLsnError),
@@ -357,6 +364,15 @@ impl From<PageReconstructError> for PageStreamError {
}
}
impl From<GetVectoredError> for PageStreamError {
fn from(value: GetVectoredError) -> Self {
match value {
GetVectoredError::Cancelled => Self::Shutdown,
e => Self::VectoredRead(e),
}
}
}
impl From<GetActiveTimelineError> for PageStreamError {
fn from(value: GetActiveTimelineError) -> Self {
match value {
@@ -666,6 +682,15 @@ impl PageServerHandler {
span,
)
}
PagestreamFeMessage::GetVectoredPages(req) => {
let span = tracing::info_span!("handle_get_vectored_pages_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn, req_count = %req.count);
(
self.handle_get_pages_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
}
};
match response {
@@ -1161,6 +1186,80 @@ impl PageServerHandler {
}))
}
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_pages_at_lsn_request(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamGetVectoredPagesRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
// This is cheeky and relies on not using sharding :)
// A real solution has to split the requested key sequence between shards.
let get_page_request = PagestreamGetPageRequest {
latest: req.latest,
lsn: req.lsn,
rel: req.rel,
blkno: req.blkno,
};
let timeline = match self.get_cached_timeline_for_page(&get_page_request) {
Ok(tl) => tl,
Err(key) => {
match self
.load_timeline_for_page(tenant_id, timeline_id, key)
.await
{
Ok(t) => t,
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return Err(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
));
}
Err(e) => return Err(e.into()),
}
}
};
// load_timeline_for_page sets shard_id, but get_cached_timeline_for_page doesn't
set_tracing_field_shard_id(timeline);
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let (page_count, pages_buf) = timeline
.get_rel_pages_at_lsn(
req.rel,
req.blkno,
req.count,
Version::Lsn(lsn),
req.latest,
ctx,
)
.await?;
Ok(PagestreamBeMessage::GetVectoredPages(
PagestreamGetVectoredPagesResponse {
page_count,
pages: pages_buf,
},
))
}
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_slru_segment_request(
&mut self,

View File

@@ -11,8 +11,9 @@ use crate::context::RequestContext;
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::repository::*;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
use crate::tenant::timeline::GetVectoredError;
use crate::walrecord::NeonWalRecord;
use anyhow::{ensure, Context};
use anyhow::{anyhow, ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
@@ -27,7 +28,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
@@ -198,6 +199,41 @@ impl Timeline {
version.get(self, key, ctx).await
}
pub(crate) async fn get_rel_pages_at_lsn(
&self,
tag: RelTag,
blknum: BlockNumber,
count: u8,
version: Version<'_>,
latest: bool,
ctx: &RequestContext,
) -> Result<(u8, Bytes), GetVectoredError> {
if tag.relnode == 0 {
return Err(GetVectoredError::Other(
RelationError::InvalidRelnode.into(),
));
}
let nblocks = self
.get_rel_size(tag, version, latest, ctx)
.await
.map_err(|e| GetVectoredError::Other(anyhow!(e)))?;
if blknum + (count - 1) as u32 >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag,
blknum,
version.get_lsn(),
nblocks
);
return Ok((1, ZERO_PAGE.clone()));
}
let start_key = rel_block_to_key(tag, blknum);
let end_key = start_key.add(count as u32);
version.get_vectored(self, start_key..end_key, ctx).await
}
// Get size of a database in blocks
pub(crate) async fn get_db_size(
&self,
@@ -1609,6 +1645,55 @@ impl<'a> DatadirModification<'a> {
self.tline.get(key, lsn, ctx).await
}
async fn get_vectored(
&self,
key_range: Range<Key>,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
// Have we already updated the same key? Read the latest pending updated
// version in that case.
//
// Note: we don't check pending_deletions. It is an error to request a
// value that has been removed, deletion only avoids leaking storage.
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
let mut keys_in_modification = KeySpaceAccum::new();
let key = key_range.start;
while key != key_range.end {
if let Some(values) = self.pending_updates.get(&key) {
if let Some((_, value)) = values.last() {
keys_in_modification.add_key(key);
match value {
Value::Image(img) => {
results.insert(key, Ok(img.clone()));
}
_ => {
results.insert(
key,
Err(PageReconstructError::from(anyhow::anyhow!(
"unexpected pending WAL record"
))),
);
}
}
}
}
}
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
let mut keyspace = KeySpace {
ranges: vec![key_range],
};
keyspace.remove_overlapping_with(&keys_in_modification.to_keyspace());
let pages = self.tline.get_vectored(keyspace, lsn, ctx).await?;
results.extend(pages.into_iter());
Ok(results)
}
fn put(&mut self, key: Key, val: Value) {
let values = self.pending_updates.entry(key).or_default();
// Replace the previous value if it exists at the same lsn
@@ -1652,6 +1737,43 @@ impl<'a> Version<'a> {
}
}
async fn get_vectored(
&self,
timeline: &Timeline,
key_range: Range<Key>,
ctx: &RequestContext,
) -> Result<(u8, Bytes), GetVectoredError> {
let pages = match self {
Version::Lsn(lsn) => {
timeline
.get_vectored(
KeySpace {
ranges: vec![key_range],
},
*lsn,
ctx,
)
.await
}
Version::Modified(modification) => modification.get_vectored(key_range, ctx).await,
}?;
let mut buf = BytesMut::new();
let page_count: u8 = pages.len().try_into().expect("too many pages returned");
for page in pages {
match page {
(_key, Ok(bytes)) => {
buf.extend_from_slice(&bytes[..]);
}
(_key, Err(err)) => {
return Err(GetVectoredError::Other(anyhow!(err)));
}
}
}
Ok((page_count, buf.freeze()))
}
fn get_lsn(&self) -> Lsn {
match self {
Version::Lsn(lsn) => *lsn,

View File

@@ -146,6 +146,7 @@ macro_rules! pausable_failpoint {
pub mod blob_io;
pub mod block_io;
pub mod vectored_blob_io;
pub mod disk_btree;
pub(crate) mod ephemeral_file;
@@ -3877,6 +3878,7 @@ mod tests {
use bytes::BytesMut;
use hex_literal::hex;
use once_cell::sync::Lazy;
use pageserver_api::keyspace::KeySpace;
use rand::{thread_rng, Rng};
use tokio_util::sync::CancellationToken;
@@ -4514,6 +4516,61 @@ mod tests {
Ok(())
}
async fn bulk_insert_compact_gc(
timeline: Arc<Timeline>,
ctx: &RequestContext,
mut lsn: Lsn,
repeat: usize,
key_count: usize,
) -> anyhow::Result<()> {
let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let mut blknum = 0;
// Enforce that key range is monotonously increasing
let mut keyspace = KeySpaceAccum::new();
for _ in 0..repeat {
for _ in 0..key_count {
test_key.field6 = blknum;
let mut writer = timeline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
keyspace.add_key(test_key);
lsn = Lsn(lsn.0 + 0x10);
blknum += 1;
}
let cutoff = timeline.get_last_record_lsn();
timeline
.update_gc_info(
Vec::new(),
cutoff,
Duration::ZERO,
&CancellationToken::new(),
ctx,
)
.await?;
timeline.freeze_and_flush().await?;
timeline
.compact(&CancellationToken::new(), EnumSet::empty(), ctx)
.await?;
timeline.gc().await?;
}
Ok(())
}
//
// Insert 1000 key-value pairs with increasing keys, flush, compact, GC.
// Repeat 50 times.
@@ -4526,49 +4583,98 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
let mut lsn = Lsn(0x10);
let lsn = Lsn(0x10);
bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?;
let mut keyspace = KeySpaceAccum::new();
Ok(())
}
let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let mut blknum = 0;
for _ in 0..50 {
for _ in 0..10000 {
test_key.field6 = blknum;
let mut writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
// Test the vectored get real implementation against a simple sequential implementation.
//
// The test generates a keyspace by repeatedly flushing the in-memory layer and compacting.
// Projected to 2D the key space looks like below. Lsn grows upwards on the Y axis and keys
// grow to the right on the X axis.
// [Delta]
// [Delta]
// [Delta]
// [Delta]
// ------------ Image ---------------
//
// After layer generation we pick the ranges to query as follows:
// 1. The beginning of each delta layer
// 2. At the seam between two adjacent delta layers
//
// There's one major downside to this test: delta layers only contains images,
// so the search can stop at the first delta layer and doesn't traverse any deeper.
#[tokio::test]
async fn test_get_vectored() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_get_vectored")?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
keyspace.add_key(test_key);
let lsn = Lsn(0x10);
bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?;
lsn = Lsn(lsn.0 + 0x10);
blknum += 1;
let guard = tline.layers.read().await;
guard.layer_map().dump(true, &ctx).await?;
let mut reads = Vec::new();
let mut prev = None;
guard.layer_map().iter_historic_layers().for_each(|desc| {
if !desc.is_delta() {
prev = Some(desc.clone());
return;
}
let cutoff = tline.get_last_record_lsn();
let start = desc.key_range.start;
let end = desc
.key_range
.start
.add(Timeline::MAX_GET_VECTORED_KEYS.try_into().unwrap());
reads.push(KeySpace {
ranges: vec![start..end],
});
if let Some(prev) = &prev {
if !prev.is_delta() {
return;
}
let first_range = Key {
field6: prev.key_range.end.field6 - 4,
..prev.key_range.end
}..prev.key_range.end;
let second_range = desc.key_range.start..Key {
field6: desc.key_range.start.field6 + 4,
..desc.key_range.start
};
reads.push(KeySpace {
ranges: vec![first_range, second_range],
});
};
prev = Some(desc.clone());
});
drop(guard);
// Pick a big LSN such that we query over all the changes.
// Technically, u64::MAX - 1 is the largest LSN supported by the read path,
// but there seems to be a bug on the non-vectored search path which surfaces
// in that case.
let reads_lsn = Lsn(u64::MAX - 1000);
for read in reads {
info!("Doing vectored read on {:?}", read);
let vectored_res = tline.get_vectored_impl(read.clone(), reads_lsn, &ctx).await;
tline
.update_gc_info(
Vec::new(),
cutoff,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;
tline.freeze_and_flush().await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
tline.gc().await?;
.validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx)
.await;
}
Ok(())

View File

@@ -52,8 +52,7 @@ use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use anyhow::Result;
use pageserver_api::keyspace::KeySpaceAccum;
use std::cmp::Ordering;
use std::collections::{BTreeMap, VecDeque};
use std::collections::{HashMap, VecDeque};
use std::iter::Peekable;
use std::ops::Range;
use std::sync::Arc;
@@ -147,43 +146,28 @@ impl Drop for BatchedUpdates<'_> {
}
/// Return value of LayerMap::search
#[derive(Eq, PartialEq, Debug)]
#[derive(Eq, PartialEq, Debug, Hash)]
pub struct SearchResult {
pub layer: Arc<PersistentLayerDesc>,
pub lsn_floor: Lsn,
}
pub struct OrderedSearchResult(SearchResult);
impl Ord for OrderedSearchResult {
fn cmp(&self, other: &Self) -> Ordering {
self.0.lsn_floor.cmp(&other.0.lsn_floor)
}
}
impl PartialOrd for OrderedSearchResult {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for OrderedSearchResult {
fn eq(&self, other: &Self) -> bool {
self.0.lsn_floor == other.0.lsn_floor
}
}
impl Eq for OrderedSearchResult {}
/// Return value of [`LayerMap::range_search`]
///
/// Contains a mapping from a layer description to a keyspace
/// accumulator that contains all the keys which intersect the layer
/// from the original search space. Keys that were not found are accumulated
/// in a separate key space accumulator.
#[derive(Debug)]
pub struct RangeSearchResult {
pub found: BTreeMap<OrderedSearchResult, KeySpaceAccum>,
pub found: HashMap<SearchResult, KeySpaceAccum>,
pub not_found: KeySpaceAccum,
}
impl RangeSearchResult {
fn new() -> Self {
Self {
found: BTreeMap::new(),
found: HashMap::new(),
not_found: KeySpaceAccum::new(),
}
}
@@ -314,7 +298,7 @@ where
Some(search_result) => self
.result
.found
.entry(OrderedSearchResult(search_result))
.entry(search_result)
.or_default()
.add_range(covered_range),
None => self.pad_range(covered_range),
@@ -362,6 +346,35 @@ where
}
}
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
pub enum InMemoryLayerHandle {
Open {
lsn_floor: Lsn,
end_lsn: Lsn,
},
Frozen {
idx: usize,
lsn_floor: Lsn,
end_lsn: Lsn,
},
}
impl InMemoryLayerHandle {
pub fn get_lsn_floor(&self) -> Lsn {
match self {
InMemoryLayerHandle::Open { lsn_floor, .. } => *lsn_floor,
InMemoryLayerHandle::Frozen { lsn_floor, .. } => *lsn_floor,
}
}
pub fn get_end_lsn(&self) -> Lsn {
match self {
InMemoryLayerHandle::Open { end_lsn, .. } => *end_lsn,
InMemoryLayerHandle::Frozen { end_lsn, .. } => *end_lsn,
}
}
}
impl LayerMap {
///
/// Find the latest layer (by lsn.end) that covers the given
@@ -556,6 +569,43 @@ impl LayerMap {
self.historic.iter()
}
/// Get a handle for the first in memory layer that matches the provided predicate.
/// The handle should be used with [`Self::get_in_memory_layer`] to retrieve the actual layer.
///
/// Note: [`Self::find_in_memory_layer`] and [`Self::get_in_memory_layer`] should be called during
/// the same exclusive region established by holding the layer manager lock.
pub fn find_in_memory_layer<Pred>(&self, mut pred: Pred) -> Option<InMemoryLayerHandle>
where
Pred: FnMut(&Arc<InMemoryLayer>) -> bool,
{
if let Some(open) = &self.open_layer {
if pred(open) {
return Some(InMemoryLayerHandle::Open {
lsn_floor: open.get_lsn_range().start,
end_lsn: open.get_lsn_range().end,
});
}
}
let pos = self.frozen_layers.iter().rev().position(pred);
pos.map(|rev_idx| {
let idx = self.frozen_layers.len() - 1 - rev_idx;
InMemoryLayerHandle::Frozen {
idx,
lsn_floor: self.frozen_layers[idx].get_lsn_range().start,
end_lsn: self.frozen_layers[idx].get_lsn_range().end,
}
})
}
/// Get the layer pointed to by the provided handle.
pub fn get_in_memory_layer(&self, handle: &InMemoryLayerHandle) -> Option<Arc<InMemoryLayer>> {
match handle {
InMemoryLayerHandle::Open { .. } => self.open_layer.clone(),
InMemoryLayerHandle::Frozen { idx, .. } => self.frozen_layers.get(*idx).cloned(),
}
}
///
/// Divide the whole given range of keys into sub-ranges based on the latest
/// image layer that covers each range at the specified lsn (inclusive).
@@ -869,6 +919,8 @@ impl LayerMap {
#[cfg(test)]
mod tests {
use pageserver_api::keyspace::KeySpace;
use super::*;
#[derive(Clone)]
@@ -895,15 +947,15 @@ mod tests {
fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) {
assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace());
let lhs: Vec<_> = lhs
let lhs: HashMap<SearchResult, KeySpace> = lhs
.found
.into_iter()
.map(|(search_result, accum)| (search_result.0, accum.to_keyspace()))
.map(|(search_result, accum)| (search_result, accum.to_keyspace()))
.collect();
let rhs: Vec<_> = rhs
let rhs: HashMap<SearchResult, KeySpace> = rhs
.found
.into_iter()
.map(|(search_result, accum)| (search_result.0, accum.to_keyspace()))
.map(|(search_result, accum)| (search_result, accum.to_keyspace()))
.collect();
assert_eq!(lhs, rhs);
@@ -923,7 +975,7 @@ mod tests {
Some(res) => {
range_search_result
.found
.entry(OrderedSearchResult(res))
.entry(res)
.or_default()
.add_key(key);
}

View File

@@ -614,7 +614,7 @@ impl RemoteTimelineClient {
metadata,
);
let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
self.calls_unfinished_metric_begin(&op);
self.metric_begin(&op);
upload_queue.queued_operations.push_back(op);
upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
@@ -654,7 +654,7 @@ impl RemoteTimelineClient {
metadata.generation, metadata.shard
);
let op = UploadOp::UploadLayer(layer, metadata);
self.calls_unfinished_metric_begin(&op);
self.metric_begin(&op);
upload_queue.queued_operations.push_back(op);
}
@@ -823,10 +823,14 @@ impl RemoteTimelineClient {
}
// schedule the actual deletions
if with_metadata.is_empty() {
// avoid scheduling the op & bumping the metric
return;
}
let op = UploadOp::Delete(Delete {
layers: with_metadata,
});
self.calls_unfinished_metric_begin(&op);
self.metric_begin(&op);
upload_queue.queued_operations.push_back(op);
}
@@ -1516,10 +1520,10 @@ impl RemoteTimelineClient {
.await;
}
self.calls_unfinished_metric_end(&task.op);
self.metric_end(&task.op);
}
fn calls_unfinished_metric_impl(
fn metric_impl(
&self,
op: &UploadOp,
) -> Option<(
@@ -1556,17 +1560,17 @@ impl RemoteTimelineClient {
Some(res)
}
fn calls_unfinished_metric_begin(&self, op: &UploadOp) {
let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
fn metric_begin(&self, op: &UploadOp) {
let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
Some(x) => x,
None => return,
};
let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
guard.will_decrement_manually(); // in unfinished_ops_metric_end()
guard.will_decrement_manually(); // in metric_end(), see right below
}
fn calls_unfinished_metric_end(&self, op: &UploadOp) {
let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
fn metric_end(&self, op: &UploadOp) {
let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
Some(x) => x,
None => return,
};
@@ -1651,7 +1655,7 @@ impl RemoteTimelineClient {
// Tear down queued ops
for op in qi.queued_operations.into_iter() {
self.calls_unfinished_metric_end(&op);
self.metric_end(&op);
// Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
// which is exactly what we want to happen.
drop(op);

View File

@@ -81,15 +81,7 @@ pub async fn download_layer_file<'a>(
.with_context(|| format!("create a destination file for layer '{temp_file_path}'"))
.map_err(DownloadError::Other)?;
let download = storage
.download(&remote_path, cancel)
.await
.with_context(|| {
format!(
"open a download stream for layer with remote storage path '{remote_path:?}'"
)
})
.map_err(DownloadError::Other)?;
let download = storage.download(&remote_path, cancel).await?;
let mut destination_file =
tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
@@ -98,9 +90,11 @@ pub async fn download_layer_file<'a>(
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut destination_file)
.await
.with_context(|| format!(
.with_context(|| {
format!(
"download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
))
)
})
.map_err(DownloadError::Other);
match bytes_amount {

View File

@@ -8,15 +8,21 @@ pub(crate) mod layer;
mod layer_desc;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Value;
use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
use enum_map::EnumMap;
use enumset::EnumSet;
use once_cell::sync::Lazy;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::models::{
LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
};
use std::cmp::{Ordering, Reverse};
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
use std::ops::Range;
use std::sync::Mutex;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -34,6 +40,11 @@ pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
use super::layer_map::InMemoryLayerHandle;
use super::timeline::layer_manager::LayerManager;
use super::timeline::GetVectoredError;
use super::PageReconstructError;
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
T: PartialOrd<T>,
@@ -67,6 +78,287 @@ pub struct ValueReconstructState {
pub img: Option<(Lsn, Bytes)>,
}
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(crate) enum ValueReconstructSituation {
Complete,
#[default]
Continue,
}
/// Reconstruct data accumulated for a single key during a vectored get
#[derive(Debug, Default, Clone)]
pub(crate) struct VectoredValueReconstructState {
pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
pub(crate) img: Option<(Lsn, Bytes)>,
situation: ValueReconstructSituation,
}
impl VectoredValueReconstructState {
fn get_cached_lsn(&self) -> Option<Lsn> {
self.img.as_ref().map(|img| img.0)
}
}
impl From<VectoredValueReconstructState> for ValueReconstructState {
fn from(mut state: VectoredValueReconstructState) -> Self {
// walredo expects the records to be descending in terms of Lsn
state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));
ValueReconstructState {
records: state.records,
img: state.img,
}
}
}
/// Bag of data accumulated during a vectored get
pub(crate) struct ValuesReconstructState {
pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,
keys_done: KeySpaceRandomAccum,
}
impl ValuesReconstructState {
pub(crate) fn new() -> Self {
Self {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),
}
}
/// Associate a key with the error which it encountered and mark it as done
pub(crate) fn on_key_error(&mut self, key: Key, err: PageReconstructError) {
let previous = self.keys.insert(key, Err(err));
if let Some(Ok(state)) = previous {
if state.situation == ValueReconstructSituation::Continue {
self.keys_done.add_key(key);
}
}
}
/// Update the state collected for a given key.
/// Returns true if this was the last value needed for the key and false otherwise.
///
/// If the key is done after the update, mark it as such.
pub(crate) fn update_key(
&mut self,
key: &Key,
lsn: Lsn,
value: Value,
) -> ValueReconstructSituation {
let state = self
.keys
.entry(*key)
.or_insert(Ok(VectoredValueReconstructState::default()));
if let Ok(state) = state {
let key_done = match state.situation {
ValueReconstructSituation::Complete => unreachable!(),
ValueReconstructSituation::Continue => match value {
Value::Image(img) => {
state.img = Some((lsn, img));
true
}
Value::WalRecord(rec) => {
let reached_cache =
state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn);
let will_init = rec.will_init();
state.records.push((lsn, rec));
will_init || reached_cache
}
},
};
if key_done && state.situation == ValueReconstructSituation::Continue {
state.situation = ValueReconstructSituation::Complete;
self.keys_done.add_key(*key);
}
state.situation
} else {
ValueReconstructSituation::Complete
}
}
/// Returns the Lsn at which this key is cached if one exists.
/// The read path should go no further than this Lsn for the given key.
pub(crate) fn get_cached_lsn(&self, key: &Key) -> Option<Lsn> {
self.keys
.get(key)
.and_then(|k| k.as_ref().ok())
.and_then(|state| state.get_cached_lsn())
}
/// Returns the key space describing the keys that have
/// been marked as completed since the last call to this function.
pub(crate) fn consume_done_keys(&mut self) -> KeySpace {
self.keys_done.consume_keyspace()
}
}
impl Default for ValuesReconstructState {
fn default() -> Self {
Self::new()
}
}
/// Description of layer to be read - the layer map can turn
/// this description into the actual layer.
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
pub(crate) enum ReadableLayerDesc {
Persistent {
desc: PersistentLayerDesc,
lsn_floor: Lsn,
lsn_ceil: Lsn,
},
InMemory {
handle: InMemoryLayerHandle,
lsn_ceil: Lsn,
},
}
/// Wraper for 'ReadableLayerDesc' sorted by Lsn
#[derive(Debug)]
struct ReadableLayerDescOrdered(ReadableLayerDesc);
/// Data structure which maintains a fringe of layers for the
/// read path. The fringe is the set of layers which intersects
/// the current keyspace that the search is descending on.
/// Each layer tracks the keyspace that intersects it.
///
/// The fringe must appear sorted by Lsn. Hence, it uses
/// a two layer indexing scheme.
#[derive(Debug)]
pub(crate) struct LayerFringe {
layers_by_lsn: BinaryHeap<ReadableLayerDescOrdered>,
layers: HashMap<ReadableLayerDesc, KeySpace>,
}
impl LayerFringe {
pub(crate) fn new() -> Self {
LayerFringe {
layers_by_lsn: BinaryHeap::new(),
layers: HashMap::new(),
}
}
pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayerDesc, KeySpace)> {
let handle = match self.layers_by_lsn.pop() {
Some(h) => h,
None => return None,
};
let removed = self.layers.remove_entry(&handle.0);
match removed {
Some((layer, keyspace)) => Some((layer, keyspace)),
None => unreachable!("fringe internals are always consistent"),
}
}
pub(crate) fn update(&mut self, layer: ReadableLayerDesc, keyspace: KeySpace) {
let entry = self.layers.entry(layer.clone());
match entry {
Entry::Occupied(mut entry) => {
entry.get_mut().merge(&keyspace);
}
Entry::Vacant(entry) => {
self.layers_by_lsn
.push(ReadableLayerDescOrdered(entry.key().clone()));
entry.insert(keyspace);
}
}
}
}
impl Default for LayerFringe {
fn default() -> Self {
Self::new()
}
}
impl Ord for ReadableLayerDescOrdered {
fn cmp(&self, other: &Self) -> Ordering {
let ord = self.0.get_lsn_ceil().cmp(&other.0.get_lsn_ceil());
if ord == std::cmp::Ordering::Equal {
self.0
.get_lsn_floor()
.cmp(&other.0.get_lsn_floor())
.reverse()
} else {
ord
}
}
}
impl PartialOrd for ReadableLayerDescOrdered {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for ReadableLayerDescOrdered {
fn eq(&self, other: &Self) -> bool {
self.0.get_lsn_floor() == other.0.get_lsn_floor()
&& self.0.get_lsn_ceil() == other.0.get_lsn_ceil()
}
}
impl Eq for ReadableLayerDescOrdered {}
impl ReadableLayerDesc {
pub(crate) fn get_lsn_floor(&self) -> Lsn {
match self {
ReadableLayerDesc::Persistent { lsn_floor, .. } => *lsn_floor,
ReadableLayerDesc::InMemory { handle, .. } => handle.get_lsn_floor(),
}
}
pub(crate) fn get_lsn_ceil(&self) -> Lsn {
match self {
ReadableLayerDesc::Persistent { lsn_ceil, .. } => *lsn_ceil,
ReadableLayerDesc::InMemory { lsn_ceil, .. } => *lsn_ceil,
}
}
pub(crate) async fn get_values_reconstruct_data(
&self,
layer_manager: &LayerManager,
keyspace: KeySpace,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
match self {
ReadableLayerDesc::Persistent {
desc,
lsn_floor,
lsn_ceil,
} => {
let layer = layer_manager.get_from_desc(desc);
layer
.get_values_reconstruct_data(
keyspace,
*lsn_floor,
*lsn_ceil,
reconstruct_state,
ctx,
)
.await
}
ReadableLayerDesc::InMemory { handle, lsn_ceil } => {
let layer = layer_manager
.layer_map()
.get_in_memory_layer(handle)
.unwrap();
layer
.get_values_reconstruct_data(keyspace, *lsn_ceil, reconstruct_state, ctx)
.await
}
}
}
}
/// Return value from [`Layer::get_value_reconstruct_data`]
#[derive(Clone, Copy, Debug)]
pub enum ValueReconstructResult {

View File

@@ -35,12 +35,18 @@ use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::Timeline;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
@@ -59,7 +65,9 @@ use utils::{
lsn::Lsn,
};
use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
use super::{
AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
};
///
/// Header stored in the beginning of the file
@@ -207,6 +215,7 @@ pub struct DeltaLayerInner {
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
vectored_blob_reader: VectoredBlobReader,
/// Reader object for reading blocks from the file.
file: FileBlockReader,
@@ -242,7 +251,7 @@ impl DeltaLayer {
return Ok(());
}
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
let inner = self.load(LayerAccessKind::Dump, 0, ctx).await?;
inner.dump(ctx).await
}
@@ -278,20 +287,25 @@ impl DeltaLayer {
async fn load(
&self,
access_kind: LayerAccessKind,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<&Arc<DeltaLayerInner>> {
self.access_stats.record_access(access_kind, ctx);
// Quick exit if already loaded
self.inner
.get_or_try_init(|| self.load_inner(ctx))
.get_or_try_init(|| self.load_inner(max_vectored_read_size, ctx))
.await
.with_context(|| format!("Failed to load delta layer {}", self.path()))
}
async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
async fn load_inner(
&self,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<Arc<DeltaLayerInner>> {
let path = self.path();
let loaded = DeltaLayerInner::load(&path, None, ctx)
let loaded = DeltaLayerInner::load(&path, None, max_vectored_read_size, ctx)
.await
.and_then(|res| res)?;
@@ -692,15 +706,16 @@ impl DeltaLayerInner {
pub(super) async fn load(
path: &Utf8Path,
summary: Option<Summary>,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let file = FileBlockReader::new(file);
let block_reader = FileBlockReader::new(file);
let summary_blk = match file.read_blk(0, ctx).await {
let summary_blk = match block_reader.read_blk(0, ctx).await {
Ok(blk) => blk,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
};
@@ -722,8 +737,16 @@ impl DeltaLayerInner {
}
}
// TODO: don't open file twice
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let vectored_blob_reader = VectoredBlobReader::new(file, max_vectored_read_size);
Ok(Ok(DeltaLayerInner {
file,
file: block_reader,
vectored_blob_reader,
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
}))
@@ -818,6 +841,174 @@ impl DeltaLayerInner {
}
}
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
//
// If the key is cached, go no further than the cached Lsn.
//
// Currently, the index is visited for each range, but this
// can be further optimised to visit the index only once.
pub(super) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
start_lsn: Lsn,
end_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let reads = self
.plan_reads(keyspace, start_lsn..end_lsn, reconstruct_state, ctx)
.await
.map_err(GetVectoredError::Other)?;
self.do_reads_and_update_state(reads, reconstruct_state)
.await;
Ok(())
}
async fn plan_reads(
&self,
keyspace: KeySpace,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<Vec<VectoredRead>> {
let mut planner = VectoredReadPlanner::new(self.vectored_blob_reader.get_max_read_size());
let file = &self.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
file,
);
for range in keyspace.ranges.iter() {
let mut range_end_handled = false;
let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
tree_reader
.visit(
&start_key.0,
VisitDirection::Forwards,
|raw_key, value| {
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
let lsn = DeltaKey::extract_lsn_from_buf(raw_key);
let blob_ref = BlobRef(value);
assert!(key >= range.start && lsn >= lsn_range.start);
let cached_lsn = reconstruct_state.get_cached_lsn(&key);
let flag = {
if cached_lsn >= Some(lsn) {
BlobFlag::Ignore
} else if blob_ref.will_init() {
BlobFlag::Replaces
} else {
BlobFlag::None
}
};
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
planner.handle_range_end(blob_ref.pos());
range_end_handled = true;
false
} else {
planner.handle(key, lsn, blob_ref.pos(), flag);
true
}
},
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build(),
)
.await
.map_err(|err| anyhow!(err))?;
if !range_end_handled {
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
tracing::info!("Handling range end fallback at {}", payload_end);
planner.handle_range_end(payload_end);
}
}
Ok(planner.finish())
}
async fn do_reads_and_update_state(
&self,
reads: Vec<VectoredRead>,
reconstruct_state: &mut ValuesReconstructState,
) {
let mut ignore_key_with_err = None;
let mut buf = Some(BytesMut::with_capacity(
self.vectored_blob_reader.get_max_read_size(),
));
for read in reads.into_iter().rev() {
let res = self
.vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"))
.await;
let blobs_buf = match res {
Ok(blobs_buf) => blobs_buf,
Err(err) => {
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::from(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.vectored_blob_reader.get_file_ref().path,
kind
)),
);
}
// We have "lost" the buffer since the lower level IO api
// doesn't return the buffer on error. Allocate a new one.
buf = Some(BytesMut::with_capacity(
self.vectored_blob_reader.get_max_read_size(),
));
continue;
}
};
for meta in blobs_buf.blobs.iter().rev() {
if Some(meta.meta.key) == ignore_key_with_err {
continue;
}
let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
let value = match value {
Ok(v) => v,
Err(e) => {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::from(anyhow!(e).context(format!(
"Failed to deserialize blob from virtual file {}",
self.vectored_blob_reader.get_file_ref().path,
))),
);
ignore_key_with_err = Some(meta.meta.key);
continue;
}
};
// Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
// state, no further updates shall be made to it. The call below will
// panic if the invariant is violated.
reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
}
buf = Some(blobs_buf.buf);
}
}
pub(super) async fn load_keys<'a>(
&'a self,
ctx: &RequestContext,

View File

@@ -26,20 +26,25 @@
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, KEY_SIZE};
use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::Timeline;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use hex;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
@@ -59,7 +64,7 @@ use utils::{
};
use super::filename::ImageFileName;
use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};
use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer, ValuesReconstructState};
///
/// Header stored in the beginning of the file
@@ -152,6 +157,7 @@ pub struct ImageLayerInner {
/// Reader object for reading blocks from the file.
file: FileBlockReader,
vectored_blob_reader: VectoredBlobReader,
}
impl std::fmt::Debug for ImageLayerInner {
@@ -208,7 +214,7 @@ impl ImageLayer {
return Ok(());
}
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
let inner = self.load(LayerAccessKind::Dump, 0, ctx).await?;
inner.dump(ctx).await?;
@@ -238,21 +244,32 @@ impl ImageLayer {
async fn load(
&self,
access_kind: LayerAccessKind,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<&ImageLayerInner> {
self.access_stats.record_access(access_kind, ctx);
self.inner
.get_or_try_init(|| self.load_inner(ctx))
.get_or_try_init(|| self.load_inner(max_vectored_read_size, ctx))
.await
.with_context(|| format!("Failed to load image layer {}", self.path()))
}
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
async fn load_inner(
&self,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<ImageLayerInner> {
let path = self.path();
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx)
.await
.and_then(|res| res)?;
let loaded = ImageLayerInner::load(
&path,
self.desc.image_layer_lsn(),
None,
max_vectored_read_size,
ctx,
)
.await
.and_then(|res| res)?;
// not production code
let actual_filename = path.file_name().unwrap().to_owned();
@@ -359,14 +376,15 @@ impl ImageLayerInner {
path: &Utf8Path,
lsn: Lsn,
summary: Option<Summary>,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let file = FileBlockReader::new(file);
let summary_blk = match file.read_blk(0, ctx).await {
let block_reader = FileBlockReader::new(file);
let summary_blk = match block_reader.read_blk(0, ctx).await {
Ok(blk) => blk,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
};
@@ -392,11 +410,19 @@ impl ImageLayerInner {
}
}
// TODO: don't open file twice
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let vectored_blob_reader = VectoredBlobReader::new(file, max_vectored_read_size);
Ok(Ok(ImageLayerInner {
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
lsn,
file,
file: block_reader,
vectored_blob_reader,
}))
}
@@ -438,6 +464,124 @@ impl ImageLayerInner {
Ok(ValueReconstructResult::Missing)
}
}
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
pub(super) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let reads = self
.plan_reads(keyspace, ctx)
.await
.map_err(GetVectoredError::Other)?;
self.do_reads_and_update_state(reads, reconstruct_state)
.await;
Ok(())
}
async fn plan_reads(
&self,
keyspace: KeySpace,
ctx: &RequestContext,
) -> anyhow::Result<Vec<VectoredRead>> {
let mut planner = VectoredReadPlanner::new(self.vectored_blob_reader.get_max_read_size());
let file = &self.file;
let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file);
for range in keyspace.ranges.iter() {
let mut range_end_handled = false;
let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
range.start.write_to_byte_slice(&mut search_key);
tree_reader
.visit(
&search_key,
VisitDirection::Forwards,
|raw_key, offset| {
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
assert!(key >= range.start);
if key >= range.end {
planner.handle_range_end(offset);
range_end_handled = true;
false
} else {
planner.handle(key, self.lsn, offset, BlobFlag::None);
true
}
},
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
.build(),
)
.await
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
if !range_end_handled {
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
planner.handle_range_end(payload_end);
}
}
Ok(planner.finish())
}
async fn do_reads_and_update_state(
&self,
reads: Vec<VectoredRead>,
reconstruct_state: &mut ValuesReconstructState,
) {
let mut buf = Some(BytesMut::with_capacity(
self.vectored_blob_reader.get_max_read_size(),
));
for read in reads.into_iter().rev() {
let res = self
.vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"))
.await;
match res {
Ok(blobs_buf) => {
for meta in blobs_buf.blobs.iter().rev() {
let img_buf = Bytes::copy_from_slice(&blobs_buf.buf[meta.start..meta.end]);
reconstruct_state.update_key(
&meta.meta.key,
self.lsn,
Value::Image(img_buf),
);
}
buf = Some(blobs_buf.buf);
}
Err(err) => {
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::from(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.vectored_blob_reader.get_file_ref().path,
kind
)),
);
}
// We have "lost" the buffer since the lower level IO api
// doesn't return the buffer on error. Allocate a new one.
buf = Some(BytesMut::with_capacity(
self.vectored_blob_reader.get_max_read_size(),
));
}
};
}
}
}
/// A builder object for constructing a new image layer.

View File

@@ -9,13 +9,15 @@ use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::repository::{Key, Value};
use crate::tenant::block_io::BlockReader;
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
use crate::tenant::Timeline;
use crate::tenant::storage_layer::ValueReconstructResult;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::walrecord;
use anyhow::{ensure, Result};
use anyhow::{anyhow, ensure, Result};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use std::collections::HashMap;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::{Arc, OnceLock};
use tracing::*;
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
@@ -25,7 +27,10 @@ use std::fmt::Write as _;
use std::ops::Range;
use tokio::sync::{RwLock, RwLockWriteGuard};
use super::{DeltaLayerWriter, ResidentLayer};
use super::{
DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
ValuesReconstructState,
};
pub struct InMemoryLayer {
conf: &'static PageServerConf,
@@ -202,6 +207,91 @@ impl InMemoryLayer {
Ok(ValueReconstructResult::Complete)
}
}
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
//
// If the key is cached, go no further than the cached Lsn.
pub(crate) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
end_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
let inner = self.inner.read().await;
let reader = inner.file.block_cursor();
#[derive(Eq, PartialEq, Ord, PartialOrd)]
struct BlockRead {
key: Key,
lsn: Lsn,
block_offset: u64,
}
let mut planned_block_reads = BinaryHeap::new();
for range in keyspace.ranges.iter() {
let mut key = range.start;
while key < range.end {
if let Some(vec_map) = inner.index.get(&key) {
let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
None => self.start_lsn..end_lsn,
};
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
planned_block_reads.push(BlockRead {
key,
lsn: *entry_lsn,
block_offset: *pos,
});
}
}
key = key.next();
}
}
let keyspace_size = keyspace.total_size();
let mut completed_keys = HashSet::new();
while completed_keys.len() < keyspace_size && !planned_block_reads.is_empty() {
let block_read = planned_block_reads.pop().unwrap();
if completed_keys.contains(&block_read.key) {
continue;
}
let buf = reader.read_blob(block_read.block_offset, &ctx).await;
if let Err(e) = buf {
reconstruct_state
.on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
completed_keys.insert(block_read.key);
continue;
}
let value = Value::des(&buf.unwrap());
if let Err(e) = value {
reconstruct_state
.on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
completed_keys.insert(block_read.key);
continue;
}
let key_situation =
reconstruct_state.update_key(&block_read.key, block_read.lsn, value.unwrap());
if key_situation == ValueReconstructSituation::Complete {
completed_keys.insert(block_read.key);
}
}
Ok(())
}
}
impl std::fmt::Display for InMemoryLayer {

View File

@@ -1,5 +1,6 @@
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::{
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
};
@@ -16,13 +17,14 @@ use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::Key;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
use super::delta_layer::{self, DeltaEntry};
use super::image_layer;
use super::{
AsLayerDesc, LayerAccessStats, LayerAccessStatsReset, LayerFileName, PersistentLayerDesc,
ValueReconstructResult, ValueReconstructState,
ValueReconstructResult, ValueReconstructState, ValuesReconstructState,
};
use utils::generation::Generation;
@@ -262,6 +264,37 @@ impl Layer {
.with_context(|| format!("get_value_reconstruct_data for layer {self}"))
}
pub(crate) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
start_lsn: Lsn,
end_lsn: Lsn,
reconstruct_data: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let layer = self
.0
.get_or_maybe_download(true, Some(ctx))
.await
.map_err(|err| GetVectoredError::Other(anyhow::anyhow!(err)))?;
self.0
.access_stats
.record_access(LayerAccessKind::GetValueReconstructData, ctx);
layer
.get_values_reconstruct_data(
keyspace,
start_lsn,
end_lsn,
reconstruct_data,
&self.0,
ctx,
)
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
.await
}
/// Download the layer if evicted.
///
/// Will not error when the layer is already downloaded.
@@ -1177,7 +1210,7 @@ pub(crate) enum EvictionError {
/// Error internal to the [`LayerInner::get_or_maybe_download`]
#[derive(Debug, thiserror::Error)]
enum DownloadError {
pub(crate) enum DownloadError {
#[error("timeline has already shutdown")]
TimelineShutdown,
#[error("no remote storage configured")]
@@ -1274,9 +1307,14 @@ impl DownloadedLayer {
owner.desc.key_range.clone(),
owner.desc.lsn_range.clone(),
));
delta_layer::DeltaLayerInner::load(&owner.path, summary, ctx)
.await
.map(|res| res.map(LayerKind::Delta))
delta_layer::DeltaLayerInner::load(
&owner.path,
summary,
owner.conf.max_vectored_read_size,
ctx,
)
.await
.map(|res| res.map(LayerKind::Delta))
} else {
let lsn = owner.desc.image_layer_lsn();
let summary = Some(image_layer::Summary::expected(
@@ -1285,9 +1323,15 @@ impl DownloadedLayer {
owner.desc.key_range.clone(),
lsn,
));
image_layer::ImageLayerInner::load(&owner.path, lsn, summary, ctx)
.await
.map(|res| res.map(LayerKind::Image))
image_layer::ImageLayerInner::load(
&owner.path,
lsn,
summary,
owner.conf.max_vectored_read_size,
ctx,
)
.await
.map(|res| res.map(LayerKind::Image))
};
match res {
@@ -1337,6 +1381,29 @@ impl DownloadedLayer {
}
}
async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
start_lsn: Lsn,
end_lsn: Lsn,
reconstruct_data: &mut ValuesReconstructState,
owner: &Arc<LayerInner>,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
use LayerKind::*;
match self.get(owner, ctx).await.map_err(GetVectoredError::from)? {
Delta(d) => {
d.get_values_reconstruct_data(keyspace, start_lsn, end_lsn, reconstruct_data, ctx)
.await
}
Image(i) => {
i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
.await
}
}
}
async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
use LayerKind::*;
match self.get(owner, ctx).await? {

View File

@@ -15,7 +15,7 @@ use utils::id::TenantId;
/// A unique identifier of a persistent layer. This is different from `LayerDescriptor`, which is only used in the
/// benchmarks. This struct contains all necessary information to find the image / delta layer. It also provides
/// a unified way to generate layer information like file name.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Hash)]
pub struct PersistentLayerDesc {
pub tenant_shard_id: TenantShardId,
pub timeline_id: TimelineId,

View File

@@ -16,7 +16,7 @@ use futures::stream::StreamExt;
use itertools::Itertools;
use once_cell::sync::Lazy;
use pageserver_api::{
keyspace::{key_range_size, KeySpaceAccum},
keyspace::KeySpaceAccum,
models::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
LayerMapInfo, TimelineState,
@@ -67,7 +67,7 @@ use crate::{
tenant::storage_layer::{
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
ValueReconstructState,
ValueReconstructState, ValuesReconstructState,
},
};
use crate::{
@@ -111,11 +111,11 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
use super::{config::TenantConf, storage_layer::ReadableLayerDesc};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) enum FlushLoopState {
@@ -472,6 +472,15 @@ pub(crate) enum GetVectoredError {
#[error("Requested at invalid LSN: {0}")]
InvalidLsn(Lsn),
#[error("Requested key {0} not found")]
MissingKey(Key),
#[error(transparent)]
GetReadyAncestorError(GetReadyAncestorError),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
@@ -579,6 +588,23 @@ impl From<GetReadyAncestorError> for PageReconstructError {
}
}
#[derive(
Eq,
PartialEq,
Debug,
Copy,
Clone,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
)]
#[strum(serialize_all = "kebab-case")]
pub enum GetVectoredImpl {
Sequential,
Vectored,
}
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -708,7 +734,7 @@ impl Timeline {
/// which actually vectorizes the read path.
pub(crate) async fn get_vectored(
&self,
key_ranges: &[Range<Key>],
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
@@ -716,10 +742,7 @@ impl Timeline {
return Err(GetVectoredError::InvalidLsn(lsn));
}
let key_count = key_ranges
.iter()
.map(|range| key_range_size(range) as u64)
.sum();
let key_count = keyspace.total_size().try_into().unwrap();
if key_count > Timeline::MAX_GET_VECTORED_KEYS {
return Err(GetVectoredError::Oversized(key_count));
}
@@ -728,33 +751,165 @@ impl Timeline {
.throttle(ctx, key_count as usize)
.await;
let _timer = crate::metrics::GET_VECTORED_LATENCY
.for_task_kind(ctx.task_kind())
.map(|t| t.start_timer());
let mut values = BTreeMap::new();
for range in key_ranges {
for range in &keyspace.ranges {
let mut key = range.start;
while key != range.end {
assert!(!self.shard_identity.is_key_disposable(&key));
let block = self.get(key, lsn, ctx).await;
if matches!(
block,
Err(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_))
) {
return Err(GetVectoredError::Cancelled);
}
values.insert(key, block);
key = key.next();
}
}
trace!(
"get vectored request for {:?}@{} from task kind {:?} will use {} implementation",
keyspace,
lsn,
ctx.task_kind(),
self.conf.get_vectored_impl
);
let _timer = crate::metrics::GET_VECTORED_LATENCY
.for_task_kind(ctx.task_kind())
.map(|t| t.start_timer());
match self.conf.get_vectored_impl {
GetVectoredImpl::Sequential => {
self.get_vectored_sequential_impl(keyspace, lsn, ctx).await
}
GetVectoredImpl::Vectored => {
let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await;
if self.conf.validate_vectored_get {
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
.await;
}
vectored_res
}
}
}
pub(super) async fn get_vectored_sequential_impl(
&self,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let mut values = BTreeMap::new();
for range in keyspace.ranges {
let mut key = range.start;
while key != range.end {
let block = self.get(key, lsn, ctx).await;
use PageReconstructError::*;
match block {
Err(Cancelled | AncestorStopping(_)) => {
return Err(GetVectoredError::Cancelled)
}
Err(Other(err)) if err.to_string().contains("could not find data for key") => {
return Err(GetVectoredError::MissingKey(key))
}
_ => {
values.insert(key, block);
key = key.next();
}
}
}
}
Ok(values)
}
pub(super) async fn get_vectored_impl(
&self,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx)
.await?;
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
for (key, res) in reconstruct_state.keys {
match res {
Err(err) => {
results.insert(key, Err(err));
}
Ok(state) => {
let state = ValueReconstructState::from(state);
let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
results.insert(key, reconstruct_res);
}
}
}
Ok(results)
}
pub(super) async fn validate_get_vectored_impl(
&self,
vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) {
let sequential_res = self
.get_vectored_sequential_impl(keyspace.clone(), lsn, ctx)
.await;
fn errors_match(lhs: &GetVectoredError, rhs: &GetVectoredError) -> bool {
use GetVectoredError::*;
match (lhs, rhs) {
(Cancelled, Cancelled) => true,
(_, Cancelled) => true,
(Oversized(l), Oversized(r)) => l == r,
(InvalidLsn(l), InvalidLsn(r)) => l == r,
(MissingKey(l), MissingKey(r)) => l == r,
(GetReadyAncestorError(_), GetReadyAncestorError(_)) => true,
(Other(_), Other(_)) => true,
_ => false,
}
}
match (&sequential_res, vectored_res) {
(Err(seq_err), Ok(_)) => {
panic!(concat!("Sequential get failed with {}, but vectored get did not",
" - keyspace={:?} lsn={}"),
seq_err, keyspace, lsn) },
(Ok(_), Err(vec_err)) => {
panic!(concat!("Vectored get failed with {}, but sequential get did not",
" - keyspace={:?} lsn={}"),
vec_err, keyspace, lsn) },
(Err(seq_err), Err(vec_err)) => {
assert!(errors_match(seq_err, vec_err),
"Mismatched errors: {seq_err} != {vec_err} - keyspace={keyspace:?} lsn={lsn}")},
(Ok(seq_values), Ok(vec_values)) => {
seq_values.iter().zip(vec_values.iter()).for_each(|((seq_key, seq_res), (vec_key, vec_res))| {
assert_eq!(seq_key, vec_key);
match (seq_res, vec_res) {
(Ok(seq_blob), Ok(vec_blob)) => {
assert_eq!(seq_blob, vec_blob,
"Image mismatch for key {seq_key} - keyspace={keyspace:?} lsn={lsn}");
},
(Err(err), Ok(_)) => {
panic!(
concat!("Sequential get failed with {} for key {}, but vectored get did not",
" - keyspace={:?} lsn={}"),
err, seq_key, keyspace, lsn) },
(Ok(_), Err(err)) => {
panic!(
concat!("Vectored get failed with {} for key {}, but sequential get did not",
" - keyspace={:?} lsn={}"),
err, seq_key, keyspace, lsn) },
(Err(_), Err(_)) => {}
}
})
}
}
}
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
pub(crate) fn get_last_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().last
@@ -2547,6 +2702,170 @@ impl Timeline {
}
}
/// Get the data needed to reconstruct all keys in the provided keyspace
///
/// The algorithm is as follows:
/// 1. While some keys are still not done and there's a timeline to visit:
/// 2. Visit the timeline (see [`Timeline::get_vectored_reconstruct_data_timeline`]:
/// 2.1: Build the fringe for the current keyspace
/// 2.2 Visit the newest layer from the fringe to collect all values for the range it
/// intersects
/// 2.3. Pop the timeline from the fringe
/// 2.4. If the fringe is empty, go back to 1
async fn get_vectored_reconstruct_data(
&self,
mut keyspace: KeySpace,
request_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let mut timeline_owned: Arc<Timeline>;
let mut timeline = self;
let mut cont_lsn = Lsn(request_lsn.0 + 1);
loop {
if self.cancel.is_cancelled() {
return Err(GetVectoredError::Cancelled);
}
let completed = Self::get_vectored_reconstruct_data_timeline(
timeline,
keyspace.clone(),
cont_lsn,
reconstruct_state,
&self.cancel,
ctx,
)
.await?;
keyspace.remove_overlapping_with(&completed);
if keyspace.total_size() == 0 || timeline.ancestor_timeline.is_none() {
break;
}
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
timeline_owned = timeline
.get_ready_ancestor_timeline(ctx)
.await
.map_err(GetVectoredError::GetReadyAncestorError)?;
timeline = &*timeline_owned;
}
if keyspace.total_size() != 0 {
return Err(GetVectoredError::MissingKey(keyspace.start().unwrap()));
}
Ok(())
}
/// Collect the reconstruct data for a ketspace from the specified timeline.
///
/// Maintain a fringe [`LayerFringe`] which tracks all the layers that intersect
/// the current keyspace. The current keyspace of the search at any given timeline
/// is the original keyspace minus all the keys that have been completed minus
/// any keys for which we couldn't find an intersecting layer. It's not tracked explicitly,
/// but if you merge all the keyspaces in the fringe, you get the "current keyspace".
///
/// This is basically a depth-first search visitor implementation where a vertex
/// is the (layer, lsn range, key space) tuple. The fringe acts as the stack.
///
/// At each iteration pop the top of the fringe (the layer with the highest Lsn)
/// and get all the required reconstruct data from the layer in one go.
async fn get_vectored_reconstruct_data_timeline(
timeline: &Timeline,
keyspace: KeySpace,
mut cont_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<KeySpace, GetVectoredError> {
let mut unmapped_keyspace = keyspace.clone();
let mut fringe = LayerFringe::new();
let mut completed_keyspace = KeySpace::default();
// Hold the layer map whilst visiting the timeline to prevent
// compaction, eviction and flushes from rendering the layers unreadable.
//
// TODO: Do we actually need to do this? In theory holding on
// to [`tenant::storage_layer::Layer`] should be enough. However,
// [`Timeline::get`] also holds the lock during IO, so more investigation
// is needed.
let guard = timeline.layers.read().await;
let layers = guard.layer_map();
'outer: loop {
if cancel.is_cancelled() {
return Err(GetVectoredError::Cancelled);
}
let keys_done_last_step = reconstruct_state.consume_done_keys();
unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
completed_keyspace.merge(&keys_done_last_step);
let in_memory_layer = layers.find_in_memory_layer(|l| {
let start_lsn = l.get_lsn_range().start;
cont_lsn > start_lsn
});
match in_memory_layer {
Some(l) => {
fringe.update(
ReadableLayerDesc::InMemory {
handle: l,
lsn_ceil: cont_lsn,
},
unmapped_keyspace.clone(),
);
}
None => {
for range in unmapped_keyspace.ranges.iter() {
let results = match layers.range_search(range.clone(), cont_lsn) {
Some(res) => res,
None => {
break 'outer;
}
};
results
.found
.into_iter()
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
(
ReadableLayerDesc::Persistent {
desc: (*layer).clone(),
lsn_floor,
lsn_ceil: cont_lsn,
},
keyspace_accum.to_keyspace(),
)
})
.for_each(|(layer, keyspace)| fringe.update(layer, keyspace));
}
}
}
if let Some((layer_to_read, keyspace_to_read)) = fringe.next_layer() {
layer_to_read
.get_values_reconstruct_data(
&guard,
keyspace_to_read.clone(),
reconstruct_state,
ctx,
)
.await?;
unmapped_keyspace = keyspace_to_read;
cont_lsn = layer_to_read.get_lsn_floor();
} else {
break;
}
}
Ok(completed_keyspace)
}
/// # Cancel-safety
///
/// This method is cancellation-safe.
@@ -3263,7 +3582,7 @@ impl Timeline {
|| last_key_in_range
{
let results = self
.get_vectored(&key_request_accum.consume_keyspace().ranges, lsn, ctx)
.get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
.await?;
for (img_key, img) in results {
@@ -4875,11 +5194,15 @@ impl<'a> TimelineWriter<'a> {
// Rolling the open layer can be triggered by:
// 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
// the safekeepers need to store.
// the safekeepers need to store. For sharded tenants, we multiply by shard count to
// account for how writes are distributed across shards: we expect each node to consume
// 1/count of the LSN on average.
// 2. The size of the currently open layer.
// 3. The time since the last roll. It helps safekeepers to regard pageserver as caught
// up and suspend activity.
if distance >= self.get_checkpoint_distance().into() {
if distance
>= self.get_checkpoint_distance() as i128 * self.shard_identity.count.count() as i128
{
info!(
"Will roll layer at {} with layer size {} due to LSN distance ({})",
lsn, state.current_size, distance

View File

@@ -0,0 +1,412 @@
//!
//! Utilities for vectored reading of variable-sized "blobs".
//!
//! The "blob" api is an abstraction on top of the "block" api,
//! with the main difference being that blobs do not have a fixed
//! size (each blob is prefixed with 1 or 4 byte length field)
//!
//! The vectored apis provided in this module allow for planning
//! and executing disk IO which covers multiple blobs.
//!
//! Reads are planned with [`VectoredReadPlanner`] which will coalesce
//! adjacent blocks into a single disk IO request and exectuted by
//! [`VectoredBlobReader`] which does all the required offset juggling
//! and returns a buffer housing all the blobs and a list of offsets.
//!
//! Note that the vectored blob api does *not* go through the page cache.
use std::collections::BTreeMap;
use bytes::BytesMut;
use pageserver_api::key::Key;
use utils::lsn::Lsn;
use utils::vec_map::VecMap;
use crate::virtual_file::VirtualFile;
/// Metadata bundled with the start and end offset of a blob.
#[derive(Copy, Clone, Debug)]
pub struct BlobMeta {
pub key: Key,
pub lsn: Lsn,
}
/// Blob offsets into [`VectoredBlobsBuf::buf`]
pub struct VectoredBlob {
pub start: usize,
pub end: usize,
pub meta: BlobMeta,
}
/// Return type of [`VectoredBlobReader::read_blobs`]
pub struct VectoredBlobsBuf {
/// Buffer for all blobs in this read
pub buf: BytesMut,
/// Offsets into the buffer and metadata for all blobs in this read
pub blobs: Vec<VectoredBlob>,
}
/// Description of one disk read for multiple blobs.
/// Used as the argument form [`VectoredBlobReader::read_blobs`]
#[derive(Debug)]
pub struct VectoredRead {
pub start: u64,
pub end: u64,
/// Starting offsets and metadata for each blob in this read
pub blobs_at: VecMap<u64, BlobMeta>,
max_read_size: usize,
}
#[derive(Eq, PartialEq)]
enum VectoredReadExtended {
Yes,
No,
}
impl VectoredRead {
fn new(start_offset: u64, end_offset: u64, meta: BlobMeta, max_read_size: usize) -> Self {
let mut blobs_at = VecMap::default();
blobs_at
.append(start_offset, meta)
.expect("First insertion always succeeds");
Self {
start: start_offset,
end: end_offset,
blobs_at,
max_read_size,
}
}
/// Attempt to extend the current read with a new blob if the start
/// offset matches with the current end of the vectored read
/// and the resuting size is below the max read size
fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
let size = (end - start) as usize;
if self.end == start && self.size() + size <= self.max_read_size {
self.end = end;
self.blobs_at
.append(start, meta)
.expect("LSNs are ordered within vectored reads");
return VectoredReadExtended::Yes;
}
VectoredReadExtended::No
}
fn size(&self) -> usize {
(self.end - self.start) as usize
}
}
#[derive(Copy, Clone, Debug)]
pub enum BlobFlag {
None,
Ignore,
Replaces,
}
/// Planner for vectored blob reads.
///
/// Blob offsets are received via [`VectoredReadPlanner::handle`]
/// and coalesced into disk reads.
///
/// The implementation is very simple:
/// * Collect all blob offsets in an ordered structure
/// * Iterate over the collected blobs and coalesce them into reads at the end
pub struct VectoredReadPlanner {
// Track all the blob offsets. Start offsets must be ordered.
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
// Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
prev: Option<(Key, Lsn, u64, BlobFlag)>,
max_read_size: usize,
}
impl VectoredReadPlanner {
pub fn new(max_read_size: usize) -> Self {
Self {
blobs: BTreeMap::new(),
prev: None,
max_read_size,
}
}
/// Include a new blob in the read plan.
///
/// Notes:
/// * This function should be called for each blob in the desired *inclusive* range.
/// See `DeltaLayerInner::plan_reads` and `ImageLayerInner::plan_reads`.
/// * Calls to this function should be for monotonically continuous (key, lsn) tuples.
///
/// The `flag` argument has two interesting values:
/// * [`BlobFlag::Replaces`]: The blob for this key should replace all existing blobs.
/// This is used for WAL records that `will_init`.
/// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
/// if the blob is cached.
pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
// Implementation note: internally lag behind by one blob such that
// we have a start and end offset when initialising [`VectoredRead`]
let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
None => {
self.prev = Some((key, lsn, offset, flag));
return;
}
Some(prev) => prev,
};
self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
self.prev = Some((key, lsn, offset, flag));
}
pub fn handle_range_end(&mut self, offset: u64) {
if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
}
self.prev = None;
}
fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
match flag {
BlobFlag::None => {
let blobs_for_key = self.blobs.entry(key).or_default();
blobs_for_key.push((lsn, start_offset, end_offset));
}
BlobFlag::Replaces => {
let blobs_for_key = self.blobs.entry(key).or_default();
blobs_for_key.clear();
blobs_for_key.push((lsn, start_offset, end_offset));
}
BlobFlag::Ignore => {}
}
}
pub fn finish(self) -> Vec<VectoredRead> {
let mut current_read: Option<VectoredRead> = None;
let mut reads = Vec::new();
for (key, blobs_for_key) in self.blobs {
for (lsn, start_offset, end_offset) in blobs_for_key {
let extended = match &mut current_read {
Some(read) => read.extend(start_offset, end_offset, BlobMeta { key, lsn }),
None => VectoredReadExtended::No,
};
if extended == VectoredReadExtended::No {
let next_read = VectoredRead::new(
start_offset,
end_offset,
BlobMeta { key, lsn },
self.max_read_size,
);
let prev_read = current_read.replace(next_read);
if let Some(read) = prev_read {
reads.push(read);
}
}
}
}
if let Some(read) = current_read {
reads.push(read);
}
reads
}
}
/// Disk reader for vectored blob spans (does not go through the page cache)
pub struct VectoredBlobReader {
file: VirtualFile,
max_vectored_read_size: usize,
}
impl VectoredBlobReader {
pub fn new(file: VirtualFile, max_vectored_read_size: usize) -> Self {
Self {
file,
max_vectored_read_size,
}
}
pub fn get_max_read_size(&self) -> usize {
self.max_vectored_read_size
}
pub fn get_file_ref(&self) -> &VirtualFile {
&self.file
}
/// Read the requested blobs into the buffer.
///
/// We have to deal with the fact that blobs are not fixed size.
/// Each blob is prefixed by a size header.
///
/// The success return value is a struct which contains the buffer
/// filled from disk and a list of offsets at which each blob lies
/// in the buffer.
pub async fn read_blobs(
&self,
read: &VectoredRead,
buf: BytesMut,
) -> Result<VectoredBlobsBuf, std::io::Error> {
// tracing::info!("read_blobs(read={read:?}, read_size={})", read.size());
assert!(read.size() > 0);
assert!(
read.size() <= buf.capacity(),
"{} > {}",
read.size(),
buf.capacity()
);
let buf = self
.file
.read_exact_at_n(buf, read.start, read.size())
.await?;
let blobs_at = read.blobs_at.as_slice();
let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;
let mut metas = Vec::new();
let pairs = blobs_at.iter().zip(
blobs_at
.iter()
.map(Some)
.skip(1)
.chain(std::iter::once(None)),
);
for ((offset, meta), next) in pairs {
let offset_in_buf = offset - start_offset;
let first_len_byte = buf[offset_in_buf as usize];
// Each blob is prefixed by a header containing it's size.
// Extract the size and skip that header to find the start of the data.
let (size_length, blob_size) = if first_len_byte < 0x80 {
(1, first_len_byte as u64)
} else {
let mut blob_size_buf = [0u8; 4];
let offset_in_buf = offset_in_buf as usize;
blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
blob_size_buf[0] &= 0x7f;
(4, u32::from_be_bytes(blob_size_buf) as u64)
};
let start = offset_in_buf + size_length;
let end = match next {
Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
None => start + blob_size,
};
assert_eq!(end - start, blob_size);
metas.push(VectoredBlob {
start: start as usize,
end: end as usize,
meta: *meta,
})
}
Ok(VectoredBlobsBuf { buf, blobs: metas })
}
}
#[cfg(test)]
mod tests {
use super::*;
fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
assert_eq!(read.start, offset_range.first().unwrap().2);
let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
let offsets_in_read: Vec<_> = read
.blobs_at
.as_slice()
.iter()
.map(|(offset, _)| *offset)
.collect();
assert_eq!(expected_offsets_in_read, offsets_in_read);
}
#[test]
fn planner_max_read_size_test() {
let max_read_size = 128 * 1024;
let key = Key::MIN;
let lsn = Lsn(0);
let blob_descriptions = vec![
(key, lsn, 0, BlobFlag::None),
(key, lsn, 32 * 1024, BlobFlag::None),
(key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
(key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
(key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
(key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
(key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
(key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
];
let ranges = [
&blob_descriptions[0..3],
&blob_descriptions[3..4],
&blob_descriptions[4..5],
&blob_descriptions[5..6],
&blob_descriptions[6..7],
&blob_descriptions[7..],
];
let mut planner = VectoredReadPlanner::new(max_read_size);
for (key, lsn, offset, flag) in blob_descriptions.clone() {
planner.handle(key, lsn, offset, flag);
}
planner.handle_range_end(652 * 1024);
let reads = planner.finish();
assert_eq!(reads.len(), 6);
for (idx, read) in reads.iter().enumerate() {
validate_read(read, ranges[idx]);
}
}
#[test]
fn planner_replacement_test() {
let max_read_size = 128 * 1024;
let first_key = Key::MIN;
let second_key = first_key.next();
let lsn = Lsn(0);
let blob_descriptions = vec![
(first_key, lsn, 0, BlobFlag::None), // First in read 1
(first_key, lsn, 1024, BlobFlag::None), // Last in read 1
(second_key, lsn, 2 * 1024, BlobFlag::Replaces),
(second_key, lsn, 3 * 1024, BlobFlag::None),
(second_key, lsn, 4 * 1024, BlobFlag::Replaces), // First in read 2
(second_key, lsn, 5 * 1024, BlobFlag::None), // Last in read 2
];
let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
let mut planner = VectoredReadPlanner::new(max_read_size);
for (key, lsn, offset, flag) in blob_descriptions.clone() {
planner.handle(key, lsn, offset, flag);
}
planner.handle_range_end(6 * 1024);
let reads = planner.finish();
assert_eq!(reads.len(), 2);
for (idx, read) in reads.iter().enumerate() {
validate_read(read, ranges[idx]);
}
}
}

View File

@@ -562,7 +562,18 @@ impl VirtualFile {
B: IoBufMut + Send,
{
let (buf, res) =
read_exact_at_impl(buf, offset, |buf, offset| self.read_at(buf, offset)).await;
read_exact_at_impl(buf, offset, None, |buf, offset| self.read_at(buf, offset)).await;
res.map(|()| buf)
}
pub async fn read_exact_at_n<B>(&self, buf: B, offset: u64, count: usize) -> Result<B, Error>
where
B: IoBufMut + Send,
{
let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| {
self.read_at(buf, offset)
})
.await;
res.map(|()| buf)
}
@@ -696,6 +707,7 @@ impl VirtualFile {
pub async fn read_exact_at_impl<B, F, Fut>(
buf: B,
mut offset: u64,
count: Option<usize>,
mut read_at: F,
) -> (B, std::io::Result<()>)
where
@@ -703,7 +715,15 @@ where
F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
{
let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full(); // includes all the uninitialized memory
let mut buf: tokio_epoll_uring::Slice<B> = match count {
Some(count) => {
assert!(count <= buf.bytes_total());
assert!(count > 0);
buf.slice(..count) // may include uninitialized memory
}
None => buf.slice_full(), // includes all the uninitialized memory
};
while buf.bytes_total() != 0 {
let res;
(buf, res) = read_at(buf, offset).await;
@@ -793,7 +813,7 @@ mod test_read_exact_at_impl {
result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
}]),
}));
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
@@ -802,13 +822,33 @@ mod test_read_exact_at_impl {
assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
}
#[tokio::test]
async fn test_with_count() {
let buf = Vec::with_capacity(5);
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::from(vec![Expectation {
offset: 0,
bytes_total: 3,
result: Ok(vec![b'a', b'b', b'c']),
}]),
}));
let (buf, res) = read_exact_at_impl(buf, 0, Some(3), |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
.await;
assert!(res.is_ok());
assert_eq!(buf, vec![b'a', b'b', b'c']);
}
#[tokio::test]
async fn test_empty_buf_issues_no_syscall() {
let buf = Vec::new();
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::new(),
}));
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
@@ -833,7 +873,7 @@ mod test_read_exact_at_impl {
},
]),
}));
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
@@ -864,7 +904,7 @@ mod test_read_exact_at_impl {
},
]),
}));
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})

190
poetry.lock generated
View File

@@ -158,6 +158,28 @@ files = [
attrs = ">=16.0.0"
pluggy = ">=0.4.0"
[[package]]
name = "anyio"
version = "4.3.0"
description = "High level compatibility layer for multiple asynchronous event loop implementations"
optional = false
python-versions = ">=3.8"
files = [
{file = "anyio-4.3.0-py3-none-any.whl", hash = "sha256:048e05d0f6caeed70d731f3db756d35dcc1f35747c8c403364a8332c630441b8"},
{file = "anyio-4.3.0.tar.gz", hash = "sha256:f75253795a87df48568485fd18cdd2a3fa5c4f7c5be8e5e36637733fce06fed6"},
]
[package.dependencies]
exceptiongroup = {version = ">=1.0.2", markers = "python_version < \"3.11\""}
idna = ">=2.8"
sniffio = ">=1.1"
typing-extensions = {version = ">=4.1", markers = "python_version < \"3.11\""}
[package.extras]
doc = ["Sphinx (>=7)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-theme"]
test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"]
trio = ["trio (>=0.23)"]
[[package]]
name = "async-timeout"
version = "4.0.3"
@@ -1073,6 +1095,100 @@ files = [
{file = "graphql_core-3.2.1-py3-none-any.whl", hash = "sha256:f83c658e4968998eed1923a2e3e3eddd347e005ac0315fbb7ca4d70ea9156323"},
]
[[package]]
name = "h11"
version = "0.14.0"
description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1"
optional = false
python-versions = ">=3.7"
files = [
{file = "h11-0.14.0-py3-none-any.whl", hash = "sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761"},
{file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"},
]
[[package]]
name = "h2"
version = "4.1.0"
description = "HTTP/2 State-Machine based protocol implementation"
optional = false
python-versions = ">=3.6.1"
files = [
{file = "h2-4.1.0-py3-none-any.whl", hash = "sha256:03a46bcf682256c95b5fd9e9a99c1323584c3eec6440d379b9903d709476bc6d"},
{file = "h2-4.1.0.tar.gz", hash = "sha256:a83aca08fbe7aacb79fec788c9c0bac936343560ed9ec18b82a13a12c28d2abb"},
]
[package.dependencies]
hpack = ">=4.0,<5"
hyperframe = ">=6.0,<7"
[[package]]
name = "hpack"
version = "4.0.0"
description = "Pure-Python HPACK header compression"
optional = false
python-versions = ">=3.6.1"
files = [
{file = "hpack-4.0.0-py3-none-any.whl", hash = "sha256:84a076fad3dc9a9f8063ccb8041ef100867b1878b25ef0ee63847a5d53818a6c"},
{file = "hpack-4.0.0.tar.gz", hash = "sha256:fc41de0c63e687ebffde81187a948221294896f6bdc0ae2312708df339430095"},
]
[[package]]
name = "httpcore"
version = "1.0.3"
description = "A minimal low-level HTTP client."
optional = false
python-versions = ">=3.8"
files = [
{file = "httpcore-1.0.3-py3-none-any.whl", hash = "sha256:9a6a501c3099307d9fd76ac244e08503427679b1e81ceb1d922485e2f2462ad2"},
{file = "httpcore-1.0.3.tar.gz", hash = "sha256:5c0f9546ad17dac4d0772b0808856eb616eb8b48ce94f49ed819fd6982a8a544"},
]
[package.dependencies]
certifi = "*"
h11 = ">=0.13,<0.15"
[package.extras]
asyncio = ["anyio (>=4.0,<5.0)"]
http2 = ["h2 (>=3,<5)"]
socks = ["socksio (==1.*)"]
trio = ["trio (>=0.22.0,<0.24.0)"]
[[package]]
name = "httpx"
version = "0.26.0"
description = "The next generation HTTP client."
optional = false
python-versions = ">=3.8"
files = [
{file = "httpx-0.26.0-py3-none-any.whl", hash = "sha256:8915f5a3627c4d47b73e8202457cb28f1266982d1159bd5779d86a80c0eab1cd"},
{file = "httpx-0.26.0.tar.gz", hash = "sha256:451b55c30d5185ea6b23c2c793abf9bb237d2a7dfb901ced6ff69ad37ec1dfaf"},
]
[package.dependencies]
anyio = "*"
certifi = "*"
h2 = {version = ">=3,<5", optional = true, markers = "extra == \"http2\""}
httpcore = "==1.*"
idna = "*"
sniffio = "*"
[package.extras]
brotli = ["brotli", "brotlicffi"]
cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"]
http2 = ["h2 (>=3,<5)"]
socks = ["socksio (==1.*)"]
[[package]]
name = "hyperframe"
version = "6.0.1"
description = "HTTP/2 framing layer for Python"
optional = false
python-versions = ">=3.6.1"
files = [
{file = "hyperframe-6.0.1-py3-none-any.whl", hash = "sha256:0ec6bafd80d8ad2195c4f03aacba3a8265e57bc4cff261e802bf39970ed02a15"},
{file = "hyperframe-6.0.1.tar.gz", hash = "sha256:ae510046231dc8e9ecb1a6586f63d2347bf4c8905914aa84ba585ae85f28a914"},
]
[[package]]
name = "idna"
version = "3.3"
@@ -1909,6 +2025,20 @@ pytest = [
{version = ">=6.2.4", markers = "python_version >= \"3.10\""},
]
[[package]]
name = "pytest-repeat"
version = "0.9.3"
description = "pytest plugin for repeating tests"
optional = false
python-versions = ">=3.7"
files = [
{file = "pytest_repeat-0.9.3-py3-none-any.whl", hash = "sha256:26ab2df18226af9d5ce441c858f273121e92ff55f5bb311d25755b8d7abdd8ed"},
{file = "pytest_repeat-0.9.3.tar.gz", hash = "sha256:ffd3836dfcd67bb270bec648b330e20be37d2966448c4148c4092d1e8aba8185"},
]
[package.dependencies]
pytest = "*"
[[package]]
name = "pytest-rerunfailures"
version = "13.0"
@@ -2052,7 +2182,6 @@ files = [
{file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"},
{file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"},
{file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"},
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"},
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"},
{file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"},
{file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"},
@@ -2142,28 +2271,28 @@ pyasn1 = ">=0.1.3"
[[package]]
name = "ruff"
version = "0.1.11"
version = "0.2.2"
description = "An extremely fast Python linter and code formatter, written in Rust."
optional = false
python-versions = ">=3.7"
files = [
{file = "ruff-0.1.11-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:a7f772696b4cdc0a3b2e527fc3c7ccc41cdcb98f5c80fdd4f2b8c50eb1458196"},
{file = "ruff-0.1.11-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:934832f6ed9b34a7d5feea58972635c2039c7a3b434fe5ba2ce015064cb6e955"},
{file = "ruff-0.1.11-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ea0d3e950e394c4b332bcdd112aa566010a9f9c95814844a7468325290aabfd9"},
{file = "ruff-0.1.11-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:9bd4025b9c5b429a48280785a2b71d479798a69f5c2919e7d274c5f4b32c3607"},
{file = "ruff-0.1.11-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e1ad00662305dcb1e987f5ec214d31f7d6a062cae3e74c1cbccef15afd96611d"},
{file = "ruff-0.1.11-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:4b077ce83f47dd6bea1991af08b140e8b8339f0ba8cb9b7a484c30ebab18a23f"},
{file = "ruff-0.1.11-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:c4a88efecec23c37b11076fe676e15c6cdb1271a38f2b415e381e87fe4517f18"},
{file = "ruff-0.1.11-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:5b25093dad3b055667730a9b491129c42d45e11cdb7043b702e97125bcec48a1"},
{file = "ruff-0.1.11-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:231d8fb11b2cc7c0366a326a66dafc6ad449d7fcdbc268497ee47e1334f66f77"},
{file = "ruff-0.1.11-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:09c415716884950080921dd6237767e52e227e397e2008e2bed410117679975b"},
{file = "ruff-0.1.11-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:0f58948c6d212a6b8d41cd59e349751018797ce1727f961c2fa755ad6208ba45"},
{file = "ruff-0.1.11-py3-none-musllinux_1_2_i686.whl", hash = "sha256:190a566c8f766c37074d99640cd9ca3da11d8deae2deae7c9505e68a4a30f740"},
{file = "ruff-0.1.11-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:6464289bd67b2344d2a5d9158d5eb81025258f169e69a46b741b396ffb0cda95"},
{file = "ruff-0.1.11-py3-none-win32.whl", hash = "sha256:9b8f397902f92bc2e70fb6bebfa2139008dc72ae5177e66c383fa5426cb0bf2c"},
{file = "ruff-0.1.11-py3-none-win_amd64.whl", hash = "sha256:eb85ee287b11f901037a6683b2374bb0ec82928c5cbc984f575d0437979c521a"},
{file = "ruff-0.1.11-py3-none-win_arm64.whl", hash = "sha256:97ce4d752f964ba559c7023a86e5f8e97f026d511e48013987623915431c7ea9"},
{file = "ruff-0.1.11.tar.gz", hash = "sha256:f9d4d88cb6eeb4dfe20f9f0519bd2eaba8119bde87c3d5065c541dbae2b5a2cb"},
{file = "ruff-0.2.2-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:0a9efb032855ffb3c21f6405751d5e147b0c6b631e3ca3f6b20f917572b97eb6"},
{file = "ruff-0.2.2-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:d450b7fbff85913f866a5384d8912710936e2b96da74541c82c1b458472ddb39"},
{file = "ruff-0.2.2-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ecd46e3106850a5c26aee114e562c329f9a1fbe9e4821b008c4404f64ff9ce73"},
{file = "ruff-0.2.2-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:5e22676a5b875bd72acd3d11d5fa9075d3a5f53b877fe7b4793e4673499318ba"},
{file = "ruff-0.2.2-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1695700d1e25a99d28f7a1636d85bafcc5030bba9d0578c0781ba1790dbcf51c"},
{file = "ruff-0.2.2-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:b0c232af3d0bd8f521806223723456ffebf8e323bd1e4e82b0befb20ba18388e"},
{file = "ruff-0.2.2-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f63d96494eeec2fc70d909393bcd76c69f35334cdbd9e20d089fb3f0640216ca"},
{file = "ruff-0.2.2-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:6a61ea0ff048e06de273b2e45bd72629f470f5da8f71daf09fe481278b175001"},
{file = "ruff-0.2.2-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5e1439c8f407e4f356470e54cdecdca1bd5439a0673792dbe34a2b0a551a2fe3"},
{file = "ruff-0.2.2-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:940de32dc8853eba0f67f7198b3e79bc6ba95c2edbfdfac2144c8235114d6726"},
{file = "ruff-0.2.2-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:0c126da55c38dd917621552ab430213bdb3273bb10ddb67bc4b761989210eb6e"},
{file = "ruff-0.2.2-py3-none-musllinux_1_2_i686.whl", hash = "sha256:3b65494f7e4bed2e74110dac1f0d17dc8e1f42faaa784e7c58a98e335ec83d7e"},
{file = "ruff-0.2.2-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:1ec49be4fe6ddac0503833f3ed8930528e26d1e60ad35c2446da372d16651ce9"},
{file = "ruff-0.2.2-py3-none-win32.whl", hash = "sha256:d920499b576f6c68295bc04e7b17b6544d9d05f196bb3aac4358792ef6f34325"},
{file = "ruff-0.2.2-py3-none-win_amd64.whl", hash = "sha256:cc9a91ae137d687f43a44c900e5d95e9617cb37d4c989e462980ba27039d239d"},
{file = "ruff-0.2.2-py3-none-win_arm64.whl", hash = "sha256:c9d15fc41e6054bfc7200478720570078f0b41c9ae4f010bcc16bd6f4d1aacdd"},
{file = "ruff-0.2.2.tar.gz", hash = "sha256:e62ed7f36b3068a30ba39193a14274cd706bc486fad521276458022f7bccb31d"},
]
[[package]]
@@ -2225,6 +2354,17 @@ files = [
{file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},
]
[[package]]
name = "sniffio"
version = "1.3.0"
description = "Sniff out which async library your code is running under"
optional = false
python-versions = ">=3.7"
files = [
{file = "sniffio-1.3.0-py3-none-any.whl", hash = "sha256:eecefdce1e5bbfb7ad2eeaabf7c1eeb404d7757c379bd1f7e5cce9d8bf425384"},
{file = "sniffio-1.3.0.tar.gz", hash = "sha256:e60305c5e5d314f5389259b7f22aaa33d8f7dee49763119234af3755c55b9101"},
]
[[package]]
name = "sshpubkeys"
version = "3.3.1"
@@ -2431,16 +2571,6 @@ files = [
{file = "wrapt-1.14.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8ad85f7f4e20964db4daadcab70b47ab05c7c1cf2a7c1e51087bfaa83831854c"},
{file = "wrapt-1.14.1-cp310-cp310-win32.whl", hash = "sha256:a9a52172be0b5aae932bef82a79ec0a0ce87288c7d132946d645eba03f0ad8a8"},
{file = "wrapt-1.14.1-cp310-cp310-win_amd64.whl", hash = "sha256:6d323e1554b3d22cfc03cd3243b5bb815a51f5249fdcbb86fda4bf62bab9e164"},
{file = "wrapt-1.14.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ecee4132c6cd2ce5308e21672015ddfed1ff975ad0ac8d27168ea82e71413f55"},
{file = "wrapt-1.14.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2020f391008ef874c6d9e208b24f28e31bcb85ccff4f335f15a3251d222b92d9"},
{file = "wrapt-1.14.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2feecf86e1f7a86517cab34ae6c2f081fd2d0dac860cb0c0ded96d799d20b335"},
{file = "wrapt-1.14.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:240b1686f38ae665d1b15475966fe0472f78e71b1b4903c143a842659c8e4cb9"},
{file = "wrapt-1.14.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a9008dad07d71f68487c91e96579c8567c98ca4c3881b9b113bc7b33e9fd78b8"},
{file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:6447e9f3ba72f8e2b985a1da758767698efa72723d5b59accefd716e9e8272bf"},
{file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:acae32e13a4153809db37405f5eba5bac5fbe2e2ba61ab227926a22901051c0a"},
{file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:49ef582b7a1152ae2766557f0550a9fcbf7bbd76f43fbdc94dd3bf07cc7168be"},
{file = "wrapt-1.14.1-cp311-cp311-win32.whl", hash = "sha256:358fe87cc899c6bb0ddc185bf3dbfa4ba646f05b1b0b9b5a27c2cb92c2cea204"},
{file = "wrapt-1.14.1-cp311-cp311-win_amd64.whl", hash = "sha256:26046cd03936ae745a502abf44dac702a5e6880b2b01c29aea8ddf3353b68224"},
{file = "wrapt-1.14.1-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:43ca3bbbe97af00f49efb06e352eae40434ca9d915906f77def219b88e85d907"},
{file = "wrapt-1.14.1-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:6b1a564e6cb69922c7fe3a678b9f9a3c54e72b469875aa8018f18b4d1dd1adf3"},
{file = "wrapt-1.14.1-cp35-cp35m-manylinux2010_i686.whl", hash = "sha256:00b6d4ea20a906c0ca56d84f93065b398ab74b927a7a3dbd470f6fc503f95dc3"},
@@ -2678,4 +2808,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "e99954cbbfef8dcc5e13cea7103c87657639a192f2372983bdb8c5d624c2e447"
content-hash = "af9d5b45310c12411bfe67cb9677d2236808d0780ca1bd81525d2763a928f7f9"

View File

@@ -171,16 +171,8 @@ async fn task_main(
.context("failed to set socket option")?;
info!(%peer_addr, "serving");
let mut ctx =
RequestMonitoring::new(session_id, peer_addr.ip(), "sni_router", "sni");
handle_client(
&mut ctx,
dest_suffix,
tls_config,
tls_server_end_point,
socket,
)
.await
let ctx = RequestMonitoring::new(session_id, peer_addr.ip(), "sni_router", "sni");
handle_client(ctx, dest_suffix, tls_config, tls_server_end_point, socket).await
}
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
@@ -248,7 +240,7 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
}
async fn handle_client(
ctx: &mut RequestMonitoring,
mut ctx: RequestMonitoring,
dest_suffix: Arc<String>,
tls_config: Arc<rustls::ServerConfig>,
tls_server_end_point: TlsServerEndPoint,

View File

@@ -87,6 +87,22 @@ pub mod errors {
impl ReportableError for ApiError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self {
ApiError::Console {
status: http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
..
} => crate::error::ErrorKind::User,
ApiError::Console {
status: http::StatusCode::LOCKED,
text,
} if text.contains("quota exceeded")
|| text.contains("the limit for current plan reached") =>
{
crate::error::ErrorKind::User
}
ApiError::Console {
status: http::StatusCode::TOO_MANY_REQUESTS,
..
} => crate::error::ErrorKind::ServiceRateLimit,
ApiError::Console { .. } => crate::error::ErrorKind::ControlPlane,
ApiError::Transport(_) => crate::error::ErrorKind::ControlPlane,
}
@@ -222,7 +238,7 @@ pub mod errors {
match self {
WakeComputeError::BadComputeAddress(_) => crate::error::ErrorKind::ControlPlane,
WakeComputeError::ApiError(e) => e.get_error_kind(),
WakeComputeError::TimeoutError => crate::error::ErrorKind::RateLimit,
WakeComputeError::TimeoutError => crate::error::ErrorKind::ServiceRateLimit,
}
}
}

View File

@@ -147,15 +147,13 @@ impl RequestMonitoring {
self.success = true;
}
pub fn log(&mut self) {
pub fn log(self) {}
}
impl Drop for RequestMonitoring {
fn drop(&mut self) {
if let Some(tx) = self.sender.take() {
let _: Result<(), _> = tx.send(self.clone());
}
}
}
impl Drop for RequestMonitoring {
fn drop(&mut self) {
self.log()
}
}

View File

@@ -37,9 +37,12 @@ pub enum ErrorKind {
/// Network error between user and proxy. Not necessarily user error
ClientDisconnect,
/// Proxy self-imposed rate limits
/// Proxy self-imposed user rate limits
RateLimit,
/// Proxy self-imposed service-wise rate limits
ServiceRateLimit,
/// internal errors
Service,
@@ -54,25 +57,12 @@ pub enum ErrorKind {
}
impl ErrorKind {
pub fn to_str(&self) -> &'static str {
match self {
ErrorKind::User => "request failed due to user error",
ErrorKind::ClientDisconnect => "client disconnected",
ErrorKind::RateLimit => "request cancelled due to rate limit",
ErrorKind::Service => "internal service error",
ErrorKind::ControlPlane => "non-retryable control plane error",
ErrorKind::Postgres => "postgres error",
ErrorKind::Compute => {
"non-retryable compute connection error (or exhausted retry capacity)"
}
}
}
pub fn to_metric_label(&self) -> &'static str {
match self {
ErrorKind::User => "user",
ErrorKind::ClientDisconnect => "clientdisconnect",
ErrorKind::RateLimit => "ratelimit",
ErrorKind::ServiceRateLimit => "serviceratelimit",
ErrorKind::Service => "service",
ErrorKind::ControlPlane => "controlplane",
ErrorKind::Postgres => "postgres",
@@ -85,12 +75,6 @@ pub trait ReportableError: fmt::Display + Send + 'static {
fn get_error_kind(&self) -> ErrorKind;
}
impl ReportableError for tokio::time::error::Elapsed {
fn get_error_kind(&self) -> ErrorKind {
ErrorKind::RateLimit
}
}
impl ReportableError for tokio_postgres::error::Error {
fn get_error_kind(&self) -> ErrorKind {
if self.as_db_error().is_some() {

View File

@@ -88,7 +88,10 @@ pub async fn task_main(
return Ok(());
}
};
let tls_acceptor: tokio_rustls::TlsAcceptor = tls_config.to_server_config().into();
let mut tls_server_config = rustls::ServerConfig::clone(&tls_config.to_server_config());
// prefer http2, but support http/1.1
tls_server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
let tls_acceptor: tokio_rustls::TlsAcceptor = Arc::new(tls_server_config).into();
let mut addr_incoming = AddrIncoming::from_listener(ws_listener)?;
let _ = addr_incoming.set_nodelay(true);

View File

@@ -12,7 +12,7 @@ use hyper::StatusCode;
use hyper::{Body, HeaderMap, Request};
use serde_json::json;
use serde_json::Value;
use tokio::join;
use tokio::try_join;
use tokio_postgres::error::DbError;
use tokio_postgres::error::ErrorPosition;
use tokio_postgres::GenericClient;
@@ -32,11 +32,9 @@ use crate::auth::ComputeUserInfoParseError;
use crate::config::ProxyConfig;
use crate::config::TlsConfig;
use crate::context::RequestMonitoring;
use crate::error::ReportableError;
use crate::metrics::HTTP_CONTENT_LENGTH;
use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
use crate::proxy::NeonOptions;
use crate::serverless::backend::HttpConnError;
use crate::DbName;
use crate::RoleName;
@@ -166,9 +164,12 @@ fn get_conn_info(
let mut options = Option::None;
for (key, value) in pairs {
if key == "options" {
options = Some(NeonOptions::parse_options_raw(&value));
break;
match &*key {
"options" => {
options = Some(NeonOptions::parse_options_raw(&value));
}
"application_name" => ctx.set_application(Some(value.into())),
_ => {}
}
}
@@ -284,8 +285,10 @@ pub async fn handle(
)?
}
},
Err(e) => {
ctx.set_error_kind(e.get_error_kind());
Err(_) => {
// TODO: when http error classification is done, distinguish between
// timeout on sql vs timeout in proxy/cplane
// ctx.set_error_kind(crate::error::ErrorKind::RateLimit);
let message = format!(
"HTTP-Connection timed out, execution time exeeded {} seconds",
@@ -399,16 +402,11 @@ async fn handle_inner(
// not strictly necessary to mark success here,
// but it's just insurance for if we forget it somewhere else
ctx.latency_timer.success();
Ok::<_, HttpConnError>(client)
Ok::<_, anyhow::Error>(client)
};
// Run both operations in parallel
let (payload_result, auth_and_connect_result) =
join!(fetch_and_process_request, authenticate_and_connect,);
// Handle the results
let payload = payload_result?; // Handle errors appropriately
let mut client = auth_and_connect_result?; // Handle errors appropriately
let (payload, mut client) = try_join!(fetch_and_process_request, authenticate_and_connect)?;
let mut response = Response::builder()
.status(StatusCode::OK)

View File

@@ -38,17 +38,22 @@ pytest-rerunfailures = "^13.0"
types-pytest-lazy-fixture = "^0.6.3.3"
pytest-split = "^0.8.1"
zstandard = "^0.21.0"
httpx = {extras = ["http2"], version = "^0.26.0"}
pytest-repeat = "^0.9.3"
[tool.poetry.group.dev.dependencies]
mypy = "==1.3.0"
ruff = "^0.1.11"
ruff = "^0.2.2"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.mypy]
exclude = "^vendor/"
exclude = [
"^vendor/",
"^target/",
]
check_untyped_defs = true
# Help mypy find imports when running against list of individual files.
# Without this line it would behave differently when executed on the entire project.
@@ -72,7 +77,13 @@ ignore_missing_imports = true
[tool.ruff]
target-version = "py39"
extend-exclude = ["vendor/"]
extend-exclude = [
"vendor/",
"target/",
]
line-length = 100 # this setting is rather guidance, it won't fail if it can't make the shorter
[tool.ruff.lint]
ignore = [
"E501", # Line too long, we don't want to be too strict about it
]
@@ -83,4 +94,3 @@ select = [
"W", # pycodestyle
"B", # bugbear
]
line-length = 100 # this setting is rather guidance, it won't fail if it can't make the shorter

View File

@@ -695,9 +695,11 @@ impl Collector for TimelineCollector {
// report total number of timelines
self.timelines_count.set(timelines_count as i64);
mfs.extend(self.timelines_count.collect());
self.active_timelines_count
.set(active_timelines_count as i64);
mfs.extend(self.timelines_count.collect());
mfs.extend(self.active_timelines_count.collect());
mfs
}

View File

@@ -54,7 +54,7 @@ class MetricsGetter:
return results[0].value
def get_metrics_values(
self, names: list[str], filter: Optional[Dict[str, str]] = None
self, names: list[str], filter: Optional[Dict[str, str]] = None, absence_ok=False
) -> Dict[str, float]:
"""
When fetching multiple named metrics, it is more efficient to use this
@@ -63,6 +63,10 @@ class MetricsGetter:
Throws RuntimeError if no metrics matching `names` are found, or if
not all of `names` are found: this method is intended for loading sets
of metrics whose existence is coupled.
If it's expected that there may be no results for some of the metrics,
specify `absence_ok=True`. The returned dict will then not contain values
for these metrics.
"""
metrics = self.get_metrics()
samples = []
@@ -75,9 +79,10 @@ class MetricsGetter:
raise RuntimeError(f"Multiple values found for {sample.name}")
result[sample.name] = sample.value
if len(result) != len(names):
log.info(f"Metrics found: {metrics.metrics}")
raise RuntimeError(f"could not find all metrics {' '.join(names)}")
if not absence_ok:
if len(result) != len(names):
log.info(f"Metrics found: {metrics.metrics}")
raise RuntimeError(f"could not find all metrics {' '.join(names)}")
return result
@@ -98,7 +103,8 @@ def histogram(prefix_without_trailing_underscore: str) -> List[str]:
PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = (
"pageserver_remote_timeline_client_calls_unfinished",
"pageserver_remote_timeline_client_calls_started_total",
"pageserver_remote_timeline_client_calls_finished_total",
"pageserver_remote_physical_size",
"pageserver_remote_timeline_client_bytes_started_total",
"pageserver_remote_timeline_client_bytes_finished_total",
@@ -127,7 +133,6 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
*histogram("pageserver_getpage_get_reconstruct_data_seconds"),
*histogram("pageserver_wait_lsn_seconds"),
*histogram("pageserver_remote_operation_seconds"),
*histogram("pageserver_remote_timeline_client_calls_started"),
*histogram("pageserver_io_operations_seconds"),
"pageserver_tenant_states_count",
)

View File

@@ -27,6 +27,7 @@ from urllib.parse import quote, urlparse
import asyncpg
import backoff
import httpx
import jwt
import psycopg2
import pytest
@@ -487,6 +488,11 @@ class NeonEnvBuilder:
self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine
self.pageserver_get_vectored_impl: Optional[str] = None
if os.getenv("PAGESERVER_GET_VECTORED_IMPL", "") == "vectored":
self.pageserver_get_vectored_impl = "vectored"
log.debug('Overriding pageserver get_vectored_impl config to "vectored"')
assert test_name.startswith(
"test_"
), "Unexpectedly instantiated from outside a test function"
@@ -1054,6 +1060,8 @@ class NeonEnv:
}
if self.pageserver_virtual_file_io_engine is not None:
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
if config.pageserver_get_vectored_impl is not None:
ps_cfg["get_vectored_impl"] = config.pageserver_get_vectored_impl
# Create a corresponding NeonPageserver object
self.pageservers.append(
@@ -2856,9 +2864,34 @@ class NeonProxy(PgProtocol):
)
if expected_code is not None:
assert response.status_code == kwargs["expected_code"], f"response: {response.json()}"
assert response.status_code == expected_code, f"response: {response.json()}"
return response.json()
async def http2_query(self, query, args, **kwargs):
# TODO maybe use default values if not provided
user = kwargs["user"]
password = kwargs["password"]
expected_code = kwargs.get("expected_code")
connstr = f"postgresql://{user}:{password}@{self.domain}:{self.proxy_port}/postgres"
async with httpx.AsyncClient(
http2=True, verify=str(self.test_output_dir / "proxy.crt")
) as client:
response = await client.post(
f"https://{self.domain}:{self.external_http_port}/sql",
json={"query": query, "params": args},
headers={
"Content-Type": "application/sql",
"Neon-Connection-String": connstr,
"Neon-Pool-Opt-In": "true",
},
)
assert response.http_version == "HTTP/2"
if expected_code is not None:
assert response.status_code == expected_code, f"response: {response.json()}"
return response.json()
def get_metrics(self) -> str:
request_result = requests.get(f"http://{self.host}:{self.http_port}/metrics")
request_result.raise_for_status()

View File

@@ -694,32 +694,33 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
},
).value
def get_remote_timeline_client_metric(
def get_remote_timeline_client_queue_count(
self,
metric_name: str,
tenant_id: TenantId,
timeline_id: TimelineId,
file_kind: str,
op_kind: str,
) -> Optional[float]:
metrics = self.get_metrics()
matches = metrics.query_all(
name=metric_name,
) -> Optional[int]:
metrics = [
"pageserver_remote_timeline_client_calls_started_total",
"pageserver_remote_timeline_client_calls_finished_total",
]
res = self.get_metrics_values(
metrics,
filter={
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"file_kind": str(file_kind),
"op_kind": str(op_kind),
},
absence_ok=True,
)
if len(matches) == 0:
value = None
elif len(matches) == 1:
value = matches[0].value
assert value is not None
else:
assert len(matches) < 2, "above filter should uniquely identify metric"
return value
if len(res) != 2:
return None
inc, dec = [res[metric] for metric in metrics]
queue_count = int(inc) - int(dec)
assert queue_count >= 0
return queue_count
def layer_map_info(
self,

View File

@@ -1,5 +1,5 @@
import time
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Tuple, Union
from mypy_boto3_s3.type_defs import (
DeleteObjectOutputTypeDef,
@@ -221,16 +221,40 @@ def wait_for_upload_queue_empty(
):
while True:
all_metrics = pageserver_http.get_metrics()
tl = all_metrics.query_all(
"pageserver_remote_timeline_client_calls_unfinished",
started = all_metrics.query_all(
"pageserver_remote_timeline_client_calls_started_total",
{
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
},
)
assert len(tl) > 0
log.info(f"upload queue for {tenant_id}/{timeline_id}: {tl}")
if all(m.value == 0 for m in tl):
finished = all_metrics.query_all(
"pageserver_remote_timeline_client_calls_finished_total",
{
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
},
)
assert len(started) == len(finished)
# this is `started left join finished`; if match, subtracting start from finished, resulting in queue depth
remaining_labels = ["shard_id", "file_kind", "op_kind"]
tl: List[Tuple[Any, float]] = []
for s in started:
found = False
for f in finished:
if all([s.labels[label] == f.labels[label] for label in remaining_labels]):
assert (
not found
), "duplicate match, remaining_labels don't uniquely identify sample"
tl.append((s.labels, int(s.value) - int(f.value)))
found = True
if not found:
tl.append((s.labels, int(s.value)))
assert len(tl) == len(started), "something broken with join logic"
log.info(f"upload queue for {tenant_id}/{timeline_id}:")
for labels, queue_count in tl:
log.info(f" {labels}: {queue_count}")
if all(queue_count == 0 for (_, queue_count) in tl):
return
time.sleep(0.2)

View File

@@ -73,7 +73,7 @@ class Workload:
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
def write_rows(self, n, pageserver_id: Optional[int] = None):
def write_rows(self, n, pageserver_id: Optional[int] = None, upload: bool = True):
endpoint = self.endpoint(pageserver_id)
start = self.expect_rows
end = start + n - 1
@@ -87,9 +87,12 @@ class Workload:
"""
)
return last_flush_lsn_upload(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
if upload:
return last_flush_lsn_upload(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
else:
return False
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True):
assert self.expect_rows >= n

View File

@@ -0,0 +1,195 @@
import asyncio
import json
from pathlib import Path
from typing import Any, Dict, Tuple
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.utils import get_scale_for_db, humantime_to_ms
from performance.pageserver.util import (
setup_pageserver_with_tenants,
)
@pytest.mark.parametrize("duration", [30])
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(200)])
@pytest.mark.parametrize("n_tenants", [10])
@pytest.mark.parametrize("get_vectored_impl", ["sequential", "vectored"])
@pytest.mark.timeout(1000)
def test_basebackup_with_high_slru_count(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
get_vectored_impl: str,
n_tenants: int,
pgbench_scale: int,
duration: int,
):
def record(metric, **kwargs):
zenbenchmark.record(metric_name=f"pageserver_basebackup.{metric}", **kwargs)
params: Dict[str, Tuple[Any, Dict[str, Any]]] = {}
# params from fixtures
params.update(
{
"n_tenants": (n_tenants, {"unit": ""}),
"pgbench_scale": (pgbench_scale, {"unit": ""}),
"duration": (duration, {"unit": "s"}),
}
)
# configure cache sizes like in prod
page_cache_size = 16384
max_file_descriptors = 500000
neon_env_builder.pageserver_config_override = (
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}; "
f"get_vectored_impl='{get_vectored_impl}'; validate_vectored_get=false"
)
params.update(
{
"pageserver_config_override.page_cache_size": (
page_cache_size * 8192,
{"unit": "byte"},
),
"pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
}
)
for param, (value, kwargs) in params.items():
record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
n_txns = 500000
def setup_wrapper(env: NeonEnv):
return setup_tenant_template(env, n_txns)
env = setup_pageserver_with_tenants(
neon_env_builder, f"large_slru_count-{n_tenants}-{n_txns}", n_tenants, setup_wrapper
)
run_benchmark(env, pg_bin, record, duration)
def setup_tenant_template(env: NeonEnv, n_txns: int):
config = {
"gc_period": "0s", # disable periodic gc
"checkpoint_timeout": "10 years",
"compaction_period": "0s", # disable periodic compaction
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)
ps_http = env.pageserver.http_client()
with env.endpoints.create_start(
"main", tenant_id=template_tenant, config_lines=["shared_buffers=1MB"]
) as ep:
rels = 10
asyncio.run(run_updates(ep, n_txns, rels))
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
return (template_tenant, template_timeline, config)
# Takes about 5 minutes and produces tenants with around 300 SLRU blocks
# of 8 KiB each.
async def run_updates(ep: Endpoint, n_txns: int, workers_count: int):
workers = []
for i in range(workers_count):
workers.append(asyncio.create_task(run_update_loop_worker(ep, n_txns, i)))
await asyncio.gather(*workers)
async def run_update_loop_worker(ep: Endpoint, n_txns: int, idx: int):
table = f"t_{idx}"
conn = await ep.connect_async()
await conn.execute(f"CREATE TABLE {table} (pk integer PRIMARY KEY, x integer)")
await conn.execute(f"ALTER TABLE {table} SET (autovacuum_enabled = false)")
await conn.execute(f"INSERT INTO {table} VALUES (1, 0)")
await conn.execute(
"""
CREATE PROCEDURE updating{0}() as
$$
DECLARE
i integer;
BEGIN
FOR i IN 1..{1} LOOP
UPDATE {0} SET x = x + 1 WHERE pk=1;
COMMIT;
END LOOP;
END
$$ LANGUAGE plpgsql
""".format(table, n_txns)
)
await conn.execute("SET statement_timeout=0")
await conn.execute(f"call updating{table}()")
def run_benchmark(env: NeonEnv, pg_bin: PgBin, record, duration_secs: int):
ps_http = env.pageserver.http_client()
cmd = [
str(env.neon_binpath / "pagebench"),
"basebackup",
"--mgmt-api-endpoint",
ps_http.base_url,
"--page-service-connstring",
env.pageserver.connstr(password=None),
"--gzip-probability",
"1",
"--runtime",
f"{duration_secs}s",
# don't specify the targets explicitly, let pagebench auto-discover them
]
log.info(f"command: {' '.join(cmd)}")
basepath = pg_bin.run_capture(cmd, with_command_header=False)
results_path = Path(basepath + ".stdout")
log.info(f"Benchmark results at: {results_path}")
with open(results_path, "r") as f:
results = json.load(f)
log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
total = results["total"]
metric = "request_count"
record(
metric,
metric_value=total[metric],
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)
metric = "latency_mean"
record(
metric,
metric_value=humantime_to_ms(total[metric]),
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)
metric = "latency_percentiles"
for k, v in total[metric].items():
record(
f"{metric}.{k}",
metric_value=humantime_to_ms(v),
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)

View File

@@ -3,7 +3,6 @@ import os
from pathlib import Path
from typing import Any, Dict, Tuple
import fixtures.pageserver.many_tenants as many_tenants
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
@@ -15,7 +14,9 @@ from fixtures.neon_fixtures import (
)
from fixtures.utils import get_scale_for_db, humantime_to_ms
from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
from performance.pageserver.util import (
setup_pageserver_with_tenants,
)
# For reference, the space usage of the snapshots:
@@ -80,10 +81,72 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
for param, (value, kwargs) in params.items():
record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
env = setup_pageserver_with_pgbench_tenants(neon_env_builder, pg_bin, n_tenants, pgbench_scale)
def setup_wrapper(env: NeonEnv):
return setup_tenant_template(env, pg_bin, pgbench_scale)
env = setup_pageserver_with_tenants(
neon_env_builder,
f"max_throughput_latest_lsn-{n_tenants}-{pgbench_scale}",
n_tenants,
setup_wrapper,
)
run_benchmark_max_throughput_latest_lsn(env, pg_bin, record, duration)
def setup_tenant_template(env: NeonEnv, pg_bin: PgBin, scale: int):
# use a config that makes production of on-disk state timing-insensitive
# as we ingest data into the tenant.
config = {
"gc_period": "0s", # disable periodic gc
"checkpoint_timeout": "10 years",
"compaction_period": "0s", # disable periodic compaction
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)
ps_http = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", "-I", "dtGvp", ep.connstr()])
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
for _ in range(
0, 17
): # some prime number to avoid potential resonances with the "_threshold" variables from the config
# the L0s produced by this appear to have size ~5MiB
num_txns = 10_000
pg_bin.run_capture(
["pgbench", "-N", "-c1", "--transactions", f"{num_txns}", ep.connstr()]
)
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
# for reference, the output at scale=6 looked like so (306M total)
# ls -sh test_output/shared-snapshots/max_throughput_latest_lsn-2-6/snapshot/pageserver_1/tenants/35c30b88ea16a7a09f82d9c6a115551b/timelines/da902b378eebe83dc8a4e81cd3dc1c59
# total 306M
# 188M 000000000000000000000000000000000000-030000000000000000000000000000000003__000000000149F060-0000000009E75829
# 4.5M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000009E75829-000000000A21E919
# 33M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000A21E919-000000000C20CB71
# 36M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000C20CB71-000000000E470791
# 16M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000E470791-000000000F34AEF1
# 8.2M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000F34AEF1-000000000FABA8A9
# 6.0M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FABA8A9-000000000FFE0639
# 6.1M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FFE0639-000000001051D799
# 4.7M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000001051D799-0000000010908F19
# 4.6M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000010908F19-0000000010CD3021
return (template_tenant, template_timeline, config)
def run_benchmark_max_throughput_latest_lsn(
env: NeonEnv, pg_bin: PgBin, record, duration_secs: int
):
@@ -138,78 +201,3 @@ def run_benchmark_max_throughput_latest_lsn(
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)
def setup_pageserver_with_pgbench_tenants(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
n_tenants: int,
scale: int,
) -> NeonEnv:
"""
Utility function to set up a pageserver with a given number of identical tenants.
Each tenant is a pgbench tenant, initialize to a certain scale, and treated afterwards
with a repeat application of (pgbench simple-update workload, checkpoint, compact).
"""
def setup_template(env: NeonEnv):
# use a config that makes production of on-disk state timing-insensitive
# as we ingest data into the tenant.
config = {
"gc_period": "0s", # disable periodic gc
"checkpoint_timeout": "10 years",
"compaction_period": "0s", # disable periodic compaction
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)
ps_http = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", "-I", "dtGvp", ep.connstr()])
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
for _ in range(
0, 17
): # some prime number to avoid potential resonances with the "_threshold" variables from the config
# the L0s produced by this appear to have size ~5MiB
num_txns = 10_000
pg_bin.run_capture(
["pgbench", "-N", "-c1", "--transactions", f"{num_txns}", ep.connstr()]
)
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
# for reference, the output at scale=6 looked like so (306M total)
# ls -sh test_output/shared-snapshots/max_throughput_latest_lsn-2-6/snapshot/pageserver_1/tenants/35c30b88ea16a7a09f82d9c6a115551b/timelines/da902b378eebe83dc8a4e81cd3dc1c59
# total 306M
# 188M 000000000000000000000000000000000000-030000000000000000000000000000000003__000000000149F060-0000000009E75829
# 4.5M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000009E75829-000000000A21E919
# 33M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000A21E919-000000000C20CB71
# 36M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000C20CB71-000000000E470791
# 16M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000E470791-000000000F34AEF1
# 8.2M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000F34AEF1-000000000FABA8A9
# 6.0M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FABA8A9-000000000FFE0639
# 6.1M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FFE0639-000000001051D799
# 4.7M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000001051D799-0000000010908F19
# 4.6M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000010908F19-0000000010CD3021
return (template_tenant, template_timeline, config)
def doit(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
return many_tenants.single_timeline(neon_env_builder, setup_template, n_tenants)
env = neon_env_builder.build_and_use_snapshot(
f"max_throughput_latest_lsn-{n_tenants}-{scale}", doit
)
env.start()
ensure_pageserver_ready_for_benchmarking(env, n_tenants)
return env

View File

@@ -2,9 +2,16 @@
Utilities used by all code in this sub-directory
"""
from typing import Any, Callable, Dict, Tuple
import fixtures.pageserver.many_tenants as many_tenants
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.pageserver.utils import wait_until_all_tenants_state
from fixtures.types import TenantId, TimelineId
def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
@@ -27,3 +34,24 @@ def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
assert not layer.remote
log.info("ready")
def setup_pageserver_with_tenants(
neon_env_builder: NeonEnvBuilder,
name: str,
n_tenants: int,
setup: Callable[[NeonEnv], Tuple[TenantId, TimelineId, Dict[str, Any]]],
) -> NeonEnv:
"""
Utility function to set up a pageserver with a given number of identical tenants.
Each tenant is a pgbench tenant, initialize to a certain scale, and treated afterwards
with a repeat application of (pgbench simple-update workload, checkpoint, compact).
"""
def doit(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
return many_tenants.single_timeline(neon_env_builder, setup, n_tenants)
env = neon_env_builder.build_and_use_snapshot(name, doit)
env.start()
ensure_pageserver_ready_for_benchmarking(env, n_tenants)
return env

View File

@@ -17,10 +17,10 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
tenant, _ = env.neon_cli.create_tenant(
conf={
"gc_period": "0s",
"checkpoint_distance": "8192",
"checkpoint_distance": "16384",
"compaction_period": "1 s",
"compaction_threshold": "1",
"compaction_target_size": "8192",
"compaction_target_size": "16384",
}
)

View File

@@ -226,6 +226,10 @@ def test_forward_compatibility(
)
try:
# TODO: remove this once the previous pageserrver version understands
# the 'get_vectored_impl' config
neon_env_builder.pageserver_get_vectored_impl = None
neon_env_builder.num_safekeepers = 3
neon_local_binpath = neon_env_builder.neon_binpath
env = neon_env_builder.from_repo_dir(

View File

@@ -17,6 +17,7 @@ from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
wait_for_upload,
wait_for_upload_queue_empty,
wait_until_tenant_active,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import Lsn
@@ -165,6 +166,10 @@ def test_ondemand_download_timetravel(neon_env_builder: NeonEnvBuilder):
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
####
# Produce layers
####
lsns = []
table_len = 10000
@@ -194,19 +199,29 @@ def test_ondemand_download_timetravel(neon_env_builder: NeonEnvBuilder):
# run checkpoint manually to be sure that data landed in remote storage
client.timeline_checkpoint(tenant_id, timeline_id)
##### Stop the first pageserver instance, erase all its data
# prevent new WAL from being produced, wait for layers to reach remote storage
env.endpoints.stop_all()
# Stop safekeepers and take another checkpoint. The endpoints might
# have written a few more bytes during shutdown.
for sk in env.safekeepers:
sk.stop()
client.timeline_checkpoint(tenant_id, timeline_id)
current_lsn = Lsn(client.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
# wait until pageserver has successfully uploaded all the data to remote storage
# NB: the wait_for_upload returns as soon as remote_consistent_lsn == current_lsn.
# But the checkpoint also triggers a compaction
# => image layer generation =>
# => doesn't advance LSN
# => but we want the remote state to deterministic, so additionally, wait for upload queue to drain
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, timeline_id)
client.deletion_queue_flush(execute=True)
del current_lsn
env.pageserver.stop()
env.pageserver.start()
# We've shut down the SKs, then restarted the PSes to sever all walreceiver connections;
# This means pageserver's remote_consistent_lsn is now frozen to whatever it was after the pageserver.stop() call.
wait_until_tenant_active(client, tenant_id)
###
# Produce layers complete;
# Start the actual testing.
###
def get_api_current_physical_size():
d = client.timeline_detail(tenant_id, timeline_id)
@@ -223,9 +238,7 @@ def test_ondemand_download_timetravel(neon_env_builder: NeonEnvBuilder):
log.info(filled_size)
assert filled_current_physical == filled_size, "we don't yet do layer eviction"
# Wait until generated image layers are uploaded to S3
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, timeline_id)
# Stop the first pageserver instance, erase all its data
env.pageserver.stop()
# remove all the layer files

View File

@@ -74,16 +74,19 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# We will make no effort to avoid stale attachments
for ps in env.pageservers:
ps.allowed_errors.extend(
[
# We will make no effort to avoid stale attachments
".*Dropped remote consistent LSN updates.*",
".*Dropping stale deletions.*",
# page_service_conn_main{peer_addr=[::1]:41176}: query handler for 'pagestream 3b19aec5038c796f64b430b30a555121 d07776761d44050b8aab511df1657d83' failed: Tenant 3b19aec5038c796f64b430b30a555121 not found
".*query handler.*Tenant.*not found.*",
# page_service_conn_main{peer_addr=[::1]:45552}: query handler for 'pagestream 414ede7ad50f775a8e7d9ba0e43b9efc a43884be16f44b3626482b6981b2c745' failed: Tenant 414ede7ad50f775a8e7d9ba0e43b9efc is not active
".*query handler.*Tenant.*not active.*",
# this shutdown case is logged at WARN severity by the time it bubbles up to logical size calculation code
# WARN ...: initial size calculation failed: downloading failed, possibly for shutdown
".*downloading failed, possibly for shutdown",
]
)

View File

@@ -554,3 +554,13 @@ def test_sql_over_http_pool_custom_types(static_proxy: NeonProxy):
"select array['foo'::foo, 'bar'::foo, 'baz'::foo] as data",
)
assert response["rows"][0]["data"] == ["foo", "bar", "baz"]
@pytest.mark.asyncio
async def test_sql_over_http2(static_proxy: NeonProxy):
static_proxy.safe_psql("create role http with login password 'http' superuser")
resp = await static_proxy.http2_query(
"select 42 as answer", [], user="http", password="http", expected_code=200
)
assert resp["rows"] == [{"answer": 42}]

View File

@@ -274,15 +274,9 @@ def test_remote_storage_upload_queue_retries(
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
def get_queued_count(file_kind, op_kind):
val = client.get_remote_timeline_client_metric(
"pageserver_remote_timeline_client_calls_unfinished",
tenant_id,
timeline_id,
file_kind,
op_kind,
return client.get_remote_timeline_client_queue_count(
tenant_id, timeline_id, file_kind, op_kind
)
assert val is not None, "expecting metric to be present"
return int(val)
# create some layers & wait for uploads to finish
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("a")
@@ -434,7 +428,7 @@ def test_remote_timeline_client_calls_started_metric(
assert timeline_id is not None
for (file_kind, op_kind), observations in calls_started.items():
val = client.get_metric_value(
name="pageserver_remote_timeline_client_calls_started_count",
name="pageserver_remote_timeline_client_calls_started_total",
filter={
"file_kind": str(file_kind),
"op_kind": str(op_kind),
@@ -537,16 +531,6 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
client = env.pageserver.http_client()
def get_queued_count(file_kind, op_kind):
val = client.get_remote_timeline_client_metric(
"pageserver_remote_timeline_client_calls_unfinished",
tenant_id,
timeline_id,
file_kind,
op_kind,
)
return int(val) if val is not None else val
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
client.configure_failpoints(("before-upload-layer", "return"))
@@ -580,7 +564,10 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
def assert_compacted_and_uploads_queued():
assert timeline_path.exists()
assert len(list(timeline_path.glob("*"))) >= 8
assert get_queued_count(file_kind="index", op_kind="upload") > 0
assert (
get_queued_count(client, tenant_id, timeline_id, file_kind="index", op_kind="upload")
> 0
)
wait_until(20, 0.1, assert_compacted_and_uploads_queued)
@@ -618,7 +605,10 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
assert len(filtered) == 0
# timeline deletion should kill ongoing uploads, so, the metric will be gone
assert get_queued_count(file_kind="index", op_kind="upload") is None
assert (
get_queued_count(client, tenant_id, timeline_id, file_kind="index", op_kind="upload")
is None
)
# timeline deletion should be unblocking checkpoint ops
checkpoint_thread.join(2.0)
@@ -919,16 +909,8 @@ def get_queued_count(
file_kind: str,
op_kind: str,
):
val = client.get_remote_timeline_client_metric(
"pageserver_remote_timeline_client_calls_unfinished",
tenant_id,
timeline_id,
file_kind,
op_kind,
)
if val is None:
return val
return int(val)
"""The most important aspect of this function is shorter name & no return type so asserts are more concise."""
return client.get_remote_timeline_client_queue_count(tenant_id, timeline_id, file_kind, op_kind)
def assert_nothing_to_upload(

View File

@@ -4,7 +4,7 @@ from fixtures.neon_fixtures import (
tenant_get_shards,
)
from fixtures.remote_storage import s3_storage
from fixtures.types import TenantShardId, TimelineId
from fixtures.types import Lsn, TenantShardId, TimelineId
from fixtures.workload import Workload
@@ -284,3 +284,84 @@ def test_sharding_split_smoke(
)
env.attachment_service.consistency_check()
def test_sharding_ingest(
neon_env_builder: NeonEnvBuilder,
):
"""
Check behaviors related to ingest:
- That we generate properly sized layers
- TODO: that updates to remote_consistent_lsn are made correctly via safekeepers
"""
# Set a small stripe size and checkpoint distance, so that we can exercise rolling logic
# without writing a lot of data.
expect_layer_size = 131072
TENANT_CONF = {
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{expect_layer_size}",
"compaction_target_size": f"{expect_layer_size}",
}
shard_count = 4
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start(
initial_tenant_conf=TENANT_CONF,
initial_tenant_shard_count=shard_count,
# A stripe size the same order of magnitude as layer size: this ensures that
# within checkpoint_distance some shards will have no data to ingest, if LSN
# contains sequential page writes. This test checks that this kind of
# scenario doesn't result in some shards emitting empty/tiny layers.
initial_tenant_shard_stripe_size=expect_layer_size // 8192,
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(512, upload=False)
workload.write_rows(512, upload=False)
workload.write_rows(512, upload=False)
workload.write_rows(512, upload=False)
workload.validate()
small_layer_count = 0
ok_layer_count = 0
huge_layer_count = 0
# Inspect the resulting layer map, count how many layers are undersized.
for shard in env.attachment_service.locate(tenant_id):
pageserver = env.get_pageserver(shard["node_id"])
shard_id = shard["shard_id"]
layer_map = pageserver.http_client().layer_map_info(shard_id, timeline_id)
for layer in layer_map.historic_layers:
assert layer.layer_file_size is not None
if layer.layer_file_size < expect_layer_size // 2:
classification = "Small"
small_layer_count += 1
elif layer.layer_file_size > expect_layer_size * 2:
classification = "Huge "
huge_layer_count += 1
else:
classification = "OK "
ok_layer_count += 1
if layer.kind == "Delta":
assert layer.lsn_end is not None
lsn_size = Lsn(layer.lsn_end) - Lsn(layer.lsn_start)
else:
lsn_size = 0
log.info(
f"{classification} layer[{pageserver.id}]: {layer.layer_file_name} (size {layer.layer_file_size}, LSN distance {lsn_size})"
)
# Why an inexact check?
# - Because we roll layers on checkpoint_distance * shard_count, we expect to obey the target
# layer size on average, but it is still possible to write some tiny layers.
log.info(f"Totals: {small_layer_count} small layers, {ok_layer_count} ok layers")
assert float(small_layer_count) / float(ok_layer_count) < 0.25
# Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
assert huge_layer_count <= shard_count

View File

@@ -167,10 +167,14 @@ def test_vm_bit_clear_on_heap_lock(neon_env_builder: NeonEnvBuilder):
# The VM page in shared buffer cache, and the same page as reconstructed
# by the pageserver, should be equal.
#
# Ignore the LSN on the page though (first 8 bytes). If the dirty
# VM page is flushed from the cache for some reason, it gets WAL-logged,
# which changes the LSN on the page.
cur.execute("select get_raw_page( 'vmtest_lock', 'vm', 0 )")
vm_page_in_cache = (cur.fetchall()[0][0])[:100].hex()
vm_page_in_cache = (cur.fetchall()[0][0])[8:100].hex()
cur.execute("select get_raw_page_at_lsn( 'vmtest_lock', 'vm', 0, pg_current_wal_insert_lsn() )")
vm_page_at_pageserver = (cur.fetchall()[0][0])[:100].hex()
vm_page_at_pageserver = (cur.fetchall()[0][0])[8:100].hex()
assert vm_page_at_pageserver == vm_page_in_cache
@@ -201,16 +205,6 @@ def test_vm_bit_clear_on_heap_lock(neon_env_builder: NeonEnvBuilder):
for _ in range(1000):
cur.execute("select test_consume_xids(10000);")
for _ in range(1000):
cur.execute(
"select get_raw_page_at_lsn( 'vmtest_lock', 'vm', 0, pg_current_wal_insert_lsn() )"
)
page = (cur.fetchall()[0][0])[:100].hex()
log.info(f"VM page contents: {page}")
cur.execute("select get_raw_page( 'vmtest_lock', 'vm', 0 )")
page = (cur.fetchall()[0][0])[:100].hex()
log.info(f"VM page contents in cache: {page}")
cur.execute("select min(datfrozenxid::text::int) from pg_database")
datfrozenxid = int(cur.fetchall()[0][0])
log.info(f"datfrozenxid {datfrozenxid} locking_xid: {locking_xid}")

View File

@@ -74,6 +74,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
}
prev = Some(req);
}
PagestreamFeMessage::GetVectoredPages(_) => {}
PagestreamFeMessage::DbSize(_) => {}
};
}