Compare commits

26 Commits

Author SHA1 Message Date
Conrad Ludgate
0e551edb06 rename fn 2024-08-16 08:54:25 +01:00
Conrad Ludgate
484cdccbf2 fix cancellation 2024-08-16 08:44:03 +01:00
Conrad Ludgate
39d1b78817 fix http connect config 2024-08-16 08:42:14 +01:00
Joonas Koivunen
4763a960d1 chore: log if we have an open layer or any frozen on shutdown (#8740)
Some benchmarks are failing with a "long" flushing, which might be
because there is a queue of in-memory layers, or something else. Add
logging to narrow it down.

Private slack DM ref:
https://neondb.slack.com/archives/D049K7HJ9JM/p1723727305238099
2024-08-16 06:10:05 +01:00
Sasha Krassovsky
df086cd139 Add logical replication test to exercise snapfiles (#8364) 2024-08-15 15:34:45 -07:00
Alexander Bayandin
69cb1ee479 CI(replication-tests): store test results & change notification channel (#8687)
## Problem

We want to store Nightly Replication test results in the database and
notify the relevant Slack channel about failures

## Summary of changes
- Store test results in the database
- Notify `on-call-compute-staging-stream` about failures
2024-08-15 22:41:58 +01:00
Alexander Bayandin
4e58fd9321 CI(label-for-external-users): use CI_ACCESS_TOKEN (#8738)
## Problem

`secrets.GITHUB_TOKEN` (with any permissions) is not enough to get 
a user's membership info if they decide to hide it.

## Summary of changes
- Use `secrets.CI_ACCESS_TOKEN` for `gh api` call
- Use `pull_request_target` instead of `pull_request` event to get
access to secrets
2024-08-15 18:37:15 +01:00
Konstantin Knizhnik
f087423a01 Handle reload config file request in LR monitor (#8732)
## Problem

Logical replication BGW checking replication lag is not reloading config

## Summary of changes

Add handling of reload config request

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-08-15 16:28:25 +03:00
Joonas Koivunen
24d347f50b storcon: use tracing for logging panics (#8734)
this gives spans for panics, and does not clobber loki output by writing
to stderr while all of the other logging is to stdout.

See: #3475
2024-08-15 16:27:07 +03:00
Joonas Koivunen
52641eb853 storcon: add spans to drain/fill ops (#8735)
this way we do not need to repeat the %node_id everywhere, and we get no
stray messages in logs from within the op.
2024-08-15 15:30:04 +03:00
Joonas Koivunen
d9a57aeed9 storcon: deny external node configuration if an operation is ongoing (#8727)
Per #8674, disallow node configuration while drain/fill are ongoing.
Implement it by adding an HTTP-only wrapper
`Service::external_node_configure` which checks for operation existing
before configuring.

Additionally:
- allow cancelling drain/fill after a pageserver has restarted and
transitioned to WarmingUp

Fixes: #8674
2024-08-15 10:54:05 +01:00
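The guard described in d9a57aeed9 can be pictured roughly as follows. This is a minimal sketch, not the storage controller's code: the `Service` fields, `ConfigureError`, `start_operation`, and `policy_of` are all hypothetical names made up for illustration; only the idea of an external-facing wrapper that refuses to configure a node while a drain/fill is ongoing comes from the commit message.

```rust
use std::collections::HashMap;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum ConfigureError {
    /// A drain/fill is still running for this node.
    OperationOngoing { node_id: u64 },
}

/// Hypothetical, heavily simplified stand-in for the controller state.
#[derive(Default)]
pub struct Service {
    ongoing_ops: HashMap<u64, &'static str>,
    policies: HashMap<u64, String>,
}

impl Service {
    /// Record that a drain or fill has started on a node.
    pub fn start_operation(&mut self, node_id: u64, kind: &'static str) {
        self.ongoing_ops.insert(node_id, kind);
    }

    /// Internal path: always applies the configuration.
    pub fn node_configure(&mut self, node_id: u64, policy: &str) {
        self.policies.insert(node_id, policy.to_string());
    }

    /// External (HTTP-facing) path: check for an ongoing operation first.
    pub fn external_node_configure(
        &mut self,
        node_id: u64,
        policy: &str,
    ) -> Result<(), ConfigureError> {
        if self.ongoing_ops.contains_key(&node_id) {
            return Err(ConfigureError::OperationOngoing { node_id });
        }
        self.node_configure(node_id, policy);
        Ok(())
    }

    /// Read back the last applied policy, if any.
    pub fn policy_of(&self, node_id: u64) -> Option<&str> {
        self.policies.get(&node_id).map(String::as_str)
    }
}
```

Keeping the check in a separate wrapper means internal callers (the drain/fill code itself) can still reconfigure the node through the unguarded method.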
Alexander Bayandin
a9c28be7d0 fix(pageserver): allow unused_imports in download.rs on macOS (#8733)
## Problem

On macOS, clippy fails with the following error:

```
error: unused import: `crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt`
  --> pageserver/src/tenant/remote_timeline_client/download.rs:26:5
   |
26 | use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: `-D unused-imports` implied by `-D warnings`
   = help: to override `-D warnings` add `#[allow(unused_imports)]`
```

Introduced in https://github.com/neondatabase/neon/pull/8717

## Summary of changes
- allow `unused_imports` for
`crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt` on macOS
in download.rs
2024-08-15 10:06:28 +01:00
Vlad Lazar
fef77b0cc9 safekeeper: consider partial uploads when pulling timeline (#8628)
## Problem
The control file contains the id of the safekeeper that uploaded it.
Previously, when sending a snapshot of the control file to another sk,
it would eventually be gc-ed by the receiving sk. This is incorrect
because the original sk might still need it later.

## Summary of Changes
When sending a snapshot and the control file contains an uploaded
segment:
* Create a copy of the segment in s3 with the destination sk in the
  object name
* Tweak the streamed control file to point to the object created in the
  previous step

Note that the snapshot endpoint now has to know the id of the requestor,
so the api has been extended to include the node id of the destination
sk.

Closes https://github.com/neondatabase/neon/issues/8542
2024-08-15 09:02:33 +01:00
Christian Schwarz
168913bdf0 refactor(write path): newtype to enforce use of fully initialized slices (#8717)
The `tokio_epoll_uring::Slice` / `tokio_uring::Slice` type is weird.
The new `FullSlice` newtype is better. See the doc comment for details.

The naming is not ideal, but we'll clean that up in a future refactoring
where we move the `FullSlice` into `tokio_epoll_uring`. Then, we'll do
the following:
* tokio_epoll_uring::Slice is removed
* `FullSlice` becomes `tokio_epoll_uring::IoBufView`
* new type `tokio_epoll_uring::IoBufMutView` for the current
`tokio_epoll_uring::Slice<IoBufMut>`

Context
-------

I did this work in preparation for
https://github.com/neondatabase/neon/pull/8537.
There, I'm changing the type that the `inmemory_layer.rs` passes to
`DeltaLayerWriter::put_value_bytes` and thus it seemed like a good
opportunity to make this cleanup first.
2024-08-14 21:57:17 +02:00
Alexander Bayandin
aa2e16f307 CI: misc cleanup & fixes (#8559)
## Problem
A bunch of small fixes and improvements for CI, that are too small to
have a separate PR for them

## Summary of changes
- CI(build-and-test): fix parenthesis
- CI(actionlint): fix path to workflow file
- CI: remove default args from actions/checkout
- CI: remove `gen3` label, using a couple `self-hosted` +
`small{,-arm64}`/`large{,-arm64}` is enough
- CI: prettify Slack messages, hide links behind text messages
- CI(build-and-test): add more dependencies to `conclusion` job
2024-08-14 17:56:59 +01:00
Alexander Bayandin
70b18ff481 CI(neon-image): add ARM-specific RUSTFLAGS (#8566)
## Problem

It's recommended that a couple of additional RUSTFLAGS be set up to
improve the performance of Rust applications on AWS Graviton.

See
57dc813626/rust.md

Note: Apple Silicon is compatible with neoverse-n1:
```
$ clang --version
Apple clang version 15.0.0 (clang-1500.3.9.4)
Target: arm64-apple-darwin23.6.0
Thread model: posix
InstalledDir: /Applications/Xcode_15.4.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/bin
$
$ clang --print-supported-cpus 2>&1 | grep neoverse-
	neoverse-512tvb
	neoverse-e1
	neoverse-n1
	neoverse-n2
	neoverse-v1
	neoverse-v2
```

## Summary of changes
- Add `-Ctarget-feature=+lse -Ctarget-cpu=neoverse-n1` to RUSTFLAGS for
ARM images
2024-08-14 17:03:21 +01:00
Joonas Koivunen
60fc1e8cc8 chore: even more responsive compaction cancellation (#8725)
Some benchmarks and tests might still fail because of #8655 (tracked in
#8708) because we are not fast enough to shut down ([one evidence]).
Partially this is explained by the current validation mode of streaming
k-merge, but otherwise because that is where we use a lot of time in
compaction. Outside of L0 => L1 compaction, the image layer generation
is already guarded by vectored reads doing cancellation checks.

32768 is a wild guess based on looking at how many keys we put in each
layer in a bench (1-2 million), but I assume it will be a good enough
divisor. Doing checks more often will start showing up as contention
which we cannot currently measure. Doing checks less often might be
reasonable.

[one evidence]:
https://neon-github-public-dev.s3.amazonaws.com/reports/main/10384136483/index.html#suites/9681106e61a1222669b9d22ab136d07b/96e6d53af234924/

Earlier PR: #8706.
2024-08-14 14:48:15 +01:00
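The "check every 32768 keys" idea from 60fc1e8cc8 can be sketched as below. This is an illustration under assumptions, not the pageserver's compaction loop: `CancelFlag`, `Cancelled`, and `process_keys` are hypothetical names, and a plain atomic flag stands in for whatever cancellation token the real code uses. Only the interval value is taken from the commit message.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in for a cancellation token.
#[derive(Default)]
pub struct CancelFlag(AtomicBool);

impl CancelFlag {
    pub fn cancel(&self) {
        self.0.store(true, Ordering::Relaxed);
    }

    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::Relaxed)
    }
}

/// Hypothetical error returned when the loop observes cancellation.
#[derive(Debug, PartialEq)]
pub struct Cancelled;

/// Interval quoted in the commit message.
pub const CHECK_EVERY: usize = 32_768;

/// Walk over keys, polling the flag only once per `CHECK_EVERY` keys so the
/// check stays cheap relative to the per-key work.
pub fn process_keys<I>(keys: I, cancel: &CancelFlag) -> Result<usize, Cancelled>
where
    I: Iterator<Item = u64>,
{
    let mut processed = 0usize;
    for (i, _key) in keys.enumerate() {
        if i % CHECK_EVERY == 0 && cancel.is_cancelled() {
            return Err(Cancelled);
        }
        // The real per-key work (merging, writing to a layer) would go here.
        processed += 1;
    }
    Ok(processed)
}
```

With 1-2 million keys per layer, this interval gives on the order of tens of checks per layer, which matches the trade-off the message describes between responsiveness and contention.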
Alexander Bayandin
36c1719a07 CI(build-neon): fix accidental neon rebuild on cargo test (#8721)
## Problem

During `Run rust tests` step (for debug builds), we accidentally rebuild
neon twice (by `cargo test --doc` and by `cargo nextest run`).
It happens because we don't set `cov_prefix` for the `cargo test --doc`
command, which triggers rebuilding with different build flags, and one
more rebuild by `cargo nextest run`.

## Summary of changes
- Set `cov_prefix` for `cargo test --doc` to prevent unneeded rebuilds
2024-08-14 13:38:25 +01:00
John Spray
abb53ba36d storcon_cli: don't clobber heatmap interval when setting eviction (#8722)
## Problem

This command is kind of a hack, used when we're migrating large tenants
and want to get their resident size down. It sets the tenant config to a
fixed value, which omitted heatmap_period, so caused secondaries to get
out of date.

## Summary of changes

- Set heatmap period to the same 300s default that we use elsewhere when
updating eviction settings

This is not as elegant as some general purpose partial modification of
the config, but it practically makes the command safer to use.
2024-08-14 13:37:03 +01:00
Conrad Ludgate
a7028d92b7 proxy: start of jwk cache (#8690)
basic JWT implementation that caches JWKs and verifies signatures.

this code is currently not reachable from proxy, I just wanted to get
something merged in.
2024-08-14 13:35:29 +01:00
Joonas Koivunen
6c9e3c9551 refactor: error/anyhow::Error wrapping (#8697)
We can get CompactionError::Other(Cancelled) via the error handling in
a few ways.
[evidence](https://neon-github-public-dev.s3.amazonaws.com/reports/pr-8655/10301613380/index.html#suites/cae012a1e6acdd9fdd8b81541972b6ce/653a33de17802bb1/).
Hopefully fix it by:

1. replace the `map_err` which hid the
`GetReadyAncestorError::Cancelled` with `From<GetReadyAncestorError> for
GetVectoredError` conversion
2. simplifying the code in pgdatadir_mapping to eliminate the token
anyhow wrapping for deserialization errors
3. stop wrapping GetVectoredError as anyhow errors
4. stop wrapping PageReconstructError as anyhow errors

Additionally, produce warnings if we treat any other error (as was legal
before this PR) as missing key.

Cc: #8708.
2024-08-14 12:45:56 +01:00
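Point 1 of 6c9e3c9551 (a `From` conversion instead of a `map_err` that hides cancellation) might look roughly like this. The two enum names come from the commit message, but their variants here are simplified assumptions, and `ready_ancestor` / `get_vectored` are hypothetical helpers that exist only to show `?` picking up the conversion.

```rust
/// Simplified stand-in; the variants are assumptions for illustration.
#[derive(Debug, PartialEq)]
pub enum GetReadyAncestorError {
    Cancelled,
    BadState(String),
}

/// Simplified stand-in; the variants are assumptions for illustration.
#[derive(Debug, PartialEq)]
pub enum GetVectoredError {
    Cancelled,
    Other(String),
}

impl From<GetReadyAncestorError> for GetVectoredError {
    fn from(e: GetReadyAncestorError) -> Self {
        match e {
            // Cancellation keeps its identity instead of being flattened
            // into a generic "other" error.
            GetReadyAncestorError::Cancelled => GetVectoredError::Cancelled,
            GetReadyAncestorError::BadState(msg) => GetVectoredError::Other(msg),
        }
    }
}

/// Hypothetical lower-level call that can be cancelled.
fn ready_ancestor(cancelled: bool) -> Result<u32, GetReadyAncestorError> {
    if cancelled {
        Err(GetReadyAncestorError::Cancelled)
    } else {
        Ok(1)
    }
}

/// Hypothetical caller: `?` applies the `From` impl, so no `map_err` is needed.
pub fn get_vectored(cancelled: bool) -> Result<u32, GetVectoredError> {
    let ancestor = ready_ancestor(cancelled)?;
    Ok(ancestor)
}
```

Because the mapping lives in one `From` impl, every call site that uses `?` preserves `Cancelled`, which is what lets upper layers tell shutdown apart from real failures.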
Alexander Bayandin
fc3d372f3a CI(label-for-external-users): check membership using GitHub API (#8724)
## Problem

`author_association` doesn't properly work if a GitHub user decides not
to show affiliation with the org in their profile (i.e. if it's private)

## Summary of changes
- Call
`/orgs/ORG/members/USERNAME` API to check whether 
a PR/issue author is a member of the org
2024-08-14 12:27:52 +01:00
John Spray
19d69d515c pageserver: evict covered layers earlier (#8679)
## Problem

When pageservers do compaction, they frequently create image layers that
make earlier layers un-needed for reads, but then keep those earlier
layers around for 24 hours waiting for time-based eviction to expire
them.

Now that we track layer visibility, we can use it as an input to
eviction, and avoid the 24 hour "disk bump" that happens around
pageserver restarts.

## Summary of changes

- During time-based eviction, if a layer is marked Covered, use the
eviction period as the threshold: i.e. these layers get to remain
resident for at least one iteration of the eviction loop, but then get
evicted. With current settings this means they get evicted after 1h
instead of 24h.
- During disk usage eviction, prioritize evicting covered layers above
all other layers.


Caveats:
- Using the period as the threshold for time based eviction in this case
is a bit of a hack, but it avoids adding yet another configuration
property, and in any case the value of a new property would be somewhat
arbitrary: there's no "right" length of time to keep covered layers
around just in case.
- We had previously planned on removing time-based eviction: this change
would motivate us to keep it around, but we can still simplify the code
later to just do the eviction of covered layers, rather than applying a
TTL policy to all layers.
2024-08-14 12:10:15 +01:00
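The time-based part of 19d69d515c can be illustrated with a small sketch. All names here (`LayerVisibility`, `EvictionPolicy`, `effective_threshold`, `should_evict`) are hypothetical and chosen for readability; the only thing taken from the commit message is the rule that covered layers use the eviction period as their threshold while other layers keep the normal one.

```rust
use std::time::Duration;

/// Hypothetical visibility marker for a layer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LayerVisibility {
    /// Still needed to serve reads.
    Visible,
    /// Made unnecessary for reads by a newer image layer.
    Covered,
}

/// Hypothetical eviction settings: how often the loop runs, and the normal TTL.
pub struct EvictionPolicy {
    pub period: Duration,
    pub threshold: Duration,
}

/// Covered layers use the loop period as their threshold, so they survive
/// at least one iteration; everything else uses the regular threshold.
pub fn effective_threshold(policy: &EvictionPolicy, vis: LayerVisibility) -> Duration {
    match vis {
        LayerVisibility::Covered => policy.period,
        LayerVisibility::Visible => policy.threshold,
    }
}

/// Decide whether a layer that has been idle for `idle` should be evicted.
pub fn should_evict(policy: &EvictionPolicy, vis: LayerVisibility, idle: Duration) -> bool {
    idle > effective_threshold(policy, vis)
}
```

With a 1h period and a 24h threshold (the settings quoted in the message), a covered layer idle for 2h is evicted while a visible layer with the same idle time stays resident.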
Joonas Koivunen
485d76ac62 timeline_detach_ancestor: adjust error handling (#8528)
With additional phases from #8430 the `detach_ancestor::Error` became
untenable. Split it up into phases, and introduce laundering for
remaining `anyhow::Error` to propagate them as most often
`Error::ShuttingDown`.

Additionally, complete FIXMEs.

Cc: #6994
2024-08-14 10:16:18 +01:00
John Spray
4049d2b7e1 scrubber: fix spurious "Missed some shards" errors (#8661)
## Problem

The storage scrubber was reporting warnings for lots of timelines like:
```
WARN Missed some shards at count ShardCount(0) tenant_id=25eb7a83d9a2f90ac0b765b6ca84cf4c
```

These were spurious: these tenants are fine. There was a bug in
accumulating the ShardIndex for each tenant, whereby multiple timelines
would lead us to add the same ShardIndex more than once.

Closes: #8646 

## Summary of changes

- Accumulate ShardIndex in a BTreeSet instead of a Vec
- Extend the test to reproduce the issue
2024-08-14 09:29:06 +01:00
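The scrubber fix in 4049d2b7e1 boils down to the difference between the two accumulators below. This is a sketch only: `ShardIndex` is reduced to two plain integers here (an assumption; the real type is richer), and both helper functions are hypothetical.

```rust
use std::collections::BTreeSet;

/// Simplified stand-in for a shard index; field types are assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct ShardIndex {
    pub shard_number: u8,
    pub shard_count: u8,
}

/// Buggy shape: one entry per timeline, so a shard with several timelines
/// shows up several times.
pub fn accumulate_vec(per_timeline: &[ShardIndex]) -> Vec<ShardIndex> {
    per_timeline.to_vec()
}

/// Fixed shape: a set keeps each shard index at most once, no matter how
/// many timelines report it.
pub fn accumulate_set(per_timeline: &[ShardIndex]) -> BTreeSet<ShardIndex> {
    per_timeline.iter().copied().collect()
}
```

Any later comparison of "shards seen" against the expected shard count only makes sense on the deduplicated set, which is why the duplicated entries produced spurious warnings.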
Konstantin Knizhnik
7a1736ddcf Preserve HEAP_COMBOCID when restoring t_cid from WAL (#8503)
## Problem

See https://github.com/neondatabase/neon/issues/8499

## Summary of changes

Save HEAP_COMBOCID flag in WAL and do not clear it in redo handlers.

Related Postgres PRs:
https://github.com/neondatabase/postgres/pull/457
https://github.com/neondatabase/postgres/pull/458
https://github.com/neondatabase/postgres/pull/459


---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2024-08-14 08:13:20 +03:00
77 changed files with 2655 additions and 809 deletions

View File

@@ -1,7 +1,6 @@
self-hosted-runner:
labels:
- arm64
- gen3
- large
- large-arm64
- small

View File

@@ -83,7 +83,6 @@ runs:
uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 1
- name: Cache poetry deps
uses: actions/cache@v4

View File

@@ -70,7 +70,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 1
- name: Set pg 14 revision for caching
id: pg_v14_rev
@@ -208,7 +207,7 @@ jobs:
export LD_LIBRARY_PATH
#nextest does not yet support running doctests
cargo test --doc $CARGO_FLAGS $CARGO_FEATURES
${cov_prefix} cargo test --doc $CARGO_FLAGS $CARGO_FEATURES
for io_engine in std-fs tokio-epoll-uring ; do
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
@@ -263,7 +262,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 1
- name: Pytest regression tests
uses: ./.github/actions/run-python-test-set

View File

@@ -44,7 +44,7 @@ jobs:
grep -ERl $PAT .github/workflows |\
while read -r f
do
l=$(grep -nE $PAT .github/workflows/release.yml | awk -F: '{print $1}' | head -1)
l=$(grep -nE $PAT $f | awk -F: '{print $1}' | head -1)
echo "::error file=$f,line=$l::Please use 'ubuntu-22.04' instead of 'ubuntu-latest'"
done
exit 1

View File

@@ -96,7 +96,7 @@ jobs:
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
@@ -146,6 +146,7 @@ jobs:
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
@@ -154,7 +155,10 @@ jobs:
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic perf testing: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
slack-message: |
Periodic perf testing: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
@@ -176,7 +180,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Download Neon artifact
uses: ./.github/actions/download
with:
@@ -215,15 +219,23 @@ jobs:
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic replication testing: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
channel-id: "C06T9AMNDQQ" # on-call-compute-staging-stream
slack-message: |
Periodic replication testing: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
@@ -325,7 +337,7 @@ jobs:
prepare_AWS_RDS_databases:
uses: ./.github/workflows/_benchmarking_preparation.yml
secrets: inherit
pgbench-compare:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
needs: [ generate-matrices, prepare_AWS_RDS_databases ]
@@ -365,7 +377,7 @@ jobs:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
@@ -460,6 +472,7 @@ jobs:
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
@@ -468,7 +481,10 @@ jobs:
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic perf testing ${{ matrix.platform }}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
slack-message: |
Periodic perf testing on ${{ matrix.platform }}: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
@@ -542,7 +558,7 @@ jobs:
esac
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
- name: Configure AWS credentials # necessary on Azure runners to read/write from/to S3
uses: aws-actions/configure-aws-credentials@v4
with:
@@ -577,8 +593,9 @@ jobs:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
@@ -587,7 +604,10 @@ jobs:
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic perf testing ${PLATFORM}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
slack-message: |
Periodic perf testing on ${{ env.PLATFORM }}: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
@@ -670,6 +690,7 @@ jobs:
TEST_OLAP_SCALE: 10
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
@@ -678,7 +699,10 @@ jobs:
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic OLAP perf testing ${{ matrix.platform }}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
slack-message: |
Periodic OLAP perf testing on ${{ matrix.platform }}: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
@@ -764,6 +788,7 @@ jobs:
TEST_OLAP_SCALE: ${{ matrix.scale }}
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
@@ -772,7 +797,10 @@ jobs:
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic TPC-H perf testing ${{ matrix.platform }}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
slack-message: |
Periodic TPC-H perf testing on ${{ matrix.platform }}: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
@@ -843,6 +871,7 @@ jobs:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
@@ -851,6 +880,10 @@ jobs:
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic User example perf testing ${{ matrix.platform }}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
slack-message: |
Periodic TPC-H perf testing on ${{ matrix.platform }}: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

View File

@@ -38,7 +38,7 @@ jobs:
matrix:
arch: [ x64, arm64 ]
runs-on: ${{ fromJson(format('["self-hosted", "gen3", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
env:
IMAGE_TAG: ${{ inputs.image-tag }}

View File

@@ -48,7 +48,7 @@ jobs:
tag:
needs: [ check-permissions ]
runs-on: [ self-hosted, gen3, small ]
runs-on: [ self-hosted, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
outputs:
build-tag: ${{steps.build-tag.outputs.tag}}
@@ -90,7 +90,7 @@ jobs:
check-codestyle-python:
needs: [ check-permissions, build-build-tools-image ]
runs-on: [ self-hosted, gen3, small ]
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
@@ -101,9 +101,6 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: false
fetch-depth: 1
- name: Cache poetry deps
uses: actions/cache@v4
@@ -142,7 +139,6 @@ jobs:
uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 1
# Disabled for now
# - name: Restore cargo deps cache
@@ -204,7 +200,7 @@ jobs:
matrix:
arch: [ x64 ]
# Do not build or run tests in debug for release branches
build-type: ${{ fromJson((startsWith(github.ref_name, 'release' && github.event_name == 'push')) && '["release"]' || '["debug", "release"]') }}
build-type: ${{ fromJson((startsWith(github.ref_name, 'release') && github.event_name == 'push') && '["release"]' || '["debug", "release"]') }}
include:
- build-type: release
arch: arm64
@@ -224,7 +220,7 @@ jobs:
outputs:
json: ${{ steps.get-benchmark-durations.outputs.json }}
needs: [ check-permissions, build-build-tools-image ]
runs-on: [ self-hosted, gen3, small ]
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
@@ -257,7 +253,7 @@ jobs:
benchmarks:
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-benchmarks')
needs: [ check-permissions, build-and-test-locally, build-build-tools-image, get-benchmarks-durations ]
runs-on: [ self-hosted, gen3, small ]
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
@@ -302,9 +298,8 @@ jobs:
with:
channel-id: C060CNA47S9 # on-call-staging-storage-stream
slack-message: |
Benchmarks failed on main: ${{ github.event.head_commit.url }}
Allure report: ${{ needs.create-test-report.outputs.report-url }}
Benchmarks failed on main <${{ github.event.head_commit.url }}|${{ github.sha }}>
<${{ needs.create-test-report.outputs.report-url }}|Allure report>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
@@ -314,7 +309,7 @@ jobs:
outputs:
report-url: ${{ steps.create-allure-report.outputs.report-url }}
runs-on: [ self-hosted, gen3, small ]
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
@@ -361,7 +356,7 @@ jobs:
coverage-report:
needs: [ check-permissions, build-build-tools-image, build-and-test-locally ]
runs-on: [ self-hosted, gen3, small ]
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
@@ -475,7 +470,7 @@ jobs:
matrix:
arch: [ x64, arm64 ]
runs-on: ${{ fromJson(format('["self-hosted", "gen3", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
steps:
- name: Checkout
@@ -503,7 +498,10 @@ jobs:
- uses: docker/build-push-action@v6
with:
context: .
# ARM-specific flags are recommended for Graviton ≥ 2, these flags are also supported by Ampere Altra (Azure)
# https://github.com/aws/aws-graviton-getting-started/blob/57dc813626d0266f1cc12ef83474745bb1f31fb4/rust.md
build-args: |
ADDITIONAL_RUSTFLAGS=${{ matrix.arch == 'arm64' && '-Ctarget-feature=+lse -Ctarget-cpu=neoverse-n1' || '' }}
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}
@@ -551,7 +549,7 @@ jobs:
version: [ v14, v15, v16 ]
arch: [ x64, arm64 ]
runs-on: ${{ fromJson(format('["self-hosted", "gen3", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
steps:
- name: Checkout
@@ -696,7 +694,7 @@ jobs:
vm-compute-node-image:
needs: [ check-permissions, tag, compute-node-image ]
runs-on: [ self-hosted, gen3, large ]
runs-on: [ self-hosted, large ]
strategy:
fail-fast: false
matrix:
@@ -745,7 +743,7 @@ jobs:
matrix:
arch: [ x64, arm64 ]
runs-on: ${{ fromJson(format('["self-hosted", "gen3", "{0}"]', matrix.arch == 'arm64' && 'small-arm64' || 'small')) }}
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'small-arm64' || 'small')) }}
steps:
- name: Checkout
@@ -960,7 +958,7 @@ jobs:
needs: [ check-permissions, promote-images, tag, build-and-test-locally, trigger-custom-extensions-build-and-wait ]
if: github.ref_name == 'main' || github.ref_name == 'release'|| github.ref_name == 'release-proxy'
runs-on: [ self-hosted, gen3, small ]
runs-on: [ self-hosted, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
steps:
- name: Fix git ownership
@@ -980,7 +978,6 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: false
fetch-depth: 0
- name: Trigger deploy workflow
@@ -1061,7 +1058,7 @@ jobs:
needs: [ check-permissions, promote-images, tag, build-and-test-locally ]
if: github.ref_name == 'release'
runs-on: [ self-hosted, gen3, small ]
runs-on: [ self-hosted, small ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
options: --init
@@ -1117,10 +1114,12 @@ jobs:
# Format `needs` differently to make the list more readable.
# Usually we do `needs: [...]`
needs:
- build-and-test-locally
- check-codestyle-python
- check-codestyle-rust
- build-and-test-locally
- promote-images
- test-images
- trigger-custom-extensions-build-and-wait
runs-on: ubuntu-22.04
steps:
# The list of possible results:

View File

@@ -4,7 +4,7 @@ on:
issues:
types:
- opened
pull_request:
pull_request_target:
types:
- opened
@@ -15,21 +15,40 @@ env:
LABEL: external
jobs:
check-user:
runs-on: ubuntu-22.04
outputs:
is-member: ${{ steps.check-user.outputs.is-member }}
steps:
- name: Check whether `${{ github.actor }}` is a member of `${{ github.repository_owner }}`
id: check-user
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
if gh api -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" "/orgs/${GITHUB_REPOSITORY_OWNER}/members/${GITHUB_ACTOR}"; then
is_member=true
else
is_member=false
fi
echo "is-member=${is_member}" | tee -a ${GITHUB_OUTPUT}
add-label:
# This workflow uses `author_association` for PRs and issues to determine if the user is an external user.
# Possible values for `author_association`: https://docs.github.com/en/graphql/reference/enums#commentauthorassociation
if: ${{ !contains(fromJSON('["OWNER", "MEMBER", "COLLABORATOR"]'), github.event[github.event_name == 'pull_request' && 'pull_request' || 'issue'].author_association) }}
if: needs.check-user.outputs.is-member == 'false'
needs: [ check-user ]
runs-on: ubuntu-22.04
permissions:
pull-requests: write
issues: write
pull-requests: write # for `gh pr edit`
issues: write # for `gh issue edit`
steps:
- name: Label new ${{ github.event_name }}
- name: Add `${{ env.LABEL }}` label
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
ITEM_NUMBER: ${{ github.event[github.event_name == 'pull_request' && 'pull_request' || 'issue'].number }}
GH_CLI_COMMAND: ${{ github.event_name == 'pull_request' && 'pr' || 'issue' }}
ITEM_NUMBER: ${{ github.event[github.event_name == 'pull_request_target' && 'pull_request' || 'issue'].number }}
GH_CLI_COMMAND: ${{ github.event_name == 'pull_request_target' && 'pr' || 'issue' }}
run: |
gh ${GH_CLI_COMMAND} --repo ${GITHUB_REPOSITORY} edit --add-label=${LABEL} ${ITEM_NUMBER}

View File

@@ -56,7 +56,6 @@ jobs:
uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 1
- name: Install macOS postgres dependencies
run: brew install flex bison openssl protobuf icu4c pkg-config
@@ -158,7 +157,6 @@ jobs:
uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 1
# Some of our rust modules use FFI and need those to be checked
- name: Get postgres headers

View File

@@ -27,7 +27,7 @@ concurrency:
jobs:
trigger_bench_on_ec2_machine_in_eu_central_1:
runs-on: [ self-hosted, gen3, small ]
runs-on: [ self-hosted, small ]
container:
image: neondatabase/build-tools:pinned
credentials:

273
Cargo.lock generated
View File

@@ -484,7 +484,7 @@ dependencies = [
"http 0.2.9",
"http 1.1.0",
"once_cell",
"p256",
"p256 0.11.1",
"percent-encoding",
"ring 0.17.6",
"sha2",
@@ -848,6 +848,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce"
[[package]]
name = "base16ct"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf"
[[package]]
name = "base64"
version = "0.13.1"
@@ -971,9 +977,9 @@ checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1"
[[package]]
name = "bytemuck"
version = "1.16.0"
version = "1.16.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78834c15cb5d5efe3452d58b1e8ba890dd62d21907f867f383358198e56ebca5"
checksum = "102087e286b4677862ea56cf8fc58bb2cdfa8725c40ffb80fe3a008eb7f2fc83"
[[package]]
name = "byteorder"
@@ -1526,8 +1532,10 @@ version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76"
dependencies = [
"generic-array",
"rand_core 0.6.4",
"subtle",
"zeroize",
]
[[package]]
@@ -1621,6 +1629,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c"
dependencies = [
"const-oid",
"pem-rfc7468",
"zeroize",
]
@@ -1720,6 +1729,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer",
"const-oid",
"crypto-common",
"subtle",
]
@@ -1771,11 +1781,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c"
dependencies = [
"der 0.6.1",
"elliptic-curve",
"rfc6979",
"elliptic-curve 0.12.3",
"rfc6979 0.3.1",
"signature 1.6.4",
]
[[package]]
name = "ecdsa"
version = "0.16.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee27f32b5c5292967d2d4a9d7f1e0b0aed2c15daded5a60300e4abb9d8020bca"
dependencies = [
"der 0.7.8",
"digest",
"elliptic-curve 0.13.8",
"rfc6979 0.4.0",
"signature 2.2.0",
"spki 0.7.3",
]
[[package]]
name = "either"
version = "1.8.1"
@@ -1788,16 +1812,36 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7bb888ab5300a19b8e5bceef25ac745ad065f3c9f7efc6de1b91958110891d3"
dependencies = [
"base16ct",
"base16ct 0.1.1",
"crypto-bigint 0.4.9",
"der 0.6.1",
"digest",
"ff",
"ff 0.12.1",
"generic-array",
"group",
"pkcs8",
"group 0.12.1",
"pkcs8 0.9.0",
"rand_core 0.6.4",
"sec1",
"sec1 0.3.0",
"subtle",
"zeroize",
]
[[package]]
name = "elliptic-curve"
version = "0.13.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6043086bf7973472e0c7dff2142ea0b680d30e18d9cc40f267efbf222bd47"
dependencies = [
"base16ct 0.2.0",
"crypto-bigint 0.5.5",
"digest",
"ff 0.13.0",
"generic-array",
"group 0.13.0",
"pem-rfc7468",
"pkcs8 0.10.2",
"rand_core 0.6.4",
"sec1 0.7.3",
"subtle",
"zeroize",
]
@@ -1951,6 +1995,16 @@ dependencies = [
"subtle",
]
[[package]]
name = "ff"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ded41244b729663b1e574f1b4fb731469f69f79c17667b5d776b16cda0479449"
dependencies = [
"rand_core 0.6.4",
"subtle",
]
[[package]]
name = "filetime"
version = "0.2.22"
@@ -2148,6 +2202,7 @@ checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
dependencies = [
"typenum",
"version_check",
"zeroize",
]
[[package]]
@@ -2214,7 +2269,18 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7"
dependencies = [
"ff",
"ff 0.12.1",
"rand_core 0.6.4",
"subtle",
]
[[package]]
name = "group"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0f9ef7462f7c099f518d754361858f86d8a07af53ba9af0fe635bbccb151a63"
dependencies = [
"ff 0.13.0",
"rand_core 0.6.4",
"subtle",
]
@@ -2776,6 +2842,42 @@ dependencies = [
"libc",
]
[[package]]
name = "jose-b64"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bec69375368709666b21c76965ce67549f2d2db7605f1f8707d17c9656801b56"
dependencies = [
"base64ct",
"serde",
"subtle",
"zeroize",
]
[[package]]
name = "jose-jwa"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ab78e053fe886a351d67cf0d194c000f9d0dcb92906eb34d853d7e758a4b3a7"
dependencies = [
"serde",
]
[[package]]
name = "jose-jwk"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "280fa263807fe0782ecb6f2baadc28dffc04e00558a58e33bfdb801d11fd58e7"
dependencies = [
"jose-b64",
"jose-jwa",
"p256 0.13.2",
"p384",
"rsa",
"serde",
"zeroize",
]
[[package]]
name = "js-sys"
version = "0.3.69"
@@ -2835,6 +2937,9 @@ name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
dependencies = [
"spin 0.5.2",
]
[[package]]
name = "lazycell"
@@ -3204,6 +3309,23 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-bigint-dig"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151"
dependencies = [
"byteorder",
"lazy_static",
"libm",
"num-integer",
"num-iter",
"num-traits",
"rand 0.8.5",
"smallvec",
"zeroize",
]
[[package]]
name = "num-complex"
version = "0.4.4"
@@ -3481,11 +3603,33 @@ version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594"
dependencies = [
"ecdsa",
"elliptic-curve",
"ecdsa 0.14.8",
"elliptic-curve 0.12.3",
"sha2",
]
[[package]]
name = "p256"
version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9863ad85fa8f4460f9c48cb909d38a0d689dba1f6f6988a5e3e0d31071bcd4b"
dependencies = [
"ecdsa 0.16.9",
"elliptic-curve 0.13.8",
"primeorder",
"sha2",
]
[[package]]
name = "p384"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70786f51bcc69f6a4c0360e063a4cac5419ef7c5cd5b3c99ad70f3be5ba79209"
dependencies = [
"elliptic-curve 0.13.8",
"primeorder",
]
[[package]]
name = "pagebench"
version = "0.1.0"
@@ -3847,6 +3991,15 @@ dependencies = [
"serde",
]
[[package]]
name = "pem-rfc7468"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412"
dependencies = [
"base64ct",
]
[[package]]
name = "percent-encoding"
version = "2.2.0"
@@ -3913,6 +4066,17 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkcs1"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f"
dependencies = [
"der 0.7.8",
"pkcs8 0.10.2",
"spki 0.7.3",
]
[[package]]
name = "pkcs8"
version = "0.9.0"
@@ -3923,6 +4087,16 @@ dependencies = [
"spki 0.6.0",
]
[[package]]
name = "pkcs8"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7"
dependencies = [
"der 0.7.8",
"spki 0.7.3",
]
[[package]]
name = "pkg-config"
version = "0.3.27"
@@ -4116,6 +4290,15 @@ dependencies = [
"syn 2.0.52",
]
[[package]]
name = "primeorder"
version = "0.13.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "353e1ca18966c16d9deb1c69278edbc5f194139612772bd9537af60ac231e1e6"
dependencies = [
"elliptic-curve 0.13.8",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.20+deprecated"
@@ -4233,6 +4416,7 @@ version = "0.1.0"
dependencies = [
"ahash",
"anyhow",
"arc-swap",
"async-compression",
"async-trait",
"atomic-take",
@@ -4250,6 +4434,7 @@ dependencies = [
"consumption_metrics",
"crossbeam-deque",
"dashmap",
"ecdsa 0.16.9",
"env_logger",
"fallible-iterator",
"framed-websockets",
@@ -4270,12 +4455,15 @@ dependencies = [
"indexmap 2.0.1",
"ipnet",
"itertools 0.10.5",
"jose-jwa",
"jose-jwk",
"lasso",
"md5",
"measured",
"metrics",
"once_cell",
"opentelemetry",
"p256 0.13.2",
"parking_lot 0.12.1",
"parquet",
"parquet_derive",
@@ -4296,6 +4484,7 @@ dependencies = [
"reqwest-retry",
"reqwest-tracing",
"routerify",
"rsa",
"rstest",
"rustc-hash",
"rustls 0.22.4",
@@ -4305,6 +4494,7 @@ dependencies = [
"serde",
"serde_json",
"sha2",
"signature 2.2.0",
"smallvec",
"smol_str",
"socket2 0.5.5",
@@ -4807,6 +4997,16 @@ dependencies = [
"zeroize",
]
[[package]]
name = "rfc6979"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8dd2a808d456c4a54e300a23e9f5a67e122c3024119acbfd73e3bf664491cb2"
dependencies = [
"hmac",
"subtle",
]
[[package]]
name = "ring"
version = "0.16.20"
@@ -4867,6 +5067,26 @@ dependencies = [
"archery",
]
[[package]]
name = "rsa"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d0e5124fcb30e76a7e79bfee683a2746db83784b86289f6251b54b7950a0dfc"
dependencies = [
"const-oid",
"digest",
"num-bigint-dig",
"num-integer",
"num-traits",
"pkcs1",
"pkcs8 0.10.2",
"rand_core 0.6.4",
"signature 2.2.0",
"spki 0.7.3",
"subtle",
"zeroize",
]
[[package]]
name = "rstest"
version = "0.18.2"
@@ -5195,10 +5415,24 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928"
dependencies = [
"base16ct",
"base16ct 0.1.1",
"der 0.6.1",
"generic-array",
"pkcs8",
"pkcs8 0.9.0",
"subtle",
"zeroize",
]
[[package]]
name = "sec1"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3e97a565f76233a6003f9f5c54be1d9c5bdfa3eccfb189469f11ec4901c47dc"
dependencies = [
"base16ct 0.2.0",
"der 0.7.8",
"generic-array",
"pkcs8 0.10.2",
"subtle",
"zeroize",
]
@@ -5545,6 +5779,7 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de"
dependencies = [
"digest",
"rand_core 0.6.4",
]
@@ -7379,13 +7614,17 @@ dependencies = [
"clap",
"clap_builder",
"crossbeam-utils",
"crypto-bigint 0.5.5",
"der 0.7.8",
"deranged",
"digest",
"either",
"fail",
"futures-channel",
"futures-executor",
"futures-io",
"futures-util",
"generic-array",
"getrandom 0.2.11",
"hashbrown 0.14.5",
"hex",
@@ -7393,6 +7632,7 @@ dependencies = [
"hyper 0.14.26",
"indexmap 1.9.3",
"itertools 0.10.5",
"lazy_static",
"libc",
"log",
"memchr",
@@ -7416,7 +7656,9 @@ dependencies = [
"serde",
"serde_json",
"sha2",
"signature 2.2.0",
"smallvec",
"spki 0.7.3",
"subtle",
"syn 1.0.109",
"syn 2.0.52",
@@ -7527,6 +7769,7 @@ version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d"
dependencies = [
"serde",
"zeroize_derive",
]


@@ -35,8 +35,9 @@ COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_i
COPY --from=pg-build /home/nonroot/pg_install/v16/lib pg_install/v16/lib
COPY --chown=nonroot . .
ARG ADDITIONAL_RUSTFLAGS
RUN set -e \
&& PQ_LIB_DIR=$(pwd)/pg_install/v16/lib RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment" cargo build \
&& PQ_LIB_DIR=$(pwd)/pg_install/v16/lib RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment ${ADDITIONAL_RUSTFLAGS}" cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \


@@ -622,6 +622,7 @@ async fn main() -> anyhow::Result<()> {
threshold: threshold.into(),
},
)),
heatmap_period: Some("300s".to_string()),
..Default::default()
},
})


@@ -22,7 +22,10 @@ feature-depth = 1
[advisories]
db-urls = ["https://github.com/rustsec/advisory-db"]
yanked = "warn"
ignore = []
[[advisories.ignore]]
id = "RUSTSEC-2023-0071"
reason = "the marvin attack only affects private key decryption, not public key signature verification"
# This section is considered when running `cargo deny check licenses`
# More documentation for the licenses section can be found here:


@@ -313,20 +313,17 @@ pub struct MetadataHealthUpdateRequest {
pub struct MetadataHealthUpdateResponse {}
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListUnhealthyResponse {
pub unhealthy_tenant_shards: Vec<TenantShardId>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedRequest {
#[serde(with = "humantime_serde")]
pub not_scrubbed_for: Duration,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedResponse {
pub health_records: Vec<MetadataHealthRecord>,
}


@@ -143,8 +143,8 @@ pub use v14::xlog_utils::XLogFileName;
pub use v14::bindings::DBState_DB_SHUTDOWNED;
pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> anyhow::Result<bool> {
dispatch_pgversion!(version, Ok(pgv::bindings::bkpimg_is_compressed(bimg_info)))
pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> bool {
dispatch_pgversion!(version, pgv::bindings::bkpimg_is_compressed(bimg_info))
}
pub fn generate_wal_segment(


@@ -42,6 +42,10 @@ impl DownloadError {
Timeout | Other(_) => false,
}
}
pub fn is_cancelled(&self) -> bool {
matches!(self, DownloadError::Cancelled)
}
}
impl From<std::io::Error> for DownloadError {
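
Note on the hunk above: `is_cancelled` is a one-line predicate built on the standard `matches!` macro. A minimal standalone sketch of the same shape follows. The enum here is a simplified stand-in (the variant list and the `String` payload of `Other` are assumptions, not the real `DownloadError` definition).

```rust
/// Simplified stand-in for the real error type; the variants and the
/// `String` payload are assumptions made for this sketch.
#[derive(Debug)]
enum DownloadError {
    NotFound,
    Cancelled,
    Timeout,
    Other(String),
}

impl DownloadError {
    /// True only for the `Cancelled` variant, mirroring the added method.
    fn is_cancelled(&self) -> bool {
        matches!(self, DownloadError::Cancelled)
    }
}

fn main() {
    let errors = [
        DownloadError::NotFound,
        DownloadError::Cancelled,
        DownloadError::Timeout,
        DownloadError::Other("io failure".to_string()),
    ];
    // Callers can branch on the predicate instead of matching on variants.
    let cancelled = errors.iter().filter(|e| e.is_cancelled()).count();
    println!("{cancelled} of {} errors are cancellations", errors.len());
    println!("all errors: {errors:?}");
}
```

A predicate like this keeps call sites from depending on the full variant list when they only care about the shutdown path.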


@@ -64,7 +64,7 @@ use crate::{
mgr::TenantManager,
remote_timeline_client::LayerFileMetadata,
secondary::SecondaryTenant,
storage_layer::{AsLayerDesc, EvictionError, Layer, LayerName},
storage_layer::{AsLayerDesc, EvictionError, Layer, LayerName, LayerVisibilityHint},
},
CancellableTask, DiskUsageEvictionTask,
};
@@ -114,7 +114,7 @@ fn default_highest_layer_count_loses_first() -> bool {
}
impl EvictionOrder {
fn sort(&self, candidates: &mut [(MinResidentSizePartition, EvictionCandidate)]) {
fn sort(&self, candidates: &mut [(EvictionPartition, EvictionCandidate)]) {
use EvictionOrder::*;
match self {
@@ -644,6 +644,7 @@ pub(crate) struct EvictionCandidate {
pub(crate) layer: EvictionLayer,
pub(crate) last_activity_ts: SystemTime,
pub(crate) relative_last_activity: finite_f32::FiniteF32,
pub(crate) visibility: LayerVisibilityHint,
}
impl std::fmt::Display for EvictionLayer {
@@ -685,14 +686,22 @@ impl std::fmt::Debug for EvictionCandidate {
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum MinResidentSizePartition {
enum EvictionPartition {
// A layer that is un-wanted by the tenant: evict all these first, before considering
// any other layers
EvictNow,
// Above the minimum size threshold: this layer is a candidate for eviction.
Above,
// Below the minimum size threshold: this layer should only be evicted if all the
// tenants' layers above the minimum size threshold have already been considered.
Below,
}
enum EvictionCandidates {
Cancelled,
Finished(Vec<(MinResidentSizePartition, EvictionCandidate)>),
Finished(Vec<(EvictionPartition, EvictionCandidate)>),
}
/// Gather the eviction candidates.
@@ -890,8 +899,10 @@ async fn collect_eviction_candidates(
max_layer_size
};
// Sort layers most-recently-used first, then partition by
// cumsum above/below min_resident_size.
// Sort layers most-recently-used first, then calculate [`EvictionPartition`] for each layer,
// where the inputs are:
// - whether the layer is visible
// - whether the layer is above/below the min_resident_size cutline
tenant_candidates
.sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts));
let mut cumsum: i128 = 0;
@@ -908,12 +919,23 @@ async fn collect_eviction_candidates(
candidate.relative_last_activity =
eviction_order.relative_last_activity(total, i);
let partition = if cumsum > min_resident_size as i128 {
MinResidentSizePartition::Above
} else {
MinResidentSizePartition::Below
let partition = match candidate.visibility {
LayerVisibilityHint::Covered => {
// Covered layers are evicted first
EvictionPartition::EvictNow
}
LayerVisibilityHint::Visible => {
cumsum += i128::from(candidate.layer.get_file_size());
if cumsum > min_resident_size as i128 {
EvictionPartition::Above
} else {
// The most recent layers below the min_resident_size threshold
// are the last to be evicted.
EvictionPartition::Below
}
}
};
cumsum += i128::from(candidate.layer.get_file_size());
(partition, candidate)
});
@@ -981,7 +1003,7 @@ async fn collect_eviction_candidates(
// Secondary locations' layers are always considered above the min resident size,
// i.e. secondary locations are permitted to be trimmed to zero layers if all
// the layers have sufficiently old access times.
MinResidentSizePartition::Above,
EvictionPartition::Above,
candidate,
)
});
@@ -1009,7 +1031,9 @@ async fn collect_eviction_candidates(
}
}
debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
debug_assert!(EvictionPartition::Above < EvictionPartition::Below,
"as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first");
debug_assert!(EvictionPartition::EvictNow < EvictionPartition::Above,
"covered layers (EvictNow) must sort before all other layers so that they are evicted first");
eviction_order.sort(&mut candidates);
@@ -1022,7 +1046,7 @@ async fn collect_eviction_candidates(
///
/// Returns the amount of candidates selected, with the planned usage.
fn select_victims<U: Usage>(
candidates: &[(MinResidentSizePartition, EvictionCandidate)],
candidates: &[(EvictionPartition, EvictionCandidate)],
usage_pre: U,
) -> VictimSelection<U> {
let mut usage_when_switched = None;
@@ -1034,7 +1058,7 @@ fn select_victims<U: Usage>(
break;
}
if partition == &MinResidentSizePartition::Below && usage_when_switched.is_none() {
if partition == &EvictionPartition::Below && usage_when_switched.is_none() {
usage_when_switched = Some((usage_planned, i));
}
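
Note on the hunks above: the new ordering relies on the derived `Ord` of `EvictionPartition`, which follows declaration order (`EvictNow < Above < Below`), and on `cumsum` counting only visible layers. The sketch below reproduces that logic in isolation. `Visibility`, `Layer`, and `partition` are hypothetical stand-ins for `LayerVisibilityHint`, `EvictionCandidate`, and the closure inside `collect_eviction_candidates`; the sizes are made up.

```rust
/// Derived `Ord` follows declaration order: EvictNow < Above < Below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum EvictionPartition {
    EvictNow,
    Above,
    Below,
}

/// Hypothetical stand-in for `LayerVisibilityHint`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Visibility {
    Visible,
    Covered,
}

/// Hypothetical stand-in for an eviction candidate.
#[derive(Debug, Clone, Copy)]
struct Layer {
    file_size: u64,
    visibility: Visibility,
}

/// `layers` is expected to be sorted most-recently-used first.
fn partition(layers: &[Layer], min_resident_size: u64) -> Vec<(EvictionPartition, Layer)> {
    let mut cumsum: i128 = 0;
    layers
        .iter()
        .map(|layer| {
            let partition = match layer.visibility {
                // Covered layers never count towards the resident size.
                Visibility::Covered => EvictionPartition::EvictNow,
                Visibility::Visible => {
                    cumsum += i128::from(layer.file_size);
                    if cumsum > i128::from(min_resident_size) {
                        EvictionPartition::Above
                    } else {
                        EvictionPartition::Below
                    }
                }
            };
            (partition, *layer)
        })
        .collect()
}

fn main() {
    let layers = [
        Layer { file_size: 10, visibility: Visibility::Visible },
        Layer { file_size: 10, visibility: Visibility::Covered },
        Layer { file_size: 10, visibility: Visibility::Visible },
        Layer { file_size: 10, visibility: Visibility::Visible },
    ];
    let mut candidates = partition(&layers, 15);
    // Sorting by partition puts EvictNow first and Below last.
    candidates.sort_by_key(|(p, _)| *p);
    for (p, layer) in &candidates {
        println!("{p:?}: {layer:?}");
    }
}
```

Because covered layers skip the `cumsum` update, they cannot push visible layers over the `min_resident_size` cutline.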


@@ -178,10 +178,8 @@ fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Res
impl From<PageReconstructError> for ApiError {
fn from(pre: PageReconstructError) -> ApiError {
match pre {
PageReconstructError::Other(pre) => ApiError::InternalServerError(pre),
PageReconstructError::MissingKey(e) => {
ApiError::InternalServerError(anyhow::anyhow!("{e}"))
}
PageReconstructError::Other(other) => ApiError::InternalServerError(other),
PageReconstructError::MissingKey(e) => ApiError::InternalServerError(e.into()),
PageReconstructError::Cancelled => ApiError::Cancelled,
PageReconstructError::AncestorLsnTimeout(e) => ApiError::Timeout(format!("{e}").into()),
PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
@@ -1787,9 +1785,11 @@ async fn timeline_checkpoint_handler(
}
if wait_until_uploaded {
tracing::info!("Waiting for uploads to complete...");
timeline.remote_client.wait_completion().await
// XXX map to correct ApiError for the cases where it's due to shutdown
.context("wait completion").map_err(ApiError::InternalServerError)?;
tracing::info!("Uploads completed up to {}", timeline.get_remote_consistent_lsn_projected().unwrap_or(Lsn(0)));
}
json_response(StatusCode::OK, ())
@@ -1898,8 +1898,7 @@ async fn timeline_detach_ancestor_handler(
attempt,
ctx,
)
.await
.map_err(ApiError::InternalServerError)?;
.await?;
AncestorDetached {
reparented_timelines,


@@ -287,10 +287,7 @@ impl Timeline {
// then check if the database was already initialized.
// get_rel_exists can be called before dbdir is created.
let buf = version.get(self, DBDIR_KEY, ctx).await?;
let dbdirs = match DbDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => Ok(dir.dbdirs),
Err(e) => Err(PageReconstructError::from(e)),
}?;
let dbdirs = DbDirectory::des(&buf)?.dbdirs;
if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
return Ok(false);
}
@@ -298,13 +295,8 @@ impl Timeline {
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
let buf = version.get(self, key, ctx).await?;
match RelDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => {
let exists = dir.rels.contains(&(tag.relnode, tag.forknum));
Ok(exists)
}
Err(e) => Err(PageReconstructError::from(e)),
}
let dir = RelDirectory::des(&buf)?;
Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
}
/// Get a list of all existing relations in given tablespace and database.
@@ -323,20 +315,16 @@ impl Timeline {
let key = rel_dir_to_key(spcnode, dbnode);
let buf = version.get(self, key, ctx).await?;
match RelDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => {
let rels: HashSet<RelTag> =
HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
spcnode,
dbnode,
relnode: *relnode,
forknum: *forknum,
}));
let dir = RelDirectory::des(&buf)?;
let rels: HashSet<RelTag> =
HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
spcnode,
dbnode,
relnode: *relnode,
forknum: *forknum,
}));
Ok(rels)
}
Err(e) => Err(PageReconstructError::from(e)),
}
Ok(rels)
}
/// Get the whole SLRU segment
@@ -398,13 +386,8 @@ impl Timeline {
let key = slru_dir_to_key(kind);
let buf = version.get(self, key, ctx).await?;
match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => {
let exists = dir.segments.contains(&segno);
Ok(exists)
}
Err(e) => Err(PageReconstructError::from(e)),
}
let dir = SlruSegmentDirectory::des(&buf)?;
Ok(dir.segments.contains(&segno))
}
/// Locate LSN, such that all transactions that committed before
@@ -620,10 +603,7 @@ impl Timeline {
let key = slru_dir_to_key(kind);
let buf = version.get(self, key, ctx).await?;
match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => Ok(dir.segments),
Err(e) => Err(PageReconstructError::from(e)),
}
Ok(SlruSegmentDirectory::des(&buf)?.segments)
}
pub(crate) async fn get_relmap_file(
@@ -647,10 +627,7 @@ impl Timeline {
// fetch directory entry
let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
match DbDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => Ok(dir.dbdirs),
Err(e) => Err(PageReconstructError::from(e)),
}
Ok(DbDirectory::des(&buf)?.dbdirs)
}
pub(crate) async fn get_twophase_file(
@@ -672,10 +649,7 @@ impl Timeline {
// fetch directory entry
let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => Ok(dir.xids),
Err(e) => Err(PageReconstructError::from(e)),
}
Ok(TwoPhaseDirectory::des(&buf)?.xids)
}
pub(crate) async fn get_control_file(
@@ -700,10 +674,7 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
match self.get(AUX_FILES_KEY, lsn, ctx).await {
Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => Ok(dir.files),
Err(e) => Err(PageReconstructError::from(e)),
},
Ok(buf) => Ok(AuxFilesDirectory::des(&buf)?.files),
Err(e) => {
// This is expected: historical databases do not have the key.
debug!("Failed to get info about AUX files: {}", e);
@@ -719,13 +690,14 @@ impl Timeline {
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
let kv = self
.scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
.await
.context("scan")?;
.await?;
let mut result = HashMap::new();
let mut sz = 0;
for (_, v) in kv {
let v = v.context("get value")?;
let v = aux_file::decode_file_value_bytes(&v).context("value decode")?;
let v = v?;
let v = aux_file::decode_file_value_bytes(&v)
.context("value decode")
.map_err(PageReconstructError::Other)?;
for (fname, content) in v {
sz += fname.len();
sz += content.len();
@@ -793,11 +765,10 @@ impl Timeline {
) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
let kv = self
.scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
.await
.context("scan")?;
.await?;
let mut result = HashMap::new();
for (k, v) in kv {
let v = v.context("get value")?;
let v = v?;
let origin_id = k.field6 as RepOriginId;
let origin_lsn = Lsn::des(&v).unwrap();
if origin_lsn != Lsn::INVALID {
@@ -1733,12 +1704,17 @@ impl<'a> DatadirModification<'a> {
// the original code assumes all other errors are missing keys. Therefore, we keep the code path
// the same for now, though in theory, we should only match the `MissingKey` variant.
Err(
PageReconstructError::Other(_)
e @ (PageReconstructError::Other(_)
| PageReconstructError::WalRedo(_)
| PageReconstructError::MissingKey { .. },
| PageReconstructError::MissingKey(_)),
) => {
// Key is missing, we must insert an image as the basis for subsequent deltas.
if !matches!(e, PageReconstructError::MissingKey(_)) {
let e = utils::error::report_compact_sources(&e);
tracing::warn!("treating error as if it was a missing key: {}", e);
}
let mut dir = AuxFilesDirectory {
files: HashMap::new(),
};
@@ -1893,7 +1869,7 @@ impl<'a> DatadirModification<'a> {
// work directly with Images, and we never need to read actual
// data pages. We could handle this if we had to, by calling
// the walredo manager, but let's keep it simple for now.
Err(PageReconstructError::from(anyhow::anyhow!(
Err(PageReconstructError::Other(anyhow::anyhow!(
"unexpected pending WAL record"
)))
};
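
Note on the hunks above: most of this file's changes collapse a `match` that only re-wraps the error into a single `?`. That works when the function's error type implements `From` for the inner error, which these hunks presumably rely on (the conversion itself is not shown in this diff). A minimal sketch of the equivalence, where `DeserializeError`, `des_u32`, and the `String` payload of `Other` are hypothetical stand-ins:

```rust
/// Hypothetical stand-in for the deserializer's error type.
#[derive(Debug)]
struct DeserializeError(String);

/// Simplified stand-in: the real enum has more variants, and the `String`
/// payload is an assumption made for this sketch.
#[derive(Debug)]
enum PageReconstructError {
    Other(String),
}

/// This conversion is what lets `?` replace the explicit `match`.
impl From<DeserializeError> for PageReconstructError {
    fn from(e: DeserializeError) -> Self {
        PageReconstructError::Other(format!("deserialization failure: {}", e.0))
    }
}

/// Hypothetical decoder: reads a big-endian u32 from exactly four bytes.
fn des_u32(buf: &[u8]) -> Result<u32, DeserializeError> {
    if buf.len() != 4 {
        return Err(DeserializeError(format!("expected 4 bytes, got {}", buf.len())));
    }
    Ok(u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]))
}

/// The shape of the removed code: match, then convert the error by hand.
fn verbose(buf: &[u8]) -> Result<u32, PageReconstructError> {
    match des_u32(buf) {
        Ok(v) => Ok(v),
        Err(e) => Err(PageReconstructError::from(e)),
    }
}

/// The shape of the added code: `?` applies the same `From` conversion.
fn concise(buf: &[u8]) -> Result<u32, PageReconstructError> {
    Ok(des_u32(buf)?)
}

fn main() {
    for buf in [&[0u8, 0, 1, 44][..], &[1u8, 2][..]] {
        match (verbose(buf), concise(buf)) {
            (Ok(a), Ok(b)) => println!("both decoded: {a} and {b}"),
            (Err(PageReconstructError::Other(a)), Err(PageReconstructError::Other(b))) => {
                println!("both failed: {a} / {b}")
            }
            _ => unreachable!("the two forms always agree"),
        }
    }
}
```

The two forms behave identically; the shorter one only removes the hand-written conversion.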


@@ -4491,10 +4491,13 @@ mod tests {
// This needs to traverse to the parent, and fails.
let err = newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await.unwrap_err();
assert!(err.to_string().starts_with(&format!(
"Bad state on timeline {}: Broken",
tline.timeline_id
)));
assert!(
err.to_string().starts_with(&format!(
"bad state on timeline {}: Broken",
tline.timeline_id
)),
"{err}"
);
Ok(())
}


@@ -24,6 +24,7 @@ use tracing::warn;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::VirtualFile;
use std::cmp::min;
use std::io::{Error, ErrorKind};
@@ -186,11 +187,11 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
/// You need to make sure that the internal buffer is empty, otherwise
/// data will be written in wrong order.
#[inline(always)]
async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
async fn write_all_unbuffered<Buf: IoBuf + Send>(
&mut self,
src_buf: B,
src_buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (B::Buf, Result<(), Error>) {
) -> (FullSlice<Buf>, Result<(), Error>) {
let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
let nbytes = match res {
Ok(nbytes) => nbytes,
@@ -204,8 +205,9 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
/// Flushes the internal buffer to the underlying `VirtualFile`.
pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
let buf = std::mem::take(&mut self.buf);
let (mut buf, res) = self.inner.write_all(buf, ctx).await;
let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await;
res?;
let mut buf = slice.into_raw_slice().into_inner();
buf.clear();
self.buf = buf;
Ok(())
@@ -222,19 +224,30 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
}
/// Internal, possibly buffered, write function
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
async fn write_all<Buf: IoBuf + Send>(
&mut self,
src_buf: B,
src_buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (B::Buf, Result<(), Error>) {
) -> (FullSlice<Buf>, Result<(), Error>) {
let src_buf = src_buf.into_raw_slice();
let src_buf_bounds = src_buf.bounds();
let restore = move |src_buf_slice: Slice<_>| {
FullSlice::must_new(Slice::from_buf_bounds(
src_buf_slice.into_inner(),
src_buf_bounds,
))
};
if !BUFFERED {
assert!(self.buf.is_empty());
return self.write_all_unbuffered(src_buf, ctx).await;
return self
.write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
.await;
}
let remaining = Self::CAPACITY - self.buf.len();
let src_buf_len = src_buf.bytes_init();
if src_buf_len == 0 {
return (Slice::into_inner(src_buf.slice_full()), Ok(()));
return (restore(src_buf), Ok(()));
}
let mut src_buf = src_buf.slice(0..src_buf_len);
// First try to copy as much as we can into the buffer
@@ -245,7 +258,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
// Then, if the buffer is full, flush it out
if self.buf.len() == Self::CAPACITY {
if let Err(e) = self.flush_buffer(ctx).await {
return (Slice::into_inner(src_buf), Err(e));
return (restore(src_buf), Err(e));
}
}
// Finally, write the tail of src_buf:
@@ -258,27 +271,29 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
let copied = self.write_into_buffer(&src_buf);
// We just verified above that src_buf fits into our internal buffer.
assert_eq!(copied, src_buf.len());
Slice::into_inner(src_buf)
restore(src_buf)
} else {
let (src_buf, res) = self.write_all_unbuffered(src_buf, ctx).await;
let (src_buf, res) = self
.write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
.await;
if let Err(e) = res {
return (src_buf, Err(e));
}
src_buf
}
} else {
Slice::into_inner(src_buf)
restore(src_buf)
};
(src_buf, Ok(()))
}
/// Write a blob of data. Returns the offset that it was written to,
/// which can be used to retrieve the data later.
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
pub async fn write_blob<Buf: IoBuf + Send>(
&mut self,
srcbuf: B,
srcbuf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (B::Buf, Result<u64, Error>) {
) -> (FullSlice<Buf>, Result<u64, Error>) {
let (buf, res) = self
.write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
.await;
@@ -287,43 +302,40 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
/// Write a blob of data. Returns the offset that it was written to,
/// which can be used to retrieve the data later.
pub async fn write_blob_maybe_compressed<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
pub(crate) async fn write_blob_maybe_compressed<Buf: IoBuf + Send>(
&mut self,
srcbuf: B,
srcbuf: FullSlice<Buf>,
ctx: &RequestContext,
algorithm: ImageCompressionAlgorithm,
) -> (B::Buf, Result<(u64, CompressionInfo), Error>) {
) -> (FullSlice<Buf>, Result<(u64, CompressionInfo), Error>) {
let offset = self.offset;
let mut compression_info = CompressionInfo {
written_compressed: false,
compressed_size: None,
};
let len = srcbuf.bytes_init();
let len = srcbuf.len();
let mut io_buf = self.io_buf.take().expect("we always put it back below");
io_buf.clear();
let mut compressed_buf = None;
let ((io_buf, hdr_res), srcbuf) = async {
let ((io_buf_slice, hdr_res), srcbuf) = async {
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
(
self.write_all(io_buf, ctx).await,
srcbuf.slice_full().into_inner(),
)
(self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
} else {
// Write a 4-byte length header
if len > MAX_SUPPORTED_LEN {
return (
(
io_buf,
io_buf.slice_len(),
Err(Error::new(
ErrorKind::Other,
format!("blob too large ({len} bytes)"),
)),
),
srcbuf.slice_full().into_inner(),
srcbuf,
);
}
let (high_bit_mask, len_written, srcbuf) = match algorithm {
@@ -336,8 +348,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
} else {
async_compression::tokio::write::ZstdEncoder::new(Vec::new())
};
let slice = srcbuf.slice_full();
encoder.write_all(&slice[..]).await.unwrap();
encoder.write_all(&srcbuf[..]).await.unwrap();
encoder.shutdown().await.unwrap();
let compressed = encoder.into_inner();
compression_info.compressed_size = Some(compressed.len());
@@ -345,31 +356,29 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
compression_info.written_compressed = true;
let compressed_len = compressed.len();
compressed_buf = Some(compressed);
(BYTE_ZSTD, compressed_len, slice.into_inner())
(BYTE_ZSTD, compressed_len, srcbuf)
} else {
(BYTE_UNCOMPRESSED, len, slice.into_inner())
(BYTE_UNCOMPRESSED, len, srcbuf)
}
}
ImageCompressionAlgorithm::Disabled => {
(BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner())
}
ImageCompressionAlgorithm::Disabled => (BYTE_UNCOMPRESSED, len, srcbuf),
};
let mut len_buf = (len_written as u32).to_be_bytes();
assert_eq!(len_buf[0] & 0xf0, 0);
len_buf[0] |= high_bit_mask;
io_buf.extend_from_slice(&len_buf[..]);
(self.write_all(io_buf, ctx).await, srcbuf)
(self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
}
}
.await;
self.io_buf = Some(io_buf);
self.io_buf = Some(io_buf_slice.into_raw_slice().into_inner());
match hdr_res {
Ok(_) => (),
Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
Err(e) => return (srcbuf, Err(e)),
}
let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
let (_buf, res) = self.write_all(compressed_buf, ctx).await;
(Slice::into_inner(srcbuf.slice(..)), res)
let (_buf, res) = self.write_all(compressed_buf.slice_len(), ctx).await;
(srcbuf, res)
} else {
self.write_all(srcbuf, ctx).await
};
@@ -432,21 +441,21 @@ pub(crate) mod tests {
let (_, res) = if compression {
let res = wtr
.write_blob_maybe_compressed(
blob.clone(),
blob.clone().slice_len(),
ctx,
ImageCompressionAlgorithm::Zstd { level: Some(1) },
)
.await;
(res.0, res.1.map(|(off, _)| off))
} else {
wtr.write_blob(blob.clone(), ctx).await
wtr.write_blob(blob.clone().slice_len(), ctx).await
};
let offs = res?;
offsets.push(offs);
}
// Write out one page worth of zeros so that we can
// read again with read_blk
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], ctx).await;
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
let offs = res?;
println!("Writing final blob at offs={offs}");
wtr.flush_buffer(ctx).await?;
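
The length-header logic touched in the hunks above can be sketched in isolation. This is a minimal sketch, not the pageserver code: the constant values (`SHORT_BLOB_LIMIT`, `BYTE_UNCOMPRESSED`, `BYTE_ZSTD`, `MAX_SUPPORTED_LEN`) and the helper name `encode_len_header` are assumptions made for illustration, since the hunks do not show them. Only the shape (1-byte header for short blobs, 4-byte big-endian header with the compression tag OR-ed into the top bits) is taken from the diff.

```rust
// Assumed values for illustration; the diff above does not show them.
const SHORT_BLOB_LIMIT: usize = 0x80;
const BYTE_UNCOMPRESSED: u8 = 0x80;
const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;

/// Hypothetical helper: build the length header for a blob of `len` bytes.
fn encode_len_header(len: usize, high_bit_mask: u8) -> Result<Vec<u8>, String> {
    if len < SHORT_BLOB_LIMIT {
        // Short blob: 1-byte length header, no compression tag.
        return Ok(vec![len as u8]);
    }
    if len > MAX_SUPPORTED_LEN {
        return Err(format!("blob too large ({len} bytes)"));
    }
    // 4-byte big-endian length; the top nibble must be free for the tag.
    let mut len_buf = (len as u32).to_be_bytes();
    assert_eq!(len_buf[0] & 0xf0, 0);
    len_buf[0] |= high_bit_mask;
    Ok(len_buf.to_vec())
}

fn main() {
    println!("{:02x?}", encode_len_header(5, BYTE_UNCOMPRESSED).unwrap());
    println!("{:02x?}", encode_len_header(300, BYTE_ZSTD).unwrap());
}
```

The `assert_eq!` mirrors the one in the diff: it guards the invariant that a supported length never collides with the tag bits.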

View File

@@ -4,6 +4,7 @@
use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::BlockLease;
use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
use crate::virtual_file::VirtualFile;
use once_cell::sync::Lazy;
@@ -208,21 +209,11 @@ impl PreWarmingWriter {
}
impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter {
async fn write_all<
B: tokio_epoll_uring::BoundedBuf<Buf = Buf>,
Buf: tokio_epoll_uring::IoBuf + Send,
>(
async fn write_all<Buf: tokio_epoll_uring::IoBuf + Send>(
&mut self,
buf: B,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> std::io::Result<(usize, B::Buf)> {
let buf = buf.slice(..);
let saved_bounds = buf.bounds(); // save for reconstructing the Slice from iobuf after the IO is done
let check_bounds_stuff_works = if cfg!(test) && cfg!(debug_assertions) {
Some(buf.to_vec())
} else {
None
};
) -> std::io::Result<(usize, FullSlice<Buf>)> {
let buflen = buf.len();
assert_eq!(
buflen % PAGE_SZ,
@@ -231,10 +222,10 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi
);
// Do the IO.
let iobuf = match self.file.write_all(buf, ctx).await {
(iobuf, Ok(nwritten)) => {
let buf = match self.file.write_all(buf, ctx).await {
(buf, Ok(nwritten)) => {
assert_eq!(nwritten, buflen);
iobuf
buf
}
(_, Err(e)) => {
return Err(std::io::Error::new(
@@ -248,12 +239,6 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi
}
};
// Reconstruct the Slice (the write path consumed the Slice and returned us the underlying IoBuf)
let buf = tokio_epoll_uring::Slice::from_buf_bounds(iobuf, saved_bounds);
if let Some(check_bounds_stuff_works) = check_bounds_stuff_works {
assert_eq!(&check_bounds_stuff_works, &*buf);
}
let nblocks = buflen / PAGE_SZ;
let nblocks32 = u32::try_from(nblocks).unwrap();
@@ -300,6 +285,6 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi
}
self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap();
Ok((buflen, buf.into_inner()))
Ok((buflen, buf))
}
}

View File

@@ -5,6 +5,8 @@
use std::mem::MaybeUninit;
use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
/// See module-level comment.
pub struct Buffer<const N: usize> {
allocation: Box<[u8; N]>,
@@ -60,10 +62,10 @@ impl<const N: usize> crate::virtual_file::owned_buffers_io::write::Buffer for Bu
self.written
}
fn flush(self) -> tokio_epoll_uring::Slice<Self> {
fn flush(self) -> FullSlice<Self> {
self.invariants();
let written = self.written;
tokio_epoll_uring::BoundedBuf::slice(self, 0..written)
FullSlice::must_new(tokio_epoll_uring::BoundedBuf::slice(self, 0..written))
}
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self {
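
The `FullSlice::must_new` call above suggests a newtype that only accepts slices covering the whole buffer, which would explain why the `saved_bounds` bookkeeping in `PreWarmingWriter` could be deleted. The following is a rough sketch of that idea under stated assumptions: `Slice` here is a toy stand-in over `Vec<u8>`, not `tokio_epoll_uring::Slice`, and the exact checks performed by the real `must_new` are assumed rather than confirmed by the diff.

```rust
use std::ops::Range;

/// Toy stand-in for an owned buffer plus bounds (hypothetical, not tokio_epoll_uring::Slice).
struct Slice {
    buf: Vec<u8>,
    bounds: Range<usize>,
}

/// Does the slice cover the entire buffer?
fn covers_whole_buffer(slice: &Slice) -> bool {
    slice.bounds.start == 0 && slice.bounds.end == slice.buf.len()
}

/// Newtype whose existence proves "bounds == whole buffer".
struct FullSlice {
    slice: Slice,
}

impl FullSlice {
    /// Panics if the slice is partial (assumed semantics of `must_new`).
    fn must_new(slice: Slice) -> Self {
        assert!(covers_whole_buffer(&slice), "partial slice");
        FullSlice { slice }
    }

    fn len(&self) -> usize {
        self.slice.buf.len()
    }

    /// No bounds need to be saved or restored: the buffer is the slice.
    fn into_inner(self) -> Vec<u8> {
        self.slice.buf
    }
}

fn main() {
    let full = FullSlice::must_new(Slice { buf: vec![1, 2, 3], bounds: 0..3 });
    println!("len={} inner={:?}", full.len(), full.into_inner());
}
```

With such a type in the signature, a writer can hand the same value back to the caller after the IO completes instead of reconstructing a slice from saved bounds.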

View File

@@ -1929,61 +1929,51 @@ impl TenantManager {
prepared: PreparedTimelineDetach,
mut attempt: detach_ancestor::Attempt,
ctx: &RequestContext,
) -> Result<HashSet<TimelineId>, anyhow::Error> {
use crate::tenant::timeline::detach_ancestor::Error;
// FIXME: this is unnecessary, slotguard already has these semantics
struct RevertOnDropSlot(Option<SlotGuard>);
) -> Result<HashSet<TimelineId>, detach_ancestor::Error> {
use detach_ancestor::Error;
impl Drop for RevertOnDropSlot {
fn drop(&mut self) {
if let Some(taken) = self.0.take() {
taken.revert();
}
}
}
let slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist).map_err(
|e| {
use TenantSlotError::*;
impl RevertOnDropSlot {
fn into_inner(mut self) -> SlotGuard {
self.0.take().unwrap()
}
}
impl std::ops::Deref for RevertOnDropSlot {
type Target = SlotGuard;
fn deref(&self) -> &Self::Target {
self.0.as_ref().unwrap()
}
}
let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let slot_guard = RevertOnDropSlot(Some(slot_guard));
match e {
MapState(TenantMapError::ShuttingDown) => Error::ShuttingDown,
NotFound(_) | InProgress | MapState(_) => Error::DetachReparent(e.into()),
}
},
)?;
let tenant = {
let Some(old_slot) = slot_guard.get_old_value() else {
anyhow::bail!(
"Tenant not found when trying to complete detaching timeline ancestor"
);
};
let old_slot = slot_guard
.get_old_value()
.as_ref()
.expect("requested MustExist");
let Some(tenant) = old_slot.get_attached() else {
anyhow::bail!("Tenant is not in attached state");
return Err(Error::DetachReparent(anyhow::anyhow!(
"Tenant is not in attached state"
)));
};
if !tenant.is_active() {
anyhow::bail!("Tenant is not active");
return Err(Error::DetachReparent(anyhow::anyhow!(
"Tenant is not active"
)));
}
tenant.clone()
};
let timeline = tenant.get_timeline(timeline_id, true)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(Error::NotFound)?;
let resp = timeline
.detach_from_ancestor_and_reparent(&tenant, prepared, ctx)
.await?;
let mut slot_guard = slot_guard.into_inner();
let mut slot_guard = slot_guard;
let tenant = if resp.reset_tenant_required() {
attempt.before_reset_tenant();
@@ -1991,17 +1981,20 @@ impl TenantManager {
let (_guard, progress) = utils::completion::channel();
match tenant.shutdown(progress, ShutdownMode::Hard).await {
Ok(()) => {
slot_guard.drop_old_value()?;
slot_guard.drop_old_value().expect("it was just shutdown");
}
Err(_barrier) => {
slot_guard.revert();
// this really should not happen, at all, unless shutdown was already going?
anyhow::bail!("Cannot restart Tenant, already shutting down");
// this really should not happen, at all, unless a shutdown without acquiring
// tenant slot was already going? regardless, on restart the attempt tracking
// will reset to retryable.
return Err(Error::ShuttingDown);
}
}
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)
.map_err(|e| Error::DetachReparent(e.into()))?;
let shard_identity = config.shard;
let tenant = tenant_spawn(
@@ -2009,12 +2002,13 @@ impl TenantManager {
tenant_shard_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(config)?,
AttachedTenantConf::try_from(config).map_err(Error::DetachReparent)?,
shard_identity,
None,
SpawnMode::Eager,
ctx,
)?;
)
.map_err(|_| Error::ShuttingDown)?;
{
let mut g = tenant.ongoing_timeline_detach.lock().unwrap();
@@ -2025,7 +2019,15 @@ impl TenantManager {
*g = Some((attempt.timeline_id, attempt.new_barrier()));
}
slot_guard.upsert(TenantSlot::Attached(tenant.clone()))?;
// if we bail out here, we will not allow a new attempt, which should be fine.
// pageserver should be shutting down regardless? tenant_reset would help, unless it
// runs into the same problem.
slot_guard
.upsert(TenantSlot::Attached(tenant.clone()))
.map_err(|e| match e {
TenantSlotUpsertError::ShuttingDown(_) => Error::ShuttingDown,
other => Error::DetachReparent(other.into()),
})?;
tenant
} else {
tracing::info!("skipping tenant_reset as no changes made required it");
@@ -2047,7 +2049,7 @@ impl TenantManager {
Cancelled | WillNotBecomeActive(TenantState::Stopping { .. }) => {
Error::ShuttingDown
}
other => Error::Unexpected(other.into()),
other => Error::Complete(other.into()),
}
})?;
@@ -2057,19 +2059,16 @@ impl TenantManager {
let timeline = tenant
.get_timeline(attempt.timeline_id, true)
.map_err(|_| Error::DetachedNotFoundAfterRestart)?;
.map_err(Error::NotFound)?;
timeline
.complete_detaching_timeline_ancestor(&tenant, attempt, ctx)
.await
.map(|()| reparented)
.map_err(|e| e.into())
} else {
// at least the latest versions have now been downloaded and refreshed; be ready to
// retry another time.
Err(anyhow::anyhow!(
"failed to reparent all candidate timelines, please retry"
))
Err(Error::FailedToReparentAll)
}
}
@@ -2392,6 +2391,9 @@ impl SlotGuard {
/// Get any value that was present in the slot before we acquired ownership
/// of it: in state transitions, this will be the old state.
///
// FIXME: get_ prefix
// FIXME: this should be .as_ref() -- unsure why no clippy
fn get_old_value(&self) -> &Option<TenantSlot> {
&self.old_value
}
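
The removed `RevertOnDropSlot` wrapper was marked unnecessary because the guard itself is said to revert when dropped. A toy model of that pattern, assuming a slot that is simply an `Option<String>`, might look like the sketch below. The names mirror the diff, but the fields and behaviour are hypothetical and much simpler than the real `SlotGuard`.

```rust
/// Toy guard: takes the old value out of the slot and puts it back on drop
/// unless a new value was committed with `upsert`.
struct SlotGuard<'a> {
    slot: &'a mut Option<String>,
    old_value: Option<String>,
    committed: bool,
}

impl<'a> SlotGuard<'a> {
    fn acquire(slot: &'a mut Option<String>) -> Self {
        let old_value = slot.take();
        SlotGuard { slot, old_value, committed: false }
    }

    /// Consume the guard and store a new value; Drop will then do nothing.
    fn upsert(mut self, new_value: String) {
        *self.slot = Some(new_value);
        self.committed = true;
    }
}

impl Drop for SlotGuard<'_> {
    fn drop(&mut self) {
        if !self.committed {
            // Early return or `?` in the caller lands here: revert.
            *self.slot = self.old_value.take();
        }
    }
}

fn main() {
    let mut slot = Some("old".to_string());
    {
        let _guard = SlotGuard::acquire(&mut slot);
        // Guard dropped without upsert: the old value is restored.
    }
    println!("{slot:?}");
    SlotGuard::acquire(&mut slot).upsert("new".to_string());
    println!("{slot:?}");
}
```

Because every early `return Err(...)` drops the guard, error paths need no explicit revert call, which is what lets the new code use plain `?` and `map_err`.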

View File

@@ -23,6 +23,8 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerName;
use crate::tenant::Generation;
#[cfg_attr(target_os = "macos", allow(unused_imports))]
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
use crate::TEMP_FILE_SUFFIX;
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode, RemotePath};
@@ -219,9 +221,7 @@ async fn download_object<'a>(
Ok(chunk) => chunk,
Err(e) => return Err(e),
};
buffered
.write_buffered(tokio_epoll_uring::BoundedBuf::slice_full(chunk), ctx)
.await?;
buffered.write_buffered(chunk.slice_len(), ctx).await?;
}
let size_tracking = buffered.flush_and_into_inner(ctx).await?;
Ok(size_tracking.into_inner())

View File

@@ -22,7 +22,7 @@ use crate::{
FAILED_REMOTE_OP_RETRIES,
},
span::debug_assert_current_span_has_tenant_id,
storage_layer::{layer::local_layer_path, LayerName},
storage_layer::{layer::local_layer_path, LayerName, LayerVisibilityHint},
tasks::{warn_when_period_overrun, BackgroundLoopKind},
},
virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
@@ -296,6 +296,9 @@ impl SecondaryDetail {
}),
last_activity_ts: ods.access_time,
relative_last_activity: finite_f32::FiniteF32::ZERO,
// Secondary location layers are presumed visible, because Covered layers
// are excluded from the heatmap
visibility: LayerVisibilityHint::Visible,
}
}));

View File

@@ -42,6 +42,7 @@ use crate::tenant::vectored_blob_io::{
VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
@@ -63,6 +64,7 @@ use std::os::unix::fs::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio_epoll_uring::IoBufMut;
use tracing::*;
use utils::{
@@ -436,19 +438,28 @@ impl DeltaLayerWriterInner {
ctx: &RequestContext,
) -> anyhow::Result<()> {
let (_, res) = self
.put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init(), ctx)
.put_value_bytes(
key,
lsn,
Value::ser(&val)?.slice_len(),
val.will_init(),
ctx,
)
.await;
res
}
async fn put_value_bytes(
async fn put_value_bytes<Buf>(
&mut self,
key: Key,
lsn: Lsn,
val: Vec<u8>,
val: FullSlice<Buf>,
will_init: bool,
ctx: &RequestContext,
) -> (Vec<u8>, anyhow::Result<()>) {
) -> (FullSlice<Buf>, anyhow::Result<()>)
where
Buf: IoBufMut + Send,
{
assert!(
self.lsn_range.start <= lsn,
"lsn_start={}, lsn={}",
@@ -514,7 +525,7 @@ impl DeltaLayerWriterInner {
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
.await?;
for buf in block_buf.blocks {
let (_buf, res) = file.write_all(buf, ctx).await;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
res?;
}
assert!(self.lsn_range.start < self.lsn_range.end);
@@ -534,7 +545,7 @@ impl DeltaLayerWriterInner {
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf, ctx).await;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
res?;
let metadata = file
@@ -646,14 +657,17 @@ impl DeltaLayerWriter {
.await
}
pub async fn put_value_bytes(
pub async fn put_value_bytes<Buf>(
&mut self,
key: Key,
lsn: Lsn,
val: Vec<u8>,
val: FullSlice<Buf>,
will_init: bool,
ctx: &RequestContext,
) -> (Vec<u8>, anyhow::Result<()>) {
) -> (FullSlice<Buf>, anyhow::Result<()>)
where
Buf: IoBufMut + Send,
{
self.inner
.as_mut()
.unwrap()
@@ -743,7 +757,7 @@ impl DeltaLayer {
// TODO: could use smallvec here, but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf, ctx).await;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
res?;
Ok(())
}
@@ -975,7 +989,7 @@ impl DeltaLayerInner {
.blobs_at
.as_slice()
.iter()
.map(|(_, (_, blob_meta))| format!("{}@{}", blob_meta.key, blob_meta.lsn))
.map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
.join(", ");
tracing::warn!(
"Oversized vectored read ({} > {}) for keys {}",
@@ -1017,10 +1031,10 @@ impl DeltaLayerInner {
Ok(blobs_buf) => blobs_buf,
Err(err) => {
let kind = err.kind();
for (_, (_, blob_meta)) in read.blobs_at.as_slice() {
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::from(anyhow!(
PageReconstructError::Other(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path,
kind
@@ -1047,7 +1061,7 @@ impl DeltaLayerInner {
Err(e) => {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::from(anyhow!(e).context(format!(
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to deserialize blob from virtual file {}",
self.file.path,
))),
@@ -1291,12 +1305,12 @@ impl DeltaLayerInner {
.put_value_bytes(
key,
lsn,
std::mem::take(&mut per_blob_copy),
std::mem::take(&mut per_blob_copy).slice_len(),
will_init,
ctx,
)
.await;
per_blob_copy = tmp;
per_blob_copy = tmp.into_raw_slice().into_inner();
res?;
@@ -1678,7 +1692,7 @@ pub(crate) mod test {
let mut planned_blobs = Vec::new();
for read in vectored_reads {
for (at, (_, meta)) in read.blobs_at.as_slice() {
for (at, meta) in read.blobs_at.as_slice() {
planned_blobs.push(BlobSpec {
key: meta.key,
lsn: meta.lsn,
@@ -1871,7 +1885,7 @@ pub(crate) mod test {
for entry in entries {
let (_, res) = writer
.put_value_bytes(entry.key, entry.lsn, entry.value, false, &ctx)
.put_value_bytes(entry.key, entry.lsn, entry.value.slice_len(), false, &ctx)
.await;
res?;
}

View File

@@ -38,6 +38,7 @@ use crate::tenant::vectored_blob_io::{
VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result};
@@ -354,7 +355,7 @@ impl ImageLayer {
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf, ctx).await;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
res?;
Ok(())
}
@@ -602,7 +603,7 @@ impl ImageLayerInner {
.blobs_at
.as_slice()
.iter()
.map(|(_, (_, blob_meta))| format!("{}@{}", blob_meta.key, blob_meta.lsn))
.map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
.join(", ");
tracing::warn!(
"Oversized vectored read ({} > {}) for keys {}",
@@ -630,7 +631,7 @@ impl ImageLayerInner {
}
Err(err) => {
let kind = err.kind();
for (_, (_, blob_meta)) in read.blobs_at.as_slice() {
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::from(anyhow!(
@@ -786,7 +787,7 @@ impl ImageLayerWriterInner {
self.num_keys += 1;
let (_img, res) = self
.blob_writer
.write_blob_maybe_compressed(img, ctx, compression)
.write_blob_maybe_compressed(img.slice_len(), ctx, compression)
.await;
// TODO: re-use the buffer for `img` further upstack
let (off, compression_info) = res?;
@@ -838,7 +839,7 @@ impl ImageLayerWriterInner {
.await?;
let (index_root_blk, block_buf) = self.tree.finish()?;
for buf in block_buf.blocks {
let (_buf, res) = file.write_all(buf, ctx).await;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
res?;
}
@@ -858,7 +859,7 @@ impl ImageLayerWriterInner {
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf, ctx).await;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
res?;
let metadata = file

View File

@@ -12,6 +12,7 @@ use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::{l0_flush, page_cache, walrecord};
use anyhow::{anyhow, Result};
use camino::Utf8PathBuf;
@@ -581,11 +582,17 @@ impl InMemoryLayer {
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let res;
(buf, res) = delta_layer_writer
.put_value_bytes(Key::from_compact(*key), *lsn, buf, will_init, &ctx)
let (tmp, res) = delta_layer_writer
.put_value_bytes(
Key::from_compact(*key),
*lsn,
buf.slice_len(),
will_init,
&ctx,
)
.await;
res?;
buf = tmp.into_raw_slice().into_inner();
}
}
}
@@ -620,11 +627,17 @@ impl InMemoryLayer {
// => https://github.com/neondatabase/neon/issues/8183
cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let res;
(buf, res) = delta_layer_writer
.put_value_bytes(Key::from_compact(*key), *lsn, buf, will_init, ctx)
let (tmp, res) = delta_layer_writer
.put_value_bytes(
Key::from_compact(*key),
*lsn,
buf.slice_len(),
will_init,
ctx,
)
.await;
res?;
buf = tmp.into_raw_slice().into_inner();
}
}
}
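
Both `InMemoryLayer` hunks above follow the same ownership round trip: the buffer is moved into the writer, comes back alongside the result, and is reused for the next value. A simplified sketch of that loop, assuming a plain `Vec<u8>` buffer and a hypothetical in-memory `put_value_bytes` stand-in, could be:

```rust
/// Hypothetical writer: takes ownership of `val` and returns it with the result,
/// so the caller can reuse the allocation.
fn put_value_bytes(sink: &mut Vec<u8>, val: Vec<u8>) -> (Vec<u8>, Result<(), String>) {
    sink.extend_from_slice(&val);
    (val, Ok(()))
}

/// Copy all values into one sink while reusing a single scratch buffer.
fn copy_all(values: &[&[u8]]) -> Result<Vec<u8>, String> {
    let mut sink = Vec::new();
    let mut buf: Vec<u8> = Vec::new();
    for value in values {
        buf.clear();
        buf.extend_from_slice(value);
        // Move `buf` in, get it back as `tmp`, check the result, then rebind.
        let (tmp, res) = put_value_bytes(&mut sink, buf);
        res?;
        buf = tmp;
    }
    Ok(sink)
}

fn main() {
    let out = copy_all(&[&b"ab"[..], &b"cde"[..]]).unwrap();
    println!("{}", String::from_utf8_lossy(&out));
}
```

In the diff the returned value is a `FullSlice`, so the rebinding step additionally unwraps it with `into_raw_slice().into_inner()`.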

View File

@@ -312,7 +312,9 @@ impl Layer {
.get_or_maybe_download(true, Some(ctx))
.await
.map_err(|err| match err {
DownloadError::DownloadCancelled => GetVectoredError::Cancelled,
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
GetVectoredError::Cancelled
}
other => GetVectoredError::Other(anyhow::anyhow!(other)),
})?;
@@ -1612,6 +1614,12 @@ pub(crate) enum DownloadError {
Failpoint(failpoints::FailpointKind),
}
impl DownloadError {
pub(crate) fn is_cancelled(&self) -> bool {
matches!(self, DownloadError::DownloadCancelled)
}
}
#[derive(Debug, PartialEq)]
pub(crate) enum NeedsDownload {
NotFound,
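
The two changes in this file (treating `TimelineShutdown` as cancellation when mapping errors, and the new `is_cancelled` helper) can be illustrated with a reduced enum. This is a sketch only: the enums below are hypothetical subsets with a `String` payload for brevity, and `to_get_vectored_error` is an illustrative name for what the diff does inline in a `map_err` closure.

```rust
#[derive(Debug)]
enum DownloadError {
    TimelineShutdown,
    DownloadCancelled,
    Other(String),
}

impl DownloadError {
    /// As in the diff: only `DownloadCancelled` counts here.
    fn is_cancelled(&self) -> bool {
        matches!(self, DownloadError::DownloadCancelled)
    }
}

#[derive(Debug, PartialEq)]
enum GetVectoredError {
    Cancelled,
    Other(String),
}

/// Both shutdown-like variants collapse into `Cancelled`; the rest are wrapped.
fn to_get_vectored_error(err: DownloadError) -> GetVectoredError {
    match err {
        DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
            GetVectoredError::Cancelled
        }
        other => GetVectoredError::Other(format!("{other:?}")),
    }
}

fn main() {
    println!("{}", DownloadError::TimelineShutdown.is_cancelled());
    println!("{:?}", to_get_vectored_error(DownloadError::TimelineShutdown));
}
```

Note that, as written in the diff, `is_cancelled` returns `false` for `TimelineShutdown` even though the `map_err` treats both variants as cancellation.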

View File

@@ -511,7 +511,7 @@ pub(crate) struct TimelineVisitOutcome {
#[derive(thiserror::Error, Debug)]
pub(crate) enum PageReconstructError {
#[error(transparent)]
Other(#[from] anyhow::Error),
Other(anyhow::Error),
#[error("Ancestor LSN wait error: {0}")]
AncestorLsnTimeout(WaitLsnError),
@@ -527,6 +527,22 @@ pub(crate) enum PageReconstructError {
MissingKey(MissingKeyError),
}
impl From<anyhow::Error> for PageReconstructError {
fn from(value: anyhow::Error) -> Self {
// with walingest.rs many PageReconstructError are wrapped in as anyhow::Error
match value.downcast::<PageReconstructError>() {
Ok(pre) => pre,
Err(other) => PageReconstructError::Other(other),
}
}
}
impl From<utils::bin_ser::DeserializeError> for PageReconstructError {
fn from(value: utils::bin_ser::DeserializeError) -> Self {
PageReconstructError::Other(anyhow::Error::new(value).context("deserialization failure"))
}
}
impl From<layer_manager::Shutdown> for PageReconstructError {
fn from(_: layer_manager::Shutdown) -> Self {
PageReconstructError::Cancelled
@@ -546,6 +562,7 @@ impl From<layer_manager::Shutdown> for GetVectoredError {
}
}
#[derive(thiserror::Error)]
pub struct MissingKeyError {
key: Key,
shard: ShardNumber,
@@ -585,11 +602,8 @@ impl PageReconstructError {
pub(crate) fn is_stopping(&self) -> bool {
use PageReconstructError::*;
match self {
Other(_) => false,
AncestorLsnTimeout(_) => false,
Cancelled => true,
WalRedo(_) => false,
MissingKey { .. } => false,
Other(_) | AncestorLsnTimeout(_) | WalRedo(_) | MissingKey(_) => false,
}
}
}
@@ -599,11 +613,11 @@ pub(crate) enum CreateImageLayersError {
#[error("timeline shutting down")]
Cancelled,
#[error(transparent)]
GetVectoredError(GetVectoredError),
#[error("read failed")]
GetVectoredError(#[source] GetVectoredError),
#[error(transparent)]
PageReconstructError(PageReconstructError),
#[error("reconstruction failed")]
PageReconstructError(#[source] PageReconstructError),
#[error(transparent)]
Other(#[from] anyhow::Error),
@@ -627,10 +641,10 @@ pub(crate) enum FlushLayerError {
// Arc<> the following non-clonable error types: we must be Clone-able because the flush error is propagated from the flush
// loop via a watch channel, where we can only borrow it.
#[error(transparent)]
#[error("create image layers (shared)")]
CreateImageLayersError(Arc<CreateImageLayersError>),
#[error(transparent)]
#[error("other (shared)")]
Other(#[from] Arc<anyhow::Error>),
}
@@ -663,34 +677,46 @@ pub(crate) enum GetVectoredError {
#[error("timeline shutting down")]
Cancelled,
#[error("Requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
#[error("requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
Oversized(u64),
#[error("Requested at invalid LSN: {0}")]
#[error("requested at invalid LSN: {0}")]
InvalidLsn(Lsn),
#[error("Requested key not found: {0}")]
#[error("requested key not found: {0}")]
MissingKey(MissingKeyError),
#[error(transparent)]
GetReadyAncestorError(GetReadyAncestorError),
#[error("ancestry walk")]
GetReadyAncestorError(#[source] GetReadyAncestorError),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl From<GetReadyAncestorError> for GetVectoredError {
fn from(value: GetReadyAncestorError) -> Self {
use GetReadyAncestorError::*;
match value {
Cancelled => GetVectoredError::Cancelled,
AncestorLsnTimeout(_) | BadState { .. } => {
GetVectoredError::GetReadyAncestorError(value)
}
}
}
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum GetReadyAncestorError {
#[error("Ancestor LSN wait error: {0}")]
#[error("ancestor LSN wait error")]
AncestorLsnTimeout(#[from] WaitLsnError),
#[error("Bad state on timeline {timeline_id}: {state:?}")]
#[error("bad state on timeline {timeline_id}: {state:?}")]
BadState {
timeline_id: TimelineId,
state: TimelineState,
},
#[error("Cancelled")]
#[error("cancelled")]
Cancelled,
}
@@ -1619,6 +1645,20 @@ impl Timeline {
self.last_record_lsn.shutdown();
if try_freeze_and_flush {
if let Some((open, frozen)) = self
.layers
.read()
.await
.layer_map()
.map(|lm| (lm.open_layer.is_some(), lm.frozen_layers.len()))
.ok()
.filter(|(open, frozen)| *open || *frozen > 0)
{
tracing::info!(?open, frozen, "flushing and freezing on shutdown");
} else {
// this is double-shutdown, ignore it
}
// we shut down walreceiver above, so, we won't add anything more
// to the InMemoryLayer; freeze it and wait for all frozen layers
// to reach the disk & upload queue, then shut the upload queue and
@@ -3046,8 +3086,7 @@ impl Timeline {
cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1));
timeline_owned = timeline
.get_ready_ancestor_timeline(ancestor_timeline, ctx)
.await
.map_err(GetVectoredError::GetReadyAncestorError)?;
.await?;
timeline = &*timeline_owned;
};
@@ -3944,7 +3983,7 @@ impl Timeline {
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else {
return Err(CreateImageLayersError::PageReconstructError(err));
return Err(CreateImageLayersError::from(err));
}
}
};
@@ -4004,7 +4043,7 @@ impl Timeline {
let mut total_kb_retrieved = 0;
let mut total_keys_retrieved = 0;
for (k, v) in data {
let v = v.map_err(CreateImageLayersError::PageReconstructError)?;
let v = v?;
total_kb_retrieved += KEY_SIZE + v.len();
total_keys_retrieved += 1;
new_data.insert(k, v);
@@ -4342,7 +4381,7 @@ impl Timeline {
tenant: &crate::tenant::Tenant,
prepared: detach_ancestor::PreparedTimelineDetach,
ctx: &RequestContext,
) -> Result<detach_ancestor::DetachingAndReparenting, anyhow::Error> {
) -> Result<detach_ancestor::DetachingAndReparenting, detach_ancestor::Error> {
detach_ancestor::detach_and_reparent(self, tenant, prepared, ctx).await
}
@@ -4515,7 +4554,12 @@ impl Timeline {
new_images: &[ResidentLayer],
layers_to_remove: &[Layer],
) -> Result<(), CompactionError> {
let mut guard = self.layers.write().await;
let mut guard = tokio::select! {
guard = self.layers.write() => guard,
_ = self.cancel.cancelled() => {
return Err(CompactionError::ShuttingDown);
}
};
let mut duplicated_layers = HashSet::new();
@@ -5261,6 +5305,7 @@ impl Timeline {
layer: layer.to_owned().into(),
last_activity_ts,
relative_last_activity: finite_f32::FiniteF32::ZERO,
visibility: layer.visibility(),
}
})
.collect();
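
The hand-written `From<anyhow::Error> for PageReconstructError` above recovers a typed error that was wrapped on the way up instead of burying it in `Other`. The same idea can be sketched with the standard library alone. Here `Box<dyn Error + Send + Sync>` is swapped in for `anyhow::Error` (an intentional substitution so the example has no dependencies), and the enum is a hypothetical two-variant subset.

```rust
use std::error::Error;
use std::fmt;

#[derive(Debug, PartialEq)]
enum PageReconstructError {
    Cancelled,
    Other(String),
}

impl fmt::Display for PageReconstructError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PageReconstructError::Cancelled => write!(f, "cancelled"),
            PageReconstructError::Other(msg) => write!(f, "{msg}"),
        }
    }
}

impl Error for PageReconstructError {}

type BoxError = Box<dyn Error + Send + Sync + 'static>;

/// If the boxed error already is a PageReconstructError, unwrap it;
/// otherwise fall back to the catch-all variant.
fn from_boxed(value: BoxError) -> PageReconstructError {
    match value.downcast::<PageReconstructError>() {
        Ok(pre) => *pre,
        Err(other) => PageReconstructError::Other(other.to_string()),
    }
}

fn main() {
    let wrapped: BoxError = Box::new(PageReconstructError::Cancelled);
    println!("{:?}", from_boxed(wrapped));
}
```

The benefit is that a `Cancelled` which passed through a type-erased layer still reaches checks such as `is_stopping()` as `Cancelled` rather than as an opaque `Other`.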

View File

@@ -1048,11 +1048,22 @@ impl Timeline {
let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
let mut next_hole = 0; // index of next hole in holes vector
let mut keys = 0;
while let Some((key, lsn, value)) = all_values_iter
.next(ctx)
.await
.map_err(CompactionError::Other)?
{
keys += 1;
if keys % 32_768 == 0 && self.cancel.is_cancelled() {
// avoid hitting the cancellation token on every key. in benches, we end up
// shuffling an order of million keys per layer, this means we'll check it
// around tens of times per layer.
return Err(CompactionError::ShuttingDown);
}
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
// We need to check key boundaries once we reach next key or end of layer with the same key
if !same_key || lsn == dup_end_lsn {
@@ -1157,6 +1168,8 @@ impl Timeline {
.await
.map_err(CompactionError::Other)?,
);
keys = 0;
}
writer
@@ -2325,7 +2338,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
key_range,
))
} else {
// The current compaction implementatin only ever requests the key space
// The current compaction implementation only ever requests the key space
// at the compaction end LSN.
anyhow::bail!("keyspace not available for requested lsn");
}
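
The compaction loop above polls the cancellation token only once every 32,768 keys rather than on each key. A dependency-free sketch of that throttled check follows; an `AtomicBool` stands in for the `CancellationToken` used in the diff, and `process_keys` is a hypothetical function name.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

const CHECK_EVERY: u64 = 32_768;

/// Process `n` keys, looking at the cancellation flag only on every
/// CHECK_EVERY-th key to keep the hot loop cheap.
fn process_keys(n: u64, cancel: &AtomicBool) -> Result<u64, &'static str> {
    let mut keys = 0u64;
    let mut processed = 0u64;
    for _ in 0..n {
        keys += 1;
        if keys % CHECK_EVERY == 0 && cancel.load(Ordering::Relaxed) {
            return Err("shutting down");
        }
        processed += 1;
    }
    Ok(processed)
}

fn main() {
    let cancel = AtomicBool::new(true);
    // Fewer keys than the check interval: the flag is never consulted.
    println!("{:?}", process_keys(100, &cancel));
    println!("{:?}", process_keys(40_000, &cancel));
}
```

The trade-off is latency: a cancellation is noticed at most one interval late. The diff also resets the counter when a new layer writer is started, which this sketch omits.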

View File

@@ -5,7 +5,6 @@ use crate::{
context::{DownloadBehavior, RequestContext},
task_mgr::TaskKind,
tenant::{
mgr::GetActiveTenantError,
remote_timeline_client::index::GcBlockingReason::DetachAncestor,
storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer},
Tenant,
@@ -23,61 +22,74 @@ use utils::{completion, generation::Generation, http::error::ApiError, id::Timel
pub(crate) enum Error {
#[error("no ancestors")]
NoAncestor,
#[error("too many ancestors")]
TooManyAncestors,
#[error("shutting down, please retry later")]
ShuttingDown,
#[error("flushing failed")]
FlushAncestor(#[source] FlushLayerError),
#[error("layer download failed")]
RewrittenDeltaDownloadFailed(#[source] crate::tenant::storage_layer::layer::DownloadError),
#[error("copying LSN prefix locally failed")]
CopyDeltaPrefix(#[source] anyhow::Error),
#[error("upload rewritten layer")]
UploadRewritten(#[source] anyhow::Error),
#[error(transparent)]
NotFound(crate::tenant::GetTimelineError),
#[error("failed to reparent all candidate timelines, please retry")]
FailedToReparentAll,
#[error("ancestor is already being detached by: {}", .0)]
OtherTimelineDetachOngoing(TimelineId),
#[error("remote copying layer failed")]
CopyFailed(#[source] anyhow::Error),
#[error("preparing to timeline ancestor detach failed")]
Prepare(#[source] anyhow::Error),
#[error("wait for tenant to activate after restarting")]
WaitToActivate(#[source] GetActiveTenantError),
#[error("detaching and reparenting failed")]
DetachReparent(#[source] anyhow::Error),
#[error("detached timeline was not found after restart")]
DetachedNotFoundAfterRestart,
#[error("unexpected error")]
Unexpected(#[source] anyhow::Error),
#[error("completing ancestor detach failed")]
Complete(#[source] anyhow::Error),
#[error("failpoint: {}", .0)]
Failpoint(&'static str),
}
impl Error {
/// Try to catch cancellation from within the `anyhow::Error`, or wrap the anyhow as the given
/// variant or fancier `or_else`.
fn launder<F>(e: anyhow::Error, or_else: F) -> Error
where
F: Fn(anyhow::Error) -> Error,
{
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::upload_queue::NotInitialized;
use remote_storage::TimeoutOrCancel;
if e.is::<NotInitialized>()
|| TimeoutOrCancel::caused_by_cancel(&e)
|| e.downcast_ref::<remote_storage::DownloadError>()
.is_some_and(|e| e.is_cancelled())
|| e.is::<WaitCompletionError>()
{
Error::ShuttingDown
} else {
or_else(e)
}
}
}
impl From<Error> for ApiError {
fn from(value: Error) -> Self {
match value {
e @ Error::NoAncestor => ApiError::Conflict(e.to_string()),
// TODO: ApiError converts the anyhow using debug formatting ... just stop using ApiError?
e @ Error::TooManyAncestors => ApiError::BadRequest(anyhow::anyhow!("{}", e)),
Error::NoAncestor => ApiError::Conflict(value.to_string()),
Error::TooManyAncestors => ApiError::BadRequest(anyhow::anyhow!("{}", value)),
Error::ShuttingDown => ApiError::ShuttingDown,
Error::OtherTimelineDetachOngoing(_) => {
ApiError::ResourceUnavailable("other timeline detach is already ongoing".into())
Error::OtherTimelineDetachOngoing(_) | Error::FailedToReparentAll => {
ApiError::ResourceUnavailable(value.to_string().into())
}
e @ Error::WaitToActivate(_) => {
let s = utils::error::report_compact_sources(&e).to_string();
ApiError::ResourceUnavailable(s.into())
}
// All of these contain shutdown errors, in fact, it's the most common
e @ Error::FlushAncestor(_)
| e @ Error::RewrittenDeltaDownloadFailed(_)
| e @ Error::CopyDeltaPrefix(_)
| e @ Error::UploadRewritten(_)
| e @ Error::CopyFailed(_)
| e @ Error::Unexpected(_)
| e @ Error::Failpoint(_) => ApiError::InternalServerError(e.into()),
Error::DetachedNotFoundAfterRestart => ApiError::NotFound(value.into()),
Error::NotFound(e) => ApiError::from(e),
// these variants should have no cancellation errors because of Error::launder
Error::Prepare(_)
| Error::DetachReparent(_)
| Error::Complete(_)
| Error::Failpoint(_) => ApiError::InternalServerError(value.into()),
}
}
}
@@ -95,39 +107,6 @@ impl From<super::layer_manager::Shutdown> for Error {
}
}
impl From<FlushLayerError> for Error {
fn from(value: FlushLayerError) -> Self {
match value {
FlushLayerError::Cancelled => Error::ShuttingDown,
FlushLayerError::NotRunning(_) => {
// FIXME(#6424): technically statically unreachable right now, given how we never
// drop the sender
Error::ShuttingDown
}
FlushLayerError::CreateImageLayersError(_) | FlushLayerError::Other(_) => {
Error::FlushAncestor(value)
}
}
}
}
impl From<GetActiveTenantError> for Error {
fn from(value: GetActiveTenantError) -> Self {
use pageserver_api::models::TenantState;
use GetActiveTenantError::*;
match value {
Cancelled | WillNotBecomeActive(TenantState::Stopping { .. }) | SwitchedTenant => {
Error::ShuttingDown
}
WaitForActiveTimeout { .. } | NotFound(_) | Broken(_) | WillNotBecomeActive(_) => {
// NotFound seems out-of-place
Error::WaitToActivate(value)
}
}
}
}
pub(crate) enum Progress {
Prepared(Attempt, PreparedTimelineDetach),
Done(AncestorDetached),
@@ -236,7 +215,7 @@ pub(super) async fn prepare(
let attempt = start_new_attempt(detached, tenant).await?;
utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking_pausable");
utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking-pausable");
fail::fail_point!(
"timeline-detach-ancestor::before_starting_after_locking",
@@ -265,7 +244,17 @@ pub(super) async fn prepare(
}
};
res?;
res.map_err(|e| {
use FlushLayerError::*;
match e {
Cancelled | NotRunning(_) => {
// FIXME(#6424): technically statically unreachable right now, given how we never
// drop the sender
Error::ShuttingDown
}
CreateImageLayersError(_) | Other(_) => Error::Prepare(e.into()),
}
})?;
// we do not need to wait for uploads to complete but we do need `struct Layer`,
// copying delta prefix is unsupported currently for `InMemoryLayer`.
@@ -346,7 +335,7 @@ pub(super) async fn prepare(
}
Ok(Ok(None)) => {}
Ok(Err(e)) => return Err(e),
Err(je) => return Err(Unexpected(je.into())),
Err(je) => return Err(Error::Prepare(je.into())),
}
}
@@ -394,7 +383,7 @@ pub(super) async fn prepare(
Ok(Err(failed)) => {
return Err(failed);
}
Err(je) => return Err(Unexpected(je.into())),
Err(je) => return Err(Error::Prepare(je.into())),
}
}
@@ -416,8 +405,7 @@ async fn start_new_attempt(detached: &Timeline, tenant: &Tenant) -> Result<Attem
crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor,
)
.await
// FIXME: better error
.map_err(Error::Unexpected)?;
.map_err(|e| Error::launder(e, Error::Prepare))?;
Ok(attempt)
}
@@ -546,19 +534,17 @@ async fn upload_rewritten_layer(
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<Option<Layer>, Error> {
use Error::UploadRewritten;
let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;
let Some(copied) = copied else {
return Ok(None);
};
// FIXME: better shuttingdown error
target
.remote_client
.upload_layer_file(&copied, cancel)
.await
.map_err(UploadRewritten)?;
.map_err(|e| Error::launder(e, Error::Prepare))?;
Ok(Some(copied.into()))
}
@@ -569,10 +555,8 @@ async fn copy_lsn_prefix(
target_timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> Result<Option<ResidentLayer>, Error> {
use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed, ShuttingDown};
if target_timeline.cancel.is_cancelled() {
return Err(ShuttingDown);
return Err(Error::ShuttingDown);
}
tracing::debug!(%layer, %end_lsn, "copying lsn prefix");
@@ -586,18 +570,22 @@ async fn copy_lsn_prefix(
ctx,
)
.await
.map_err(CopyDeltaPrefix)?;
.with_context(|| format!("prepare to copy lsn prefix of ancestors {layer}"))
.map_err(Error::Prepare)?;
let resident = layer
.download_and_keep_resident()
.await
// likely shutdown
.map_err(RewrittenDeltaDownloadFailed)?;
let resident = layer.download_and_keep_resident().await.map_err(|e| {
if e.is_cancelled() {
Error::ShuttingDown
} else {
Error::Prepare(e.into())
}
})?;
let records = resident
.copy_delta_prefix(&mut writer, end_lsn, ctx)
.await
.map_err(CopyDeltaPrefix)?;
.with_context(|| format!("copy lsn prefix of ancestors {layer}"))
.map_err(Error::Prepare)?;
drop(resident);
@@ -615,9 +603,9 @@ async fn copy_lsn_prefix(
let (desc, path) = writer
.finish(reused_highest_key, ctx)
.await
.map_err(CopyDeltaPrefix)?;
.map_err(Error::Prepare)?;
let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
.map_err(CopyDeltaPrefix)?;
.map_err(Error::Prepare)?;
tracing::debug!(%layer, %copied, "new layer produced");
@@ -633,8 +621,6 @@ async fn remote_copy(
generation: Generation,
cancel: &CancellationToken,
) -> Result<Layer, Error> {
use Error::CopyFailed;
// depending on Layer::keep_resident we could hardlink
let mut metadata = adopted.metadata();
@@ -648,13 +634,12 @@ async fn remote_copy(
metadata,
);
// FIXME: better shuttingdown error
adoptee
.remote_client
.copy_timeline_layer(adopted, &owned, cancel)
.await
.map(move |()| owned)
.map_err(CopyFailed)
.map_err(|e| Error::launder(e, Error::Prepare))
}
pub(crate) enum DetachingAndReparenting {
@@ -698,7 +683,7 @@ pub(super) async fn detach_and_reparent(
tenant: &Tenant,
prepared: PreparedTimelineDetach,
_ctx: &RequestContext,
) -> Result<DetachingAndReparenting, anyhow::Error> {
) -> Result<DetachingAndReparenting, Error> {
let PreparedTimelineDetach { layers } = prepared;
#[derive(Debug)]
@@ -783,7 +768,8 @@ pub(super) async fn detach_and_reparent(
(ancestor.timeline_id, ancestor_lsn),
)
.await
.context("publish layers and detach ancestor")?;
.context("publish layers and detach ancestor")
.map_err(|e| Error::launder(e, Error::DetachReparent))?;
tracing::info!(
ancestor=%ancestor.timeline_id,
@@ -927,8 +913,7 @@ pub(super) async fn complete(
crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor,
)
.await
// FIXME: better error
.map_err(Error::Unexpected)?;
.map_err(|e| Error::launder(e, Error::Complete))?;
Ok(())
}

@@ -30,7 +30,8 @@ use crate::{
pgdatadir_mapping::CollectKeySpaceError,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant,
storage_layer::LayerVisibilityHint, tasks::BackgroundLoopKind, timeline::EvictionError,
LogicalSizeCalculationCause, Tenant,
},
};
@@ -241,7 +242,22 @@ impl Timeline {
}
};
no_activity_for > p.threshold
match layer.visibility() {
LayerVisibilityHint::Visible => {
// Usual case: a visible layer might be read any time, and we will keep it
// resident until it hits our configured TTL threshold.
no_activity_for > p.threshold
}
LayerVisibilityHint::Covered => {
// Covered layers: this is probably a layer that was recently covered by
// an image layer during compaction. We don't evict it immediately, but
// it doesn't stay resident for the full `threshold`: we just keep it
// for a shorter time in case
// - it is used for Timestamp->LSN lookups
// - a new branch is created in recent history which will read this layer
no_activity_for > p.period
}
}
})
.cloned()
.for_each(|layer| {
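
The visibility-dependent eviction rule in the hunk above can be sketched in isolation. This is a minimal illustration, not the pageserver code: `Visibility`, `Policy` and `should_evict` are hypothetical stand-ins for `LayerVisibilityHint`, the eviction policy `p`, and the filter closure.

```rust
use std::time::Duration;

/// Hypothetical stand-in for `LayerVisibilityHint`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Visibility {
    /// May be read at any time.
    Visible,
    /// Recently covered by an image layer; reads are unlikely but possible.
    Covered,
}

/// Hypothetical stand-in for the eviction policy (`p` in the hunk).
struct Policy {
    /// How often the eviction task runs.
    period: Duration,
    /// Idle time after which a visible layer is evicted.
    threshold: Duration,
}

/// Visible layers stay resident for the full `threshold`;
/// covered layers only for one `period`.
fn should_evict(visibility: Visibility, no_activity_for: Duration, p: &Policy) -> bool {
    match visibility {
        Visibility::Visible => no_activity_for > p.threshold,
        Visibility::Covered => no_activity_for > p.period,
    }
}
```

Assuming `period` is shorter than `threshold`, a covered layer is evicted earlier than a visible one, but never immediately after it becomes covered.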

@@ -19,7 +19,6 @@ use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use bytes::BytesMut;
use itertools::Itertools;
use pageserver_api::key::Key;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::BoundedBuf;
@@ -62,7 +61,7 @@ pub struct VectoredRead {
pub start: u64,
pub end: u64,
/// Starting offsets and metadata for each blob in this read
pub blobs_at: VecMap<u64, (u64, BlobMeta)>,
pub blobs_at: VecMap<u64, BlobMeta>,
}
impl VectoredRead {
@@ -80,7 +79,7 @@ pub(crate) enum VectoredReadExtended {
pub(crate) struct VectoredReadBuilder {
start: u64,
end: u64,
blobs_at: VecMap<u64, (u64, BlobMeta)>,
blobs_at: VecMap<u64, BlobMeta>,
max_read_size: Option<usize>,
}
@@ -98,7 +97,7 @@ impl VectoredReadBuilder {
) -> Self {
let mut blobs_at = VecMap::default();
blobs_at
.append(start_offset, (end_offset, meta))
.append(start_offset, meta)
.expect("First insertion always succeeds");
Self {
@@ -123,7 +122,7 @@ impl VectoredReadBuilder {
} {
self.end = end;
self.blobs_at
.append(start, (end, meta))
.append(start, meta)
.expect("LSNs are ordered within vectored reads");
return VectoredReadExtended::Yes;
@@ -271,42 +270,6 @@ impl VectoredReadPlanner {
reads
}
pub fn finish_v2(self) -> Vec<VectoredRead> {
const STX_ALIGN: usize = 4096;
self.blobs
.into_iter()
.flat_map(|(key, blobs_for_key)| {
blobs_for_key
.into_iter()
.map(move |(lsn, start_offset, end_offset)| {
VectoredReadBuilder::new(
start_offset,
end_offset,
BlobMeta { key, lsn },
self.max_read_size,
)
})
})
.coalesce(|mut x, mut y| {
if x.end == y.start && {
if let Some(max_read_size) = x.max_read_size {
x.size() + y.size() <= max_read_size
} else {
true
}
} {
if x.blobs_at.extend(&mut y.blobs_at).is_ok() {
x.end = y.end;
return Ok(x);
}
}
Err((x, y))
})
.map(|x| x.build())
.collect()
}
}
/// Disk reader for vectored blob spans (does not go through the page cache)
@@ -351,10 +314,21 @@ impl<'a> VectoredBlobReader<'a> {
let mut metas = Vec::with_capacity(blobs_at.len());
// Blobs in `read` only provide their starting offset. The end offset
// of a blob is implicit: the start of the next blob if one exists
// or the end of the read.
let pairs = blobs_at.iter().zip(
blobs_at
.iter()
.map(Some)
.skip(1)
.chain(std::iter::once(None)),
);
// Some scratch space, put here for reusing the allocation
let mut decompressed_vec = Vec::new();
for (offset, (end_offset, meta)) in blobs_at.iter() {
for ((offset, meta), next) in pairs {
let offset_in_buf = offset - start_offset;
let first_len_byte = buf[offset_in_buf as usize];
@@ -380,8 +354,10 @@ impl<'a> VectoredBlobReader<'a> {
};
let start_raw = offset_in_buf + size_length;
let end_raw = *end_offset;
let end_raw = match next {
Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
None => start_raw + blob_size,
};
assert_eq!(end_raw - start_raw, blob_size);
let (start, end);
if compression_bits == BYTE_UNCOMPRESSED {
@@ -493,7 +469,7 @@ impl StreamingVectoredReadPlanner {
self.read_builder = {
let mut blobs_at = VecMap::default();
blobs_at
.append(start_offset, (end_offset, BlobMeta { key, lsn }))
.append(start_offset, BlobMeta { key, lsn })
.expect("First insertion always succeeds");
Some(VectoredReadBuilder {
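
With `blobs_at` now storing only start offsets, each blob's end is implicit. The pairing idiom used in `VectoredBlobReader` (zip the iterator with itself shifted by one, terminated by `None`) can be sketched on plain integers. `blob_ranges` and `read_end` are hypothetical names, and the sketch follows the code comment ("or the end of the read"); the hunk itself computes the last blob's end from the decoded blob size.

```rust
/// Given sorted blob start offsets and the end offset of the read,
/// returns the `(start, end)` range of every blob.
fn blob_ranges(starts: &[u64], read_end: u64) -> Vec<(u64, u64)> {
    // Each start paired with the start of the next blob, or `None` for the last one.
    let next_starts = starts
        .iter()
        .copied()
        .map(Some)
        .skip(1)
        .chain(std::iter::once(None));

    starts
        .iter()
        .copied()
        .zip(next_starts)
        // The end of a blob is the start of its successor, or the end of the read.
        .map(|(start, next)| (start, next.unwrap_or(read_end)))
        .collect()
}
```

The `chain(std::iter::once(None))` keeps both sides of the `zip` the same length, so the last blob is not dropped.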

@@ -17,6 +17,7 @@ use crate::page_cache::{PageWriteGuard, PAGE_SZ};
use crate::tenant::TENANTS_SEGMENT_NAME;
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use owned_buffers_io::io_buf_ext::FullSlice;
use pageserver_api::shard::TenantShardId;
use std::fs::File;
use std::io::{Error, ErrorKind, Seek, SeekFrom};
@@ -50,6 +51,7 @@ pub(crate) mod owned_buffers_io {
//! but for the time being we're proving out the primitives in the neon.git repo
//! for faster iteration.
pub(crate) mod io_buf_ext;
pub(crate) mod slice;
pub(crate) mod write;
pub(crate) mod util {
@@ -637,24 +639,24 @@ impl VirtualFile {
}
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
pub async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
pub async fn write_all_at<Buf: IoBuf + Send>(
&self,
buf: B,
buf: FullSlice<Buf>,
mut offset: u64,
ctx: &RequestContext,
) -> (B::Buf, Result<(), Error>) {
let buf_len = buf.bytes_init();
if buf_len == 0 {
return (Slice::into_inner(buf.slice_full()), Ok(()));
}
let mut buf = buf.slice(0..buf_len);
) -> (FullSlice<Buf>, Result<(), Error>) {
let buf = buf.into_raw_slice();
let bounds = buf.bounds();
let restore =
|buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
let mut buf = buf;
while !buf.is_empty() {
let res;
(buf, res) = self.write_at(buf, offset, ctx).await;
let (tmp, res) = self.write_at(FullSlice::must_new(buf), offset, ctx).await;
buf = tmp.into_raw_slice();
match res {
Ok(0) => {
return (
Slice::into_inner(buf),
restore(buf),
Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
@@ -666,33 +668,33 @@ impl VirtualFile {
offset += n as u64;
}
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return (Slice::into_inner(buf), Err(e)),
Err(e) => return (restore(buf), Err(e)),
}
}
(Slice::into_inner(buf), Ok(()))
(restore(buf), Ok(()))
}
/// Writes `buf.slice(0..buf.bytes_init())`.
/// Returns the IoBuf that is underlying the BoundedBuf `buf`.
/// I.e., the returned value's `bytes_init()` method returns something different than the `bytes_init()` that was passed in.
/// It's quite brittle and easy to mis-use, so, we return the size in the Ok() variant.
pub async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
/// Writes `buf` to the file at the current offset.
///
/// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller.
pub async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: B,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (B::Buf, Result<usize, Error>) {
let nbytes = buf.bytes_init();
if nbytes == 0 {
return (Slice::into_inner(buf.slice_full()), Ok(0));
}
let mut buf = buf.slice(0..nbytes);
) -> (FullSlice<Buf>, Result<usize, Error>) {
let buf = buf.into_raw_slice();
let bounds = buf.bounds();
let restore =
|buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
let nbytes = buf.len();
let mut buf = buf;
while !buf.is_empty() {
let res;
(buf, res) = self.write(buf, ctx).await;
let (tmp, res) = self.write(FullSlice::must_new(buf), ctx).await;
buf = tmp.into_raw_slice();
match res {
Ok(0) => {
return (
Slice::into_inner(buf),
restore(buf),
Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
@@ -703,17 +705,17 @@ impl VirtualFile {
buf = buf.slice(n..);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return (Slice::into_inner(buf), Err(e)),
Err(e) => return (restore(buf), Err(e)),
}
}
(Slice::into_inner(buf), Ok(nbytes))
(restore(buf), Ok(nbytes))
}
async fn write<B: IoBuf + Send>(
&mut self,
buf: Slice<B>,
buf: FullSlice<B>,
ctx: &RequestContext,
) -> (Slice<B>, Result<usize, std::io::Error>) {
) -> (FullSlice<B>, Result<usize, std::io::Error>) {
let pos = self.pos;
let (buf, res) = self.write_at(buf, pos, ctx).await;
let n = match res {
@@ -756,10 +758,10 @@ impl VirtualFile {
async fn write_at<B: IoBuf + Send>(
&self,
buf: Slice<B>,
buf: FullSlice<B>,
offset: u64,
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
) -> (Slice<B>, Result<usize, Error>) {
) -> (FullSlice<B>, Result<usize, Error>) {
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
Err(e) => return (buf, Err(e)),
@@ -1093,11 +1095,11 @@ impl Drop for VirtualFile {
impl OwnedAsyncWriter for VirtualFile {
#[inline(always)]
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: B,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> std::io::Result<(usize, B::Buf)> {
) -> std::io::Result<(usize, FullSlice<Buf>)> {
let (buf, res) = VirtualFile::write_all(self, buf, ctx).await;
res.map(move |v| (v, buf))
}
@@ -1159,7 +1161,8 @@ mod tests {
use crate::task_mgr::TaskKind;
use super::*;
use owned_buffers_io::slice::SliceExt;
use owned_buffers_io::io_buf_ext::IoBufExt;
use owned_buffers_io::slice::SliceMutExt;
use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::Rng;
@@ -1193,9 +1196,9 @@ mod tests {
}
}
}
async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
async fn write_all_at<Buf: IoBuf + Send>(
&self,
buf: B,
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> Result<(), Error> {
@@ -1204,13 +1207,7 @@ mod tests {
let (_buf, res) = file.write_all_at(buf, offset, ctx).await;
res
}
MaybeVirtualFile::File(file) => {
let buf_len = buf.bytes_init();
if buf_len == 0 {
return Ok(());
}
file.write_all_at(&buf.slice(0..buf_len), offset)
}
MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
}
}
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
@@ -1219,9 +1216,9 @@ mod tests {
MaybeVirtualFile::File(file) => file.seek(pos),
}
}
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: B,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> Result<(), Error> {
match self {
@@ -1229,13 +1226,7 @@ mod tests {
let (_buf, res) = file.write_all(buf, ctx).await;
res.map(|_| ())
}
MaybeVirtualFile::File(file) => {
let buf_len = buf.bytes_init();
if buf_len == 0 {
return Ok(());
}
file.write_all(&buf.slice(0..buf_len))
}
MaybeVirtualFile::File(file) => file.write_all(&buf[..]),
}
}
@@ -1347,7 +1338,9 @@ mod tests {
&ctx,
)
.await?;
file_a.write_all(b"foobar".to_vec(), &ctx).await?;
file_a
.write_all(b"foobar".to_vec().slice_len(), &ctx)
.await?;
// cannot read from a file opened in write-only mode
let _ = file_a.read_string(&ctx).await.unwrap_err();
@@ -1356,7 +1349,10 @@ mod tests {
let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
// cannot write to a file opened in read-only mode
let _ = file_a.write_all(b"bar".to_vec(), &ctx).await.unwrap_err();
let _ = file_a
.write_all(b"bar".to_vec().slice_len(), &ctx)
.await
.unwrap_err();
// Try simple read
assert_eq!("foobar", file_a.read_string(&ctx).await?);
@@ -1399,8 +1395,12 @@ mod tests {
&ctx,
)
.await?;
file_b.write_all_at(b"BAR".to_vec(), 3, &ctx).await?;
file_b.write_all_at(b"FOO".to_vec(), 0, &ctx).await?;
file_b
.write_all_at(b"BAR".to_vec().slice_len(), 3, &ctx)
.await?;
file_b
.write_all_at(b"FOO".to_vec().slice_len(), 0, &ctx)
.await?;
assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");
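
The retry loop in `write_all` / `write_all_at` above follows the standard library's `write_all` shape: fail on a zero-length write, retry on `Interrupted`, and hand the buffer back to the caller on every path. A minimal synchronous sketch using only `std::io::Write` is shown here; `write_all_owned` is a hypothetical name, and a plain `Vec<u8>` stands in for `FullSlice<Buf>`.

```rust
use std::io::{Error, ErrorKind, Write};

/// Writes all of `buf` to `dst` and returns the buffer together with the result,
/// mirroring the owned-buffer calling convention.
fn write_all_owned<W: Write>(dst: &mut W, buf: Vec<u8>) -> (Vec<u8>, Result<usize, Error>) {
    let mut written = 0;
    while written < buf.len() {
        match dst.write(&buf[written..]) {
            // No progress is possible: report it instead of spinning forever.
            Ok(0) => {
                return (
                    buf,
                    Err(Error::new(ErrorKind::WriteZero, "failed to write whole buffer")),
                );
            }
            Ok(n) => written += n,
            // An interrupted write is retried with the same remaining range.
            Err(e) if e.kind() == ErrorKind::Interrupted => {}
            Err(e) => return (buf, Err(e)),
        }
    }
    (buf, Ok(written))
}
```

Unlike this sketch, the real functions shrink a `Slice` as they go, which is why they capture `bounds` up front and use `restore` to rebuild the original full slice before returning.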

@@ -12,7 +12,7 @@
#[cfg(target_os = "linux")]
pub(super) mod tokio_epoll_uring_ext;
use tokio_epoll_uring::{IoBuf, Slice};
use tokio_epoll_uring::IoBuf;
use tracing::Instrument;
pub(crate) use super::api::IoEngineKind;
@@ -107,7 +107,10 @@ use std::{
sync::atomic::{AtomicU8, Ordering},
};
use super::{owned_buffers_io::slice::SliceExt, FileGuard, Metadata};
use super::{
owned_buffers_io::{io_buf_ext::FullSlice, slice::SliceMutExt},
FileGuard, Metadata,
};
#[cfg(target_os = "linux")]
fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
@@ -206,8 +209,8 @@ impl IoEngine {
&self,
file_guard: FileGuard,
offset: u64,
buf: Slice<B>,
) -> ((FileGuard, Slice<B>), std::io::Result<usize>) {
buf: FullSlice<B>,
) -> ((FileGuard, FullSlice<B>), std::io::Result<usize>) {
match self {
IoEngine::NotSet => panic!("not initialized"),
IoEngine::StdFs => {
@@ -217,8 +220,12 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.write(file_guard, offset, buf).await;
(resources, res.map_err(epoll_uring_error_to_std))
let ((file_guard, slice), res) =
system.write(file_guard, offset, buf.into_raw_slice()).await;
(
(file_guard, FullSlice::must_new(slice)),
res.map_err(epoll_uring_error_to_std),
)
}
}
}

@@ -0,0 +1,78 @@
//! See [`FullSlice`].
use bytes::{Bytes, BytesMut};
use std::ops::{Deref, Range};
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
/// The true owned equivalent for Rust [`slice`]. Use this for the write path.
///
/// Unlike [`tokio_epoll_uring::Slice`], which we unfortunately inherited from `tokio-uring`,
/// [`FullSlice`] is guaranteed to have all its bytes initialized. This means that
/// [`<FullSlice as Deref<Target = [u8]>>::len`] is equal to [`Slice::bytes_init`] and [`Slice::bytes_total`].
///
pub struct FullSlice<B> {
slice: Slice<B>,
}
impl<B> FullSlice<B>
where
B: IoBuf,
{
pub(crate) fn must_new(slice: Slice<B>) -> Self {
assert_eq!(slice.bytes_init(), slice.bytes_total());
FullSlice { slice }
}
pub(crate) fn into_raw_slice(self) -> Slice<B> {
let FullSlice { slice: s } = self;
s
}
}
impl<B> Deref for FullSlice<B>
where
B: IoBuf,
{
type Target = [u8];
fn deref(&self) -> &[u8] {
let rust_slice = &self.slice[..];
assert_eq!(rust_slice.len(), self.slice.bytes_init());
assert_eq!(rust_slice.len(), self.slice.bytes_total());
rust_slice
}
}
pub(crate) trait IoBufExt {
/// Get a [`FullSlice`] for the entire buffer, i.e., `self[..]` or `self[0..self.len()]`.
fn slice_len(self) -> FullSlice<Self>
where
Self: Sized;
}
macro_rules! impl_io_buf_ext {
($T:ty) => {
impl IoBufExt for $T {
#[inline(always)]
fn slice_len(self) -> FullSlice<Self> {
let len = self.len();
let s = if len == 0 {
// `BoundedBuf::slice(0..len)` or `BoundedBuf::slice(..)` has an incorrect assertion,
// causing a panic if len == 0.
// The Slice::from_buf_bounds has the correct assertion (<= instead of <).
// => https://github.com/neondatabase/tokio-epoll-uring/issues/46
let slice = self.slice_full();
let mut bounds: Range<_> = slice.bounds();
bounds.end = bounds.start;
Slice::from_buf_bounds(slice.into_inner(), bounds)
} else {
self.slice(0..len)
};
FullSlice::must_new(s)
}
}
};
}
impl_io_buf_ext!(Bytes);
impl_io_buf_ext!(BytesMut);
impl_io_buf_ext!(Vec<u8>);
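
The invariant that `FullSlice` enforces (every byte in the slice bounds is initialized) can be modelled without `tokio_epoll_uring`. The sketch below is a simplified stand-in under stated assumptions: `RawSlice` imitates a slice whose bounds may extend into spare capacity, and `Full::try_new` is a hypothetical non-panicking variant of `must_new`.

```rust
/// Simplified model of an owned slice: bounds may reach into uninitialized capacity.
struct RawSlice {
    buf: Vec<u8>,
    begin: usize,
    end: usize,
}

impl RawSlice {
    fn new(buf: Vec<u8>, begin: usize, end: usize) -> Self {
        assert!(begin <= end && begin <= buf.len() && end <= buf.capacity());
        RawSlice { buf, begin, end }
    }

    /// Size of the slice bounds, initialized or not.
    fn bytes_total(&self) -> usize {
        self.end - self.begin
    }

    /// Number of initialized bytes within the bounds.
    fn bytes_init(&self) -> usize {
        self.buf.len().min(self.end) - self.begin
    }
}

/// Model of `FullSlice`: a slice whose bounds are fully initialized.
struct Full {
    slice: RawSlice,
}

impl Full {
    /// Accepts the slice only if `bytes_init == bytes_total`.
    fn try_new(slice: RawSlice) -> Option<Self> {
        if slice.bytes_init() == slice.bytes_total() {
            Some(Full { slice })
        } else {
            None
        }
    }

    /// Safe to expose as `&[u8]` because the invariant was checked at construction.
    fn as_bytes(&self) -> &[u8] {
        &self.slice.buf[self.slice.begin..self.slice.end]
    }
}
```

Checking once at construction is what lets the write path take `FullSlice` and treat its length as the number of bytes to write, rather than re-deriving it from `bytes_init()` at each call site.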

@@ -3,14 +3,14 @@ use tokio_epoll_uring::BoundedBufMut;
use tokio_epoll_uring::IoBufMut;
use tokio_epoll_uring::Slice;
pub(crate) trait SliceExt {
pub(crate) trait SliceMutExt {
/// Get a `&mut [0..self.bytes_total()]` slice, for when you need to do borrow-based IO.
///
/// See the test case `test_slice_full_zeroed` for the difference to just doing `&slice[..]`
fn as_mut_rust_slice_full_zeroed(&mut self) -> &mut [u8];
}
impl<B> SliceExt for Slice<B>
impl<B> SliceMutExt for Slice<B>
where
B: IoBufMut,
{

@@ -1,5 +1,8 @@
use crate::{context::RequestContext, virtual_file::owned_buffers_io::write::OwnedAsyncWriter};
use tokio_epoll_uring::{BoundedBuf, IoBuf};
use crate::{
context::RequestContext,
virtual_file::owned_buffers_io::{io_buf_ext::FullSlice, write::OwnedAsyncWriter},
};
use tokio_epoll_uring::IoBuf;
pub struct Writer<W> {
dst: W,
@@ -35,11 +38,11 @@ where
W: OwnedAsyncWriter,
{
#[inline(always)]
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: B,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> std::io::Result<(usize, B::Buf)> {
) -> std::io::Result<(usize, FullSlice<Buf>)> {
let (nwritten, buf) = self.dst.write_all(buf, ctx).await?;
self.bytes_amount += u64::try_from(nwritten).unwrap();
Ok((nwritten, buf))

@@ -1,16 +1,18 @@
use bytes::BytesMut;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use tokio_epoll_uring::IoBuf;
use crate::context::RequestContext;
use super::io_buf_ext::{FullSlice, IoBufExt};
/// A trait for doing owned-buffer write IO.
/// Think [`tokio::io::AsyncWrite`] but with owned buffers.
pub trait OwnedAsyncWriter {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: B,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> std::io::Result<(usize, B::Buf)>;
) -> std::io::Result<(usize, FullSlice<Buf>)>;
}
/// A wrapper around an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
@@ -79,9 +81,11 @@ where
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn write_buffered<S: IoBuf + Send>(
&mut self,
chunk: Slice<S>,
chunk: FullSlice<S>,
ctx: &RequestContext,
) -> std::io::Result<(usize, S)> {
) -> std::io::Result<(usize, FullSlice<S>)> {
let chunk = chunk.into_raw_slice();
let chunk_len = chunk.len();
// avoid memcpy for the middle of the chunk
if chunk.len() >= self.buf().cap() {
@@ -94,7 +98,10 @@ where
.pending(),
0
);
let (nwritten, chunk) = self.writer.write_all(chunk, ctx).await?;
let (nwritten, chunk) = self
.writer
.write_all(FullSlice::must_new(chunk), ctx)
.await?;
assert_eq!(nwritten, chunk_len);
return Ok((nwritten, chunk));
}
@@ -114,7 +121,7 @@ where
}
}
assert!(slice.is_empty(), "by now we should have drained the chunk");
Ok((chunk_len, chunk.into_inner()))
Ok((chunk_len, FullSlice::must_new(chunk)))
}
/// Strictly less performant variant of [`Self::write_buffered`] that allows writing borrowed data.
@@ -150,9 +157,12 @@ where
self.buf = Some(buf);
return Ok(());
}
let (nwritten, io_buf) = self.writer.write_all(buf.flush(), ctx).await?;
let slice = buf.flush();
let (nwritten, slice) = self.writer.write_all(slice, ctx).await?;
assert_eq!(nwritten, buf_len);
self.buf = Some(Buffer::reuse_after_flush(io_buf));
self.buf = Some(Buffer::reuse_after_flush(
slice.into_raw_slice().into_inner(),
));
Ok(())
}
}
@@ -172,9 +182,9 @@ pub trait Buffer {
/// Number of bytes in the buffer.
fn pending(&self) -> usize;
/// Turns `self` into a [`tokio_epoll_uring::Slice`] of the pending data
/// Turns `self` into a [`FullSlice`] of the pending data
/// so we can use [`tokio_epoll_uring`] to write it to disk.
fn flush(self) -> Slice<Self::IoBuf>;
fn flush(self) -> FullSlice<Self::IoBuf>;
/// After the write to disk is done and we have gotten back the slice,
/// [`BufferedWriter`] uses this method to re-use the io buffer.
@@ -198,12 +208,8 @@ impl Buffer for BytesMut {
self.len()
}
fn flush(self) -> Slice<BytesMut> {
if self.is_empty() {
return self.slice_full();
}
let len = self.len();
self.slice(0..len)
fn flush(self) -> FullSlice<BytesMut> {
self.slice_len()
}
fn reuse_after_flush(mut iobuf: BytesMut) -> Self {
@@ -213,18 +219,13 @@ impl Buffer for BytesMut {
}
impl OwnedAsyncWriter for Vec<u8> {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: B,
buf: FullSlice<Buf>,
_: &RequestContext,
) -> std::io::Result<(usize, B::Buf)> {
let nbytes = buf.bytes_init();
if nbytes == 0 {
return Ok((0, Slice::into_inner(buf.slice_full())));
}
let buf = buf.slice(0..nbytes);
) -> std::io::Result<(usize, FullSlice<Buf>)> {
self.extend_from_slice(&buf[..]);
Ok((buf.len(), Slice::into_inner(buf)))
Ok((buf.len(), buf))
}
}
@@ -241,19 +242,13 @@ mod tests {
writes: Vec<Vec<u8>>,
}
impl OwnedAsyncWriter for RecorderWriter {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: B,
buf: FullSlice<Buf>,
_: &RequestContext,
) -> std::io::Result<(usize, B::Buf)> {
let nbytes = buf.bytes_init();
if nbytes == 0 {
self.writes.push(vec![]);
return Ok((0, Slice::into_inner(buf.slice_full())));
}
let buf = buf.slice(0..nbytes);
) -> std::io::Result<(usize, FullSlice<Buf>)> {
self.writes.push(Vec::from(&buf[..]));
Ok((buf.len(), Slice::into_inner(buf)))
Ok((buf.len(), buf))
}
}
@@ -264,7 +259,7 @@ mod tests {
macro_rules! write {
($writer:ident, $data:literal) => {{
$writer
.write_buffered(::bytes::Bytes::from_static($data).slice_full(), &test_ctx())
.write_buffered(::bytes::Bytes::from_static($data).slice_len(), &test_ctx())
.await?;
}};
}

@@ -515,7 +515,7 @@ impl WalIngest {
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)
// do not materialize null pages because they will most likely soon be replaced with real data
&& blk.bimg_len != 0
{
@@ -1702,7 +1702,7 @@ async fn get_relsize(
modification: &DatadirModification<'_>,
rel: RelTag,
ctx: &RequestContext,
) -> anyhow::Result<BlockNumber> {
) -> Result<BlockNumber, PageReconstructError> {
let nblocks = if !modification
.tline
.get_rel_exists(rel, Version::Modified(modification), ctx)

@@ -1018,7 +1018,7 @@ pub fn decode_wal_record(
);
let blk_img_is_compressed =
postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)?;
postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version);
if blk_img_is_compressed {
debug!("compressed block image , pg_version = {}", pg_version);

@@ -192,6 +192,13 @@ LogicalSlotsMonitorMain(Datum main_arg)
{
XLogRecPtr cutoff_lsn;
/* In case of a SIGHUP, just reload the configuration. */
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
* If there are too many .snap files, just drop all logical slots to
* prevent aux files bloat.

@@ -186,7 +186,7 @@ static void
fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2)
{
*infomask &= ~(HEAP_XMAX_IS_MULTI | HEAP_XMAX_LOCK_ONLY |
HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_EXCL_LOCK);
HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_EXCL_LOCK | HEAP_COMBOCID);
*infomask2 &= ~HEAP_KEYS_UPDATED;
if (infobits & XLHL_XMAX_IS_MULTI)
@@ -195,6 +195,8 @@ fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2)
*infomask |= HEAP_XMAX_LOCK_ONLY;
if (infobits & XLHL_XMAX_EXCL_LOCK)
*infomask |= HEAP_XMAX_EXCL_LOCK;
if (infobits & XLHL_COMBOCID)
*infomask |= HEAP_COMBOCID;
/* note HEAP_XMAX_SHR_LOCK isn't considered here */
if (infobits & XLHL_XMAX_KEYSHR_LOCK)
*infomask |= HEAP_XMAX_KEYSHR_LOCK;
@@ -284,7 +286,7 @@ redo_neon_heap_insert(XLogReaderState *record)
htup->t_infomask = xlhdr.t_infomask;
htup->t_hoff = xlhdr.t_hoff;
HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
HeapTupleHeaderSetCmin(htup, xlhdr.t_cid);
htup->t_choice.t_heap.t_field3.t_cid = xlhdr.t_cid;
htup->t_ctid = target_tid;
if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum,
@@ -373,7 +375,7 @@ redo_neon_heap_delete(XLogReaderState *record)
HeapTupleHeaderSetXmax(htup, xlrec->xmax);
else
HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
HeapTupleHeaderSetCmax(htup, xlrec->t_cid, false);
htup->t_choice.t_heap.t_field3.t_cid = xlrec->t_cid;
/* Mark the page as a candidate for pruning */
PageSetPrunable(page, XLogRecGetXid(record));
@@ -490,7 +492,7 @@ redo_neon_heap_update(XLogReaderState *record, bool hot_update)
fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask,
&htup->t_infomask2);
HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
HeapTupleHeaderSetCmax(htup, xlrec->t_cid, false);
htup->t_choice.t_heap.t_field3.t_cid = xlrec->t_cid;
/* Set forward chain link in t_ctid */
htup->t_ctid = newtid;
@@ -623,7 +625,7 @@ redo_neon_heap_update(XLogReaderState *record, bool hot_update)
htup->t_hoff = xlhdr.t_hoff;
HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
HeapTupleHeaderSetCmin(htup, xlhdr.t_cid);
htup->t_choice.t_heap.t_field3.t_cid = xlhdr.t_cid;
HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
/* Make sure there is no forward chain link in t_ctid */
htup->t_ctid = newtid;
@@ -728,7 +730,7 @@ redo_neon_heap_lock(XLogReaderState *record)
offnum);
}
HeapTupleHeaderSetXmax(htup, xlrec->xmax);
HeapTupleHeaderSetCmax(htup, xlrec->t_cid, false);
htup->t_choice.t_heap.t_field3.t_cid = xlrec->t_cid;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
@@ -840,7 +842,7 @@ redo_neon_heap_multi_insert(XLogReaderState *record)
htup->t_infomask = xlhdr->t_infomask;
htup->t_hoff = xlhdr->t_hoff;
HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
HeapTupleHeaderSetCmin(htup, xlrec->t_cid);
htup->t_choice.t_heap.t_field3.t_cid = xlrec->t_cid;
ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);

@@ -11,6 +11,7 @@ testing = []
[dependencies]
ahash.workspace = true
anyhow.workspace = true
arc-swap.workspace = true
async-compression.workspace = true
async-trait.workspace = true
atomic-take.workspace = true
@@ -73,7 +74,7 @@ rustls.workspace = true
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true
sha2 = { workspace = true, features = ["asm"] }
sha2 = { workspace = true, features = ["asm", "oid"] }
smol_str.workspace = true
smallvec.workspace = true
socket2.workspace = true
@@ -103,6 +104,14 @@ x509-parser.workspace = true
postgres-protocol.workspace = true
redis.workspace = true
# jwt stuff
jose-jwa = "0.1.2"
jose-jwk = { version = "0.1.2", features = ["p256", "p384", "rsa"] }
signature = "2"
ecdsa = "0.16"
p256 = "0.13"
rsa = "0.9"
workspace_hack.workspace = true
[dev-dependencies]

@@ -1,5 +1,6 @@
mod classic;
mod hacks;
pub mod jwt;
mod link;
use std::net::IpAddr;

@@ -0,0 +1,556 @@
use std::{future::Future, sync::Arc, time::Duration};
use anyhow::{bail, ensure, Context};
use arc_swap::ArcSwapOption;
use dashmap::DashMap;
use jose_jwk::crypto::KeyInfo;
use signature::Verifier;
use tokio::time::Instant;
use crate::{http::parse_json_body_with_limit, intern::EndpointIdInt};
// TODO(conrad): make these configurable.
const MIN_RENEW: Duration = Duration::from_secs(30);
const AUTO_RENEW: Duration = Duration::from_secs(300);
const MAX_RENEW: Duration = Duration::from_secs(3600);
const MAX_JWK_BODY_SIZE: usize = 64 * 1024;
/// How to get the JWT auth rules
pub trait FetchAuthRules: Clone + Send + Sync + 'static {
fn fetch_auth_rules(&self) -> impl Future<Output = anyhow::Result<AuthRules>> + Send;
}
#[derive(Clone)]
struct FetchAuthRulesFromCplane {
#[allow(dead_code)]
endpoint: EndpointIdInt,
}
impl FetchAuthRules for FetchAuthRulesFromCplane {
async fn fetch_auth_rules(&self) -> anyhow::Result<AuthRules> {
Err(anyhow::anyhow!("not yet implemented"))
}
}
pub struct AuthRules {
jwks_urls: Vec<url::Url>,
}
#[derive(Default)]
pub struct JwkCache {
client: reqwest::Client,
map: DashMap<EndpointIdInt, Arc<JwkCacheEntryLock>>,
}
pub struct JwkCacheEntryLock {
cached: ArcSwapOption<JwkCacheEntry>,
lookup: tokio::sync::Semaphore,
}
impl Default for JwkCacheEntryLock {
fn default() -> Self {
JwkCacheEntryLock {
cached: ArcSwapOption::empty(),
lookup: tokio::sync::Semaphore::new(1),
}
}
}
pub struct JwkCacheEntry {
/// Should refetch at least every hour to verify when old keys have been removed.
/// Should refetch when new key IDs are seen only every 5 minutes or so
last_retrieved: Instant,
/// cplane will return multiple JWKs urls that we need to scrape.
key_sets: ahash::HashMap<url::Url, jose_jwk::JwkSet>,
}
impl JwkCacheEntryLock {
async fn acquire_permit<'a>(self: &'a Arc<Self>) -> JwkRenewalPermit<'a> {
JwkRenewalPermit::acquire_permit(self).await
}
fn try_acquire_permit<'a>(self: &'a Arc<Self>) -> Option<JwkRenewalPermit<'a>> {
JwkRenewalPermit::try_acquire_permit(self)
}
async fn renew_jwks<F: FetchAuthRules>(
&self,
_permit: JwkRenewalPermit<'_>,
client: &reqwest::Client,
auth_rules: &F,
) -> anyhow::Result<Arc<JwkCacheEntry>> {
// double check that no one beat us to updating the cache.
let now = Instant::now();
let guard = self.cached.load_full();
if let Some(cached) = guard {
let last_update = now.duration_since(cached.last_retrieved);
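// Compare against MIN_RENEW: `check_jwt` retries once the entry is older than
// MIN_RENEW, so a larger threshold here would return the same entry and spin.
if last_update < MIN_RENEW {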
return Ok(cached);
}
}
let rules = auth_rules.fetch_auth_rules().await?;
let mut key_sets = ahash::HashMap::with_capacity_and_hasher(
rules.jwks_urls.len(),
ahash::RandomState::new(),
);
// TODO(conrad): run concurrently
// TODO(conrad): strip the JWKs urls (should be checked by cplane as well - cloud#16284)
for url in rules.jwks_urls {
let req = client.get(url.clone());
// TODO(conrad): eventually switch to using reqwest_middleware/`new_client_with_timeout`.
match req.send().await.and_then(|r| r.error_for_status()) {
// todo: should we re-insert JWKs if we want to keep this JWKs URL?
// I expect these failures would be quite sparse.
Err(e) => tracing::warn!(?url, error=?e, "could not fetch JWKs"),
Ok(r) => {
let resp: http::Response<reqwest::Body> = r.into();
match parse_json_body_with_limit::<jose_jwk::JwkSet>(
resp.into_body(),
MAX_JWK_BODY_SIZE,
)
.await
{
Err(e) => tracing::warn!(?url, error=?e, "could not decode JWKs"),
Ok(jwks) => {
key_sets.insert(url, jwks);
}
}
}
}
}
let entry = Arc::new(JwkCacheEntry {
last_retrieved: now,
key_sets,
});
self.cached.swap(Some(Arc::clone(&entry)));
Ok(entry)
}
async fn get_or_update_jwk_cache<F: FetchAuthRules>(
self: &Arc<Self>,
client: &reqwest::Client,
fetch: &F,
) -> Result<Arc<JwkCacheEntry>, anyhow::Error> {
let now = Instant::now();
let guard = self.cached.load_full();
// if we have no cached JWKs, try and get some
let Some(cached) = guard else {
let permit = self.acquire_permit().await;
return self.renew_jwks(permit, client, fetch).await;
};
let last_update = now.duration_since(cached.last_retrieved);
// check if the cached JWKs need updating.
if last_update > MAX_RENEW {
let permit = self.acquire_permit().await;
// it's been too long since we checked the keys. wait for them to update.
return self.renew_jwks(permit, client, fetch).await;
}
// every 5 minutes we should spawn a job to eagerly update the keys.
if last_update > AUTO_RENEW {
if let Some(permit) = self.try_acquire_permit() {
tracing::debug!("JWKs should be renewed. Renewal permit acquired");
let permit = permit.into_owned();
let entry = self.clone();
let client = client.clone();
let fetch = fetch.clone();
tokio::spawn(async move {
if let Err(e) = entry.renew_jwks(permit, &client, &fetch).await {
tracing::warn!(error=?e, "could not fetch JWKs in background job");
}
});
} else {
tracing::debug!("JWKs should be renewed. Renewal permit already taken, skipping");
}
}
Ok(cached)
}
async fn check_jwt<F: FetchAuthRules>(
self: &Arc<Self>,
jwt: String,
client: &reqwest::Client,
fetch: &F,
) -> Result<(), anyhow::Error> {
// JWT compact form is defined to be
// <B64(Header)> || . || <B64(Payload)> || . || <B64(Signature)>
// where Signature = alg(<B64(Header)> || . || <B64(Payload)>);
let (header_payload, signature) = jwt
.rsplit_once(".")
.context("Provided authentication token is not a valid JWT encoding")?;
let (header, _payload) = header_payload
.split_once(".")
.context("Provided authentication token is not a valid JWT encoding")?;
let header = base64::decode_config(header, base64::URL_SAFE_NO_PAD)
.context("Provided authentication token is not a valid JWT encoding")?;
let header = serde_json::from_slice::<JWTHeader>(&header)
.context("Provided authentication token is not a valid JWT encoding")?;
let sig = base64::decode_config(signature, base64::URL_SAFE_NO_PAD)
.context("Provided authentication token is not a valid JWT encoding")?;
ensure!(header.typ == "JWT");
let kid = header.kid.context("missing key id")?;
let mut guard = self.get_or_update_jwk_cache(client, fetch).await?;
// get the key from the JWKs if possible. If not, wait for the keys to update.
let jwk = loop {
let jwk = guard
.key_sets
.values()
.flat_map(|jwks| &jwks.keys)
.find(|jwk| jwk.prm.kid.as_deref() == Some(kid));
match jwk {
Some(jwk) => break jwk,
None if guard.last_retrieved.elapsed() > MIN_RENEW => {
let permit = self.acquire_permit().await;
guard = self.renew_jwks(permit, client, fetch).await?;
}
_ => {
bail!("jwk not found");
}
}
};
ensure!(
jwk.is_supported(&header.alg),
"signature algorithm not supported"
);
match &jwk.key {
jose_jwk::Key::Ec(key) => {
verify_ec_signature(header_payload.as_bytes(), &sig, key)?;
}
jose_jwk::Key::Rsa(key) => {
verify_rsa_signature(header_payload.as_bytes(), &sig, key, &jwk.prm.alg)?;
}
key => bail!("unsupported key type {key:?}"),
};
// TODO(conrad): verify iss, exp, nbf, etc...
Ok(())
}
}
impl JwkCache {
pub async fn check_jwt(
&self,
endpoint: EndpointIdInt,
jwt: String,
) -> Result<(), anyhow::Error> {
// try with just a read lock first
let entry = self.map.get(&endpoint).as_deref().map(Arc::clone);
let entry = match entry {
Some(entry) => entry,
None => {
// otherwise, acquire a write lock to insert a new entry.
let entry = self.map.entry(endpoint).or_default();
Arc::clone(&*entry)
}
};
let fetch = FetchAuthRulesFromCplane { endpoint };
entry.check_jwt(jwt, &self.client, &fetch).await
}
}
fn verify_ec_signature(data: &[u8], sig: &[u8], key: &jose_jwk::Ec) -> anyhow::Result<()> {
use ecdsa::Signature;
use signature::Verifier;
match key.crv {
jose_jwk::EcCurves::P256 => {
let pk =
p256::PublicKey::try_from(key).map_err(|_| anyhow::anyhow!("invalid P256 key"))?;
let key = p256::ecdsa::VerifyingKey::from(&pk);
let sig = Signature::from_slice(sig)?;
key.verify(data, &sig)?;
}
key => bail!("unsupported ec key type {key:?}"),
}
Ok(())
}
fn verify_rsa_signature(
data: &[u8],
sig: &[u8],
key: &jose_jwk::Rsa,
alg: &Option<jose_jwa::Algorithm>,
) -> anyhow::Result<()> {
use jose_jwa::{Algorithm, Signing};
use rsa::{
pkcs1v15::{Signature, VerifyingKey},
RsaPublicKey,
};
let key = RsaPublicKey::try_from(key).map_err(|_| anyhow::anyhow!("invalid RSA key"))?;
match alg {
Some(Algorithm::Signing(Signing::Rs256)) => {
let key = VerifyingKey::<sha2::Sha256>::new(key);
let sig = Signature::try_from(sig)?;
key.verify(data, &sig)?;
}
_ => bail!("invalid RSA signing algorithm"),
};
Ok(())
}
/// <https://datatracker.ietf.org/doc/html/rfc7515#section-4.1>
#[derive(serde::Deserialize, serde::Serialize)]
struct JWTHeader<'a> {
/// must be "JWT"
typ: &'a str,
/// must be a supported alg
alg: jose_jwa::Algorithm,
/// key id, must be provided for our use case
kid: Option<&'a str>,
}
struct JwkRenewalPermit<'a> {
inner: Option<JwkRenewalPermitInner<'a>>,
}
enum JwkRenewalPermitInner<'a> {
Owned(Arc<JwkCacheEntryLock>),
Borrowed(&'a Arc<JwkCacheEntryLock>),
}
impl JwkRenewalPermit<'_> {
fn into_owned(mut self) -> JwkRenewalPermit<'static> {
JwkRenewalPermit {
inner: self.inner.take().map(JwkRenewalPermitInner::into_owned),
}
}
async fn acquire_permit(from: &Arc<JwkCacheEntryLock>) -> JwkRenewalPermit {
match from.lookup.acquire().await {
Ok(permit) => {
permit.forget();
JwkRenewalPermit {
inner: Some(JwkRenewalPermitInner::Borrowed(from)),
}
}
Err(_) => panic!("semaphore should not be closed"),
}
}
fn try_acquire_permit(from: &Arc<JwkCacheEntryLock>) -> Option<JwkRenewalPermit> {
match from.lookup.try_acquire() {
Ok(permit) => {
permit.forget();
Some(JwkRenewalPermit {
inner: Some(JwkRenewalPermitInner::Borrowed(from)),
})
}
Err(tokio::sync::TryAcquireError::NoPermits) => None,
Err(tokio::sync::TryAcquireError::Closed) => panic!("semaphore should not be closed"),
}
}
}
impl JwkRenewalPermitInner<'_> {
fn into_owned(self) -> JwkRenewalPermitInner<'static> {
match self {
JwkRenewalPermitInner::Owned(p) => JwkRenewalPermitInner::Owned(p),
JwkRenewalPermitInner::Borrowed(p) => JwkRenewalPermitInner::Owned(Arc::clone(p)),
}
}
}
impl Drop for JwkRenewalPermit<'_> {
fn drop(&mut self) {
let entry = match &self.inner {
None => return,
Some(JwkRenewalPermitInner::Owned(p)) => p,
Some(JwkRenewalPermitInner::Borrowed(p)) => *p,
};
entry.lookup.add_permits(1);
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::{future::IntoFuture, net::SocketAddr, time::SystemTime};
use base64::URL_SAFE_NO_PAD;
use bytes::Bytes;
use http::Response;
use http_body_util::Full;
use hyper1::service::service_fn;
use hyper_util::rt::TokioIo;
use rand::rngs::OsRng;
use signature::Signer;
use tokio::net::TcpListener;
fn new_ec_jwk(kid: String) -> (p256::SecretKey, jose_jwk::Jwk) {
let sk = p256::SecretKey::random(&mut OsRng);
let pk = sk.public_key().into();
let jwk = jose_jwk::Jwk {
key: jose_jwk::Key::Ec(pk),
prm: jose_jwk::Parameters {
kid: Some(kid),
alg: Some(jose_jwa::Algorithm::Signing(jose_jwa::Signing::Es256)),
..Default::default()
},
};
(sk, jwk)
}
fn new_rsa_jwk(kid: String) -> (rsa::RsaPrivateKey, jose_jwk::Jwk) {
let sk = rsa::RsaPrivateKey::new(&mut OsRng, 2048).unwrap();
let pk = sk.to_public_key().into();
let jwk = jose_jwk::Jwk {
key: jose_jwk::Key::Rsa(pk),
prm: jose_jwk::Parameters {
kid: Some(kid),
alg: Some(jose_jwa::Algorithm::Signing(jose_jwa::Signing::Rs256)),
..Default::default()
},
};
(sk, jwk)
}
fn build_jwt_payload(kid: String, sig: jose_jwa::Signing) -> String {
let header = JWTHeader {
typ: "JWT",
alg: jose_jwa::Algorithm::Signing(sig),
kid: Some(&kid),
};
let body = typed_json::json! {{
"exp": SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() + 3600,
}};
let header =
base64::encode_config(serde_json::to_string(&header).unwrap(), URL_SAFE_NO_PAD);
let body = base64::encode_config(body.to_string(), URL_SAFE_NO_PAD);
format!("{header}.{body}")
}
fn new_ec_jwt(kid: String, key: p256::SecretKey) -> String {
use p256::ecdsa::{Signature, SigningKey};
let payload = build_jwt_payload(kid, jose_jwa::Signing::Es256);
let sig: Signature = SigningKey::from(key).sign(payload.as_bytes());
let sig = base64::encode_config(sig.to_bytes(), URL_SAFE_NO_PAD);
format!("{payload}.{sig}")
}
fn new_rsa_jwt(kid: String, key: rsa::RsaPrivateKey) -> String {
use rsa::pkcs1v15::SigningKey;
use rsa::signature::SignatureEncoding;
let payload = build_jwt_payload(kid, jose_jwa::Signing::Rs256);
let sig = SigningKey::<sha2::Sha256>::new(key).sign(payload.as_bytes());
let sig = base64::encode_config(sig.to_bytes(), URL_SAFE_NO_PAD);
format!("{payload}.{sig}")
}
#[tokio::test]
async fn renew() {
let (rs1, jwk1) = new_rsa_jwk("1".into());
let (rs2, jwk2) = new_rsa_jwk("2".into());
let (ec1, jwk3) = new_ec_jwk("3".into());
let (ec2, jwk4) = new_ec_jwk("4".into());
let jwt1 = new_rsa_jwt("1".into(), rs1);
let jwt2 = new_rsa_jwt("2".into(), rs2);
let jwt3 = new_ec_jwt("3".into(), ec1);
let jwt4 = new_ec_jwt("4".into(), ec2);
let foo_jwks = jose_jwk::JwkSet {
keys: vec![jwk1, jwk3],
};
let bar_jwks = jose_jwk::JwkSet {
keys: vec![jwk2, jwk4],
};
let service = service_fn(move |req| {
let foo_jwks = foo_jwks.clone();
let bar_jwks = bar_jwks.clone();
async move {
let jwks = match req.uri().path() {
"/foo" => &foo_jwks,
"/bar" => &bar_jwks,
_ => {
return Response::builder()
.status(404)
.body(Full::new(Bytes::new()));
}
};
let body = serde_json::to_vec(jwks).unwrap();
Response::builder()
.status(200)
.body(Full::new(Bytes::from(body)))
}
});
let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
let server = hyper1::server::conn::http1::Builder::new();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
loop {
let (s, _) = listener.accept().await.unwrap();
let serve = server.serve_connection(TokioIo::new(s), service.clone());
tokio::spawn(serve.into_future());
}
});
let client = reqwest::Client::new();
#[derive(Clone)]
struct Fetch(SocketAddr);
impl FetchAuthRules for Fetch {
async fn fetch_auth_rules(&self) -> anyhow::Result<AuthRules> {
Ok(AuthRules {
jwks_urls: vec![
format!("http://{}/foo", self.0).parse().unwrap(),
format!("http://{}/bar", self.0).parse().unwrap(),
],
})
}
}
let jwk_cache = Arc::new(JwkCacheEntryLock::default());
jwk_cache
.check_jwt(jwt1, &client, &Fetch(addr))
.await
.unwrap();
jwk_cache
.check_jwt(jwt2, &client, &Fetch(addr))
.await
.unwrap();
jwk_cache
.check_jwt(jwt3, &client, &Fetch(addr))
.await
.unwrap();
jwk_cache
.check_jwt(jwt4, &client, &Fetch(addr))
.await
.unwrap();
}
}
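The cache above refreshes keys on three time scales. A minimal sketch of that decision logic, using the same three constants but with the `CacheAction` enum and both function names invented here for illustration (they are not part of the commit):

```rust
use std::time::Duration;

// Same values as the constants in the module above.
const MIN_RENEW: Duration = Duration::from_secs(30);
const AUTO_RENEW: Duration = Duration::from_secs(300);
const MAX_RENEW: Duration = Duration::from_secs(3600);

/// Hypothetical summary of what a lookup does with a cache entry.
#[derive(Debug, PartialEq)]
enum CacheAction {
    /// The entry is fresh enough; use it as-is.
    UseCached,
    /// Use the entry, but try to start a background refresh.
    UseCachedRefreshInBackground,
    /// No entry, or it is too old; fetch and wait for the result.
    RefreshAndWait,
}

/// `age` is `None` when nothing has been cached yet.
fn cache_action(age: Option<Duration>) -> CacheAction {
    match age {
        None => CacheAction::RefreshAndWait,
        Some(age) if age > MAX_RENEW => CacheAction::RefreshAndWait,
        Some(age) if age > AUTO_RENEW => CacheAction::UseCachedRefreshInBackground,
        Some(_) => CacheAction::UseCached,
    }
}

/// When a token names an unknown key ID, a refetch is only worthwhile
/// if the cached keys are older than MIN_RENEW.
fn should_refetch_for_unknown_kid(age: Duration) -> bool {
    age > MIN_RENEW
}
```

Keeping the thresholds in one pure function like this makes the boundaries easy to test without a clock or an HTTP server.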

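`check_jwt` splits the compact form from the right first, so the signed bytes (`header.payload`) stay available as one slice. A small standalone sketch of that parsing step; the `CompactJwt` struct and `split_compact_jwt` function are illustrative names, not items from the commit:

```rust
/// Borrowed view of a JWT in compact form (hypothetical helper type).
#[derive(Debug, PartialEq)]
struct CompactJwt<'a> {
    header: &'a str,
    payload: &'a str,
    signature: &'a str,
    /// `header.payload`, exactly the bytes the signature covers.
    signing_input: &'a str,
}

/// Splits `<header>.<payload>.<signature>` without decoding any segment.
/// Returns `None` when fewer than two dots are present.
fn split_compact_jwt(jwt: &str) -> Option<CompactJwt<'_>> {
    // The last dot separates the signature from the signed data.
    let (signing_input, signature) = jwt.rsplit_once('.')?;
    // The first dot separates the header from the payload.
    let (header, payload) = signing_input.split_once('.')?;
    Some(CompactJwt {
        header,
        payload,
        signature,
        signing_input,
    })
}
```

Each segment would still need base64url decoding (without padding) before use, as the module above does for the header and signature.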
View File

@@ -151,21 +151,34 @@ impl<P: CancellationPublisherMut> CancellationHandler<Option<Arc<Mutex<P>>>> {
#[derive(Clone)]
pub struct CancelClosure {
socket_addr: SocketAddr,
cancel_token: CancelToken,
cancel_token: Option<CancelToken>,
}
impl CancelClosure {
pub fn new(socket_addr: SocketAddr, cancel_token: CancelToken) -> Self {
Self {
socket_addr,
cancel_token,
cancel_token: Some(cancel_token),
}
}
#[cfg(test)]
pub fn test() -> Self {
use std::net::{Ipv4Addr, SocketAddrV4};
Self {
socket_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from_bits(0), 0)),
cancel_token: None,
}
}
/// Cancels the query running on user's compute node.
pub async fn try_cancel_query(self) -> Result<(), CancelError> {
let socket = TcpStream::connect(self.socket_addr).await?;
self.cancel_token.cancel_query_raw(socket, NoTls).await?;
info!("query was cancelled");
if let Some(cancel_token) = self.cancel_token {
let socket = TcpStream::connect(self.socket_addr).await?;
cancel_token.cancel_query_raw(socket, NoTls).await?;
info!("query was cancelled");
}
Ok(())
}
}

View File

@@ -16,8 +16,10 @@ use rustls::{client::danger::ServerCertVerifier, pki_types::InvalidDnsNameError}
use std::{io, net::SocketAddr, sync::Arc, time::Duration};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::tls::MakeTlsConnect;
use tokio_postgres_rustls::MakeRustlsConnect;
use tokio_postgres::{
tls::{MakeTlsConnect, NoTlsError},
Client, Connection,
};
use tracing::{error, info, warn};
const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
@@ -42,6 +44,12 @@ pub enum ConnectionError {
TooManyConnectionAttempts(#[from] ApiLockError),
}
impl From<NoTlsError> for ConnectionError {
fn from(value: NoTlsError) -> Self {
Self::CouldNotConnect(io::Error::new(io::ErrorKind::Other, value.to_string()))
}
}
impl UserFacingError for ConnectionError {
fn to_string_client(&self) -> String {
use ConnectionError::*;
@@ -273,6 +281,30 @@ pub struct PostgresConnection {
}
impl ConnCfg {
/// Connect to a corresponding compute node.
pub async fn managed_connect<M: MakeTlsConnect<tokio::net::TcpStream>>(
&self,
ctx: &RequestMonitoring,
timeout: Duration,
mktls: &mut M,
) -> Result<(SocketAddr, Client, Connection<TcpStream, M::Stream>), ConnectionError>
where
ConnectionError: From<M::Error>,
{
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
drop(pause);
let tls = mktls.make_tls_connect(host)?;
// connect_raw() will not use TLS if sslmode is "disable"
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let (client, connection) = self.0.connect_raw(stream, tls).await?;
drop(pause);
Ok((socket_addr, client, connection))
}
/// Connect to a corresponding compute node.
pub async fn connect(
&self,
@@ -281,10 +313,6 @@ impl ConnCfg {
aux: MetricsAuxInfo,
timeout: Duration,
) -> Result<PostgresConnection, ConnectionError> {
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
drop(pause);
let client_config = if allow_self_signed_compute {
// Allow all certificates for creating the connection
let verifier = Arc::new(AcceptEverythingVerifier) as Arc<dyn ServerCertVerifier>;
@@ -298,21 +326,15 @@ impl ConnCfg {
let client_config = client_config.with_no_client_auth();
let mut mk_tls = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
&mut mk_tls,
host,
)?;
// connect_raw() will not use TLS if sslmode is "disable"
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let (client, connection) = self.0.connect_raw(stream, tls).await?;
drop(pause);
let (socket_addr, client, connection) =
self.managed_connect(ctx, timeout, &mut mk_tls).await?;
tracing::Span::current().record("pid", tracing::field::display(client.get_process_id()));
let stream = connection.stream.into_inner();
info!(
cold_start_info = ctx.cold_start_info().as_str(),
"connected to compute node at {host} ({socket_addr}) sslmode={:?}",
"connected to compute node ({socket_addr}) sslmode={:?}",
self.0.get_ssl_mode()
);

View File

@@ -6,6 +6,12 @@ pub mod health_server;
use std::time::Duration;
use anyhow::bail;
use bytes::Bytes;
use http_body_util::BodyExt;
use hyper1::body::Body;
use serde::de::DeserializeOwned;
pub use reqwest::{Request, Response, StatusCode};
pub use reqwest_middleware::{ClientWithMiddleware, Error};
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
@@ -96,6 +102,33 @@ impl Endpoint {
}
}
pub async fn parse_json_body_with_limit<D: DeserializeOwned>(
mut b: impl Body<Data = Bytes, Error = reqwest::Error> + Unpin,
limit: usize,
) -> anyhow::Result<D> {
// We could use `b.limited().collect().await.to_bytes()` here
// but this ends up being slightly more efficient as far as I can tell.
// check the lower bound of the size hint.
// in reqwest, this value is influenced by the Content-Length header.
let lower_bound = match usize::try_from(b.size_hint().lower()) {
Ok(bound) if bound <= limit => bound,
_ => bail!("Content length exceeds limit of {limit} bytes"),
};
let mut bytes = Vec::with_capacity(lower_bound);
while let Some(frame) = b.frame().await.transpose()? {
if let Ok(data) = frame.into_data() {
if bytes.len() + data.len() > limit {
bail!("Content length exceeds limit of {limit} bytes")
}
bytes.extend_from_slice(&data);
}
}
Ok(serde_json::from_slice::<D>(&bytes)?)
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -5,7 +5,8 @@ use tracing::{field::display, info};
use crate::{
auth::{backend::ComputeCredentials, check_peer_addr_is_in_list, AuthError},
compute,
cancellation::CancelClosure,
compute::{self, ConnectionError},
config::{AuthenticationConfig, ProxyConfig},
console::{
errors::{GetAuthInfoError, WakeComputeError},
@@ -142,7 +143,7 @@ pub enum HttpConnError {
#[error("pooled connection closed at inconsistent state")]
ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<uuid::Uuid>),
#[error("could not connect to compute")]
ConnectionError(#[from] tokio_postgres::Error),
ConnectionError(#[from] ConnectionError),
#[error("could not get auth info")]
GetAuthInfo(#[from] GetAuthInfoError),
@@ -229,17 +230,16 @@ impl ConnectMechanism for TokioMechanism {
let host = node_info.config.get_host()?;
let permit = self.locks.get_permit(&host).await?;
let mut config = (*node_info.config).clone();
let config = config
.user(&self.conn_info.user_info.user)
.password(&*self.conn_info.password)
.dbname(&self.conn_info.dbname)
.connect_timeout(timeout);
let (socket_addr, client, connection) = permit.release_result(
node_info
.config
.managed_connect(ctx, timeout, &mut tokio_postgres::NoTls)
.await,
)?;
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let res = config.connect(tokio_postgres::NoTls).await;
drop(pause);
let (client, connection) = permit.release_result(res)?;
// NB: CancelToken is supposed to hold socket_addr, but we use connect_raw.
// Yet another reason to rework the connection establishing code.
let cancel_closure = CancelClosure::new(socket_addr, client.cancel_token());
tracing::Span::current().record("pid", tracing::field::display(client.get_process_id()));
Ok(poll_client(
@@ -250,8 +250,14 @@ impl ConnectMechanism for TokioMechanism {
connection,
self.conn_id,
node_info.aux.clone(),
cancel_closure,
))
}
fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
fn update_connect_config(&self, config: &mut compute::ConnCfg) {
config
.user(&self.conn_info.user_info.user)
.dbname(&self.conn_info.dbname)
.password(&self.conn_info.password);
}
}

View File

@@ -12,11 +12,13 @@ use std::{
ops::Deref,
sync::atomic::{self, AtomicUsize},
};
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket};
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus};
use tokio_util::sync::CancellationToken;
use crate::cancellation::CancelClosure;
use crate::console::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
@@ -463,14 +465,16 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
}
}
#[allow(clippy::too_many_arguments)]
pub fn poll_client<C: ClientInnerExt>(
global_pool: Arc<GlobalConnPool<C>>,
ctx: &RequestMonitoring,
conn_info: ConnInfo,
client: C,
mut connection: tokio_postgres::Connection<Socket, NoTlsStream>,
mut connection: tokio_postgres::Connection<TcpStream, NoTlsStream>,
conn_id: uuid::Uuid,
aux: MetricsAuxInfo,
cancel_closure: CancelClosure,
) -> Client<C> {
let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
let mut session_id = ctx.session_id();
@@ -572,6 +576,7 @@ pub fn poll_client<C: ClientInnerExt>(
cancel,
aux,
conn_id,
cancel_closure,
};
Client::new(inner, conn_info, pool_clone)
}
@@ -582,6 +587,7 @@ struct ClientInner<C: ClientInnerExt> {
cancel: CancellationToken,
aux: MetricsAuxInfo,
conn_id: uuid::Uuid,
cancel_closure: CancelClosure,
}
impl<C: ClientInnerExt> Drop for ClientInner<C> {
@@ -646,7 +652,7 @@ impl<C: ClientInnerExt> Client<C> {
pool,
}
}
pub fn inner(&mut self) -> (&mut C, Discard<'_, C>) {
pub fn inner(&mut self) -> (&mut C, &CancelClosure, Discard<'_, C>) {
let Self {
inner,
pool,
@@ -654,7 +660,11 @@ impl<C: ClientInnerExt> Client<C> {
span: _,
} = self;
let inner = inner.as_mut().expect("client inner should not be removed");
(&mut inner.inner, Discard { pool, conn_info })
(
&mut inner.inner,
&inner.cancel_closure,
Discard { pool, conn_info },
)
}
}
@@ -751,6 +761,7 @@ mod tests {
cold_start_info: crate::console::messages::ColdStartInfo::Warm,
},
conn_id: uuid::Uuid::new_v4(),
cancel_closure: CancelClosure::test(),
}
}
@@ -785,7 +796,7 @@ mod tests {
{
let mut client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone());
assert_eq!(0, pool.get_global_connections_count());
client.inner().1.discard();
client.inner().2.discard();
// Discard should not add the connection to the pool.
assert_eq!(0, pool.get_global_connections_count());
}

View File

@@ -26,7 +26,6 @@ use tokio_postgres::error::ErrorPosition;
use tokio_postgres::error::SqlState;
use tokio_postgres::GenericClient;
use tokio_postgres::IsolationLevel;
use tokio_postgres::NoTls;
use tokio_postgres::ReadyForQueryStatus;
use tokio_postgres::Transaction;
use tokio_util::sync::CancellationToken;
@@ -261,7 +260,9 @@ pub async fn handle(
let mut message = e.to_string_client();
let db_error = match &e {
SqlOverHttpError::ConnectCompute(HttpConnError::ConnectionError(e))
SqlOverHttpError::ConnectCompute(HttpConnError::ConnectionError(
crate::compute::ConnectionError::Postgres(e),
))
| SqlOverHttpError::Postgres(e) => e.as_db_error(),
_ => None,
};
@@ -622,8 +623,7 @@ impl QueryData {
client: &mut Client<tokio_postgres::Client>,
parsed_headers: HttpHeaders,
) -> Result<String, SqlOverHttpError> {
let (inner, mut discard) = client.inner();
let cancel_token = inner.cancel_token();
let (inner, cancel_token, mut discard) = client.inner();
let res = match select(
pin!(query_to_json(&*inner, self, &mut 0, parsed_headers)),
@@ -647,7 +647,7 @@ impl QueryData {
// The query was cancelled.
Either::Right((_cancelled, query)) => {
tracing::info!("cancelling query");
if let Err(err) = cancel_token.cancel_query(NoTls).await {
if let Err(err) = cancel_token.clone().try_cancel_query().await {
tracing::error!(?err, "could not cancel query");
}
// wait for the query cancellation
@@ -663,7 +663,9 @@ impl QueryData {
// query failed or was cancelled.
Ok(Err(error)) => {
let db_error = match &error {
SqlOverHttpError::ConnectCompute(HttpConnError::ConnectionError(e))
SqlOverHttpError::ConnectCompute(HttpConnError::ConnectionError(
crate::compute::ConnectionError::Postgres(e),
))
| SqlOverHttpError::Postgres(e) => e.as_db_error(),
_ => None,
};
@@ -694,8 +696,7 @@ impl BatchQueryData {
parsed_headers: HttpHeaders,
) -> Result<String, SqlOverHttpError> {
info!("starting transaction");
let (inner, mut discard) = client.inner();
let cancel_token = inner.cancel_token();
let (inner, cancel_token, mut discard) = client.inner();
let mut builder = inner.build_transaction();
if let Some(isolation_level) = parsed_headers.txn_isolation_level {
builder = builder.isolation_level(isolation_level);
@@ -728,7 +729,7 @@ impl BatchQueryData {
json_output
}
Err(SqlOverHttpError::Cancelled(_)) => {
if let Err(err) = cancel_token.cancel_query(NoTls).await {
if let Err(err) = cancel_token.clone().try_cancel_query().await {
tracing::error!(?err, "could not cancel query");
}
// TODO: after cancelling, wait to see if we can get a status. maybe the connection is still safe.

View File

@@ -164,6 +164,30 @@ impl Deref for FileStorage {
}
}
impl TimelinePersistentState {
pub(crate) fn write_to_buf(&self) -> Result<Vec<u8>> {
let mut buf: Vec<u8> = Vec::new();
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
if self.eviction_state == EvictionState::Present {
// temp hack for forward compatibility
const PREV_FORMAT_VERSION: u32 = 8;
let prev = downgrade_v9_to_v8(self);
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
prev.ser_into(&mut buf)?;
} else {
// otherwise, we write the current format version
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
self.ser_into(&mut buf)?;
}
// calculate checksum before resize
let checksum = crc32c::crc32c(&buf);
buf.extend_from_slice(&checksum.to_le_bytes());
Ok(buf)
}
}
#[async_trait::async_trait]
impl Storage for FileStorage {
/// Persists state durably to the underlying storage.
@@ -180,24 +204,8 @@ impl Storage for FileStorage {
&control_partial_path
)
})?;
let mut buf: Vec<u8> = Vec::new();
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
if s.eviction_state == EvictionState::Present {
// temp hack for forward compatibility
const PREV_FORMAT_VERSION: u32 = 8;
let prev = downgrade_v9_to_v8(s);
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
prev.ser_into(&mut buf)?;
} else {
// otherwise, we write the current format version
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
s.ser_into(&mut buf)?;
}
// calculate checksum before resize
let checksum = crc32c::crc32c(&buf);
buf.extend_from_slice(&checksum.to_le_bytes());
let buf: Vec<u8> = s.write_to_buf()?;
control_partial.write_all(&buf).await.with_context(|| {
format!(

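`write_to_buf` lays the control file out as magic, format version, serialized state, then a CRC32C of everything before it, all little-endian. A standalone sketch of that framing follows. The `MAGIC` value and the `frame`/`verify` helpers are placeholders for illustration, and the bitwise CRC32C stands in for the `crc32c` crate used by the real code:

```rust
/// Placeholder magic number; the real SK_MAGIC value is not shown in the diff.
const MAGIC: u32 = 0xCAFE_F00D;

/// Bitwise CRC-32C (Castagnoli), reflected polynomial 0x82F63B78.
fn crc32c(data: &[u8]) -> u32 {
    let mut crc = !0u32;
    for &byte in data {
        crc ^= u32::from(byte);
        for _ in 0..8 {
            crc = if crc & 1 == 1 {
                (crc >> 1) ^ 0x82F6_3B78
            } else {
                crc >> 1
            };
        }
    }
    !crc
}

/// Builds `magic | version | payload | crc32c(all preceding bytes)`.
fn frame(version: u32, payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(8 + payload.len() + 4);
    buf.extend_from_slice(&MAGIC.to_le_bytes());
    buf.extend_from_slice(&version.to_le_bytes());
    buf.extend_from_slice(payload);
    // The checksum covers the header and payload, and is appended last.
    let checksum = crc32c(&buf);
    buf.extend_from_slice(&checksum.to_le_bytes());
    buf
}

/// Recomputes the checksum over everything except the 4-byte trailer.
fn verify(buf: &[u8]) -> bool {
    if buf.len() < 12 {
        return false;
    }
    let (body, tail) = buf.split_at(buf.len() - 4);
    let stored = u32::from_le_bytes([tail[0], tail[1], tail[2], tail[3]]);
    crc32c(body) == stored
}
```

Because the version is written before the payload and covered by the checksum, a reader can pick the right deserializer and still detect corruption in either field.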
View File

@@ -10,7 +10,7 @@
use reqwest::{IntoUrl, Method, StatusCode};
use utils::{
http::error::HttpErrorBody,
id::{TenantId, TimelineId},
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
};
@@ -97,10 +97,11 @@ impl Client {
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
stream_to: NodeId,
) -> Result<reqwest::Response> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/snapshot",
self.mgmt_api_endpoint, tenant_id, timeline_id
"{}/v1/tenant/{}/timeline/{}/snapshot/{}",
self.mgmt_api_endpoint, tenant_id, timeline_id, stream_to.0
);
self.get(&uri).await
}

View File

@@ -205,6 +205,7 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
/// Stream tar archive with all timeline data.
async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let destination = parse_request_param(&request, "destination_id")?;
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
@@ -225,7 +226,13 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
// so create the chan and write to it in another task.
let (tx, rx) = mpsc::channel(1);
task::spawn(pull_timeline::stream_snapshot(tli, tx));
let conf = get_conf(&request);
task::spawn(pull_timeline::stream_snapshot(
tli,
conf.my_id,
destination,
tx,
));
let rx_stream = ReceiverStream::new(rx);
let body = Body::wrap_stream(rx_stream);
@@ -565,7 +572,7 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
request_span(r, tenant_delete_handler)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot",
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
|r| request_span(r, timeline_snapshot_handler),
)
.post("/v1/pull_timeline", |r| {

View File

@@ -11,13 +11,8 @@ use std::{
io::{self, ErrorKind},
sync::Arc,
};
use tokio::{
fs::{File, OpenOptions},
io::AsyncWrite,
sync::mpsc,
task,
};
use tokio_tar::{Archive, Builder};
use tokio::{fs::OpenOptions, io::AsyncWrite, sync::mpsc, task};
use tokio_tar::{Archive, Builder, Header};
use tokio_util::{
io::{CopyToBytes, SinkWriter},
sync::PollSender,
@@ -32,13 +27,15 @@ use crate::{
routes::TimelineStatus,
},
safekeeper::Term,
state::TimelinePersistentState,
timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError, WalResidentTimeline},
wal_backup,
wal_storage::{self, open_wal_file, Storage},
GlobalTimelines, SafeKeeperConf,
};
use utils::{
crashsafe::{durable_rename, fsync_async_opt},
id::{TenantId, TenantTimelineId, TimelineId},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
logging::SecretString,
lsn::Lsn,
pausable_failpoint,
@@ -46,8 +43,13 @@ use utils::{
/// Stream tar archive of timeline to tx.
#[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
pub async fn stream_snapshot(tli: WalResidentTimeline, tx: mpsc::Sender<Result<Bytes>>) {
if let Err(e) = stream_snapshot_guts(tli, tx.clone()).await {
pub async fn stream_snapshot(
tli: WalResidentTimeline,
source: NodeId,
destination: NodeId,
tx: mpsc::Sender<Result<Bytes>>,
) {
if let Err(e) = stream_snapshot_guts(tli, source, destination, tx.clone()).await {
// Error type/contents don't matter as they can't reach the client
// (hyper likely doesn't do anything with it), but http stream will be
// prematurely terminated. It would be nice to try to send the error in
@@ -81,6 +83,8 @@ impl Drop for SnapshotContext {
pub async fn stream_snapshot_guts(
tli: WalResidentTimeline,
source: NodeId,
destination: NodeId,
tx: mpsc::Sender<Result<Bytes>>,
) -> Result<()> {
// tokio-tar wants Write implementor, but we have mpsc tx <Result<Bytes>>;
@@ -104,7 +108,7 @@ pub async fn stream_snapshot_guts(
// which is also likely suboptimal.
let mut ar = Builder::new_non_terminated(pinned_writer);
let bctx = tli.start_snapshot(&mut ar).await?;
let bctx = tli.start_snapshot(&mut ar, source, destination).await?;
pausable_failpoint!("sk-snapshot-after-list-pausable");
let tli_dir = tli.get_timeline_dir();
@@ -158,13 +162,43 @@ impl WalResidentTimeline {
async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
&self,
ar: &mut tokio_tar::Builder<W>,
source: NodeId,
destination: NodeId,
) -> Result<SnapshotContext> {
let mut shared_state = self.write_shared_state().await;
let wal_seg_size = shared_state.get_wal_seg_size();
let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME);
let mut cf = File::open(cf_path).await?;
ar.append_file(CONTROL_FILE_NAME, &mut cf).await?;
let mut control_store = TimelinePersistentState::clone(shared_state.sk.state());
// Modify the partial segment of the in-memory copy for the control file to
// point to the destination safekeeper.
let replace = control_store
.partial_backup
.replace_uploaded_segment(source, destination)?;
if let Some(replace) = replace {
// The deserialized control file has an uploaded partial. We upload a copy
// of it to object storage for the destination safekeeper and send an updated
// control file in the snapshot.
tracing::info!(
"Replacing uploaded partial segment in in-mem control file: {replace:?}"
);
let remote_timeline_path = wal_backup::remote_timeline_path(&self.tli.ttid)?;
wal_backup::copy_partial_segment(
&replace.previous.remote_path(&remote_timeline_path),
&replace.current.remote_path(&remote_timeline_path),
)
.await?;
}
let buf = control_store
.write_to_buf()
.with_context(|| "failed to serialize control store")?;
let mut header = Header::new_gnu();
header.set_size(buf.len().try_into().expect("never breaches u64"));
ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
.await
.with_context(|| "failed to append to archive")?;
// We need to stream since the oldest segment someone (s3 or pageserver)
// still needs. This duplicates calc_horizon_lsn logic.
@@ -342,7 +376,7 @@ async fn pull_timeline(
let client = Client::new(host.clone(), sk_auth_token.clone());
// Request stream with basebackup archive.
let bb_resp = client
.snapshot(status.tenant_id, status.timeline_id)
.snapshot(status.tenant_id, status.timeline_id, conf.my_id)
.await?;
// Make Stream of Bytes from it...


@@ -483,6 +483,16 @@ pub(crate) async fn backup_partial_segment(
.await
}
pub(crate) async fn copy_partial_segment(
source: &RemotePath,
destination: &RemotePath,
) -> Result<()> {
let storage = get_configured_remote_storage();
let cancel = CancellationToken::new();
storage.copy_object(source, destination, &cancel).await
}
pub async fn read_object(
file_path: &RemotePath,
offset: u64,


@@ -17,14 +17,13 @@
//! file. Code updates state in the control file before doing any S3 operations.
//! This way control file stores information about all potentially existing
//! remote partial segments and can clean them up after uploading a newer version.
use camino::Utf8PathBuf;
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use remote_storage::RemotePath;
use serde::{Deserialize, Serialize};
use tracing::{debug, error, info, instrument, warn};
use utils::lsn::Lsn;
use utils::{id::NodeId, lsn::Lsn};
use crate::{
metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
@@ -82,6 +81,12 @@ pub struct State {
pub segments: Vec<PartialRemoteSegment>,
}
#[derive(Debug)]
pub(crate) struct ReplaceUploadedSegment {
pub(crate) previous: PartialRemoteSegment,
pub(crate) current: PartialRemoteSegment,
}
impl State {
/// Find an Uploaded segment. There should be only one Uploaded segment at a time.
pub(crate) fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
@@ -90,6 +95,54 @@ impl State {
.find(|seg| seg.status == UploadStatus::Uploaded)
.cloned()
}
/// Replace the name of the Uploaded segment (if one exists) in order to match
/// it with `destination` safekeeper. Returns a description of the change or None
/// wrapped in anyhow::Result.
pub(crate) fn replace_uploaded_segment(
&mut self,
source: NodeId,
destination: NodeId,
) -> anyhow::Result<Option<ReplaceUploadedSegment>> {
let current = self
.segments
.iter_mut()
.find(|seg| seg.status == UploadStatus::Uploaded);
let current = match current {
Some(some) => some,
None => {
return anyhow::Ok(None);
}
};
// Sanity check that the partial segment we are replacing belongs
// to the `source` SK.
if !current
.name
.ends_with(format!("sk{}.partial", source.0).as_str())
{
anyhow::bail!(
"Partial segment name ({}) doesn't match self node id ({})",
current.name,
source
);
}
let previous = current.clone();
let new_name = current.name.replace(
format!("_sk{}", source.0).as_str(),
format!("_sk{}", destination.0).as_str(),
);
current.name = new_name;
anyhow::Ok(Some(ReplaceUploadedSegment {
previous,
current: current.clone(),
}))
}
}
struct PartialBackup {


@@ -500,7 +500,7 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
StatusCode::OK,
state
.service
.node_configure(
.external_node_configure(
config_req.node_id,
config_req.availability.map(NodeAvailability::from),
config_req.scheduling,


@@ -196,14 +196,26 @@ async fn migration_run(database_url: &str) -> anyhow::Result<()> {
}
fn main() -> anyhow::Result<()> {
let default_panic = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
default_panic(info);
std::process::exit(1);
}));
logging::init(
LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,
)?;
// log using tracing so we don't get confused output by default hook writing to stderr
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
let hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
// let sentry send a message (and flush)
// and trace the error
hook(info);
std::process::exit(1);
}));
tokio::runtime::Builder::new_current_thread()
// We use spawn_blocking for database operations, so require approximately
// as many blocking threads as we will open database connections.
@@ -217,12 +229,6 @@ fn main() -> anyhow::Result<()> {
async fn async_main() -> anyhow::Result<()> {
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
logging::init(
LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,
)?;
preinitialize_metrics();
let args = Cli::parse();


@@ -3006,8 +3006,13 @@ impl Service {
Error::ApiError(StatusCode::BAD_REQUEST, msg) => {
ApiError::BadRequest(anyhow::anyhow!("{node}: {msg}"))
}
Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, msg) => {
// avoid turning these into conflicts to remain compatible with
// pageservers, 500 errors are sadly retryable with timeline ancestor
// detach
ApiError::InternalServerError(anyhow::anyhow!("{node}: {msg}"))
}
// rest can be mapped as usual
// FIXME: this converts some 500 to 409 which is not per openapi
other => passthrough_api_error(&node, other),
}
})
@@ -3041,6 +3046,8 @@ impl Service {
?mismatching,
"shards returned different results"
);
return Err(ApiError::InternalServerError(anyhow::anyhow!("pageservers returned mixed results for ancestor detach; manual intervention is required.")));
}
Ok(any.1)
@@ -4905,6 +4912,26 @@ impl Service {
Ok(())
}
/// Wrapper around [`Self::node_configure`] which only allows changes while there is no ongoing
/// operation for HTTP api.
pub(crate) async fn external_node_configure(
&self,
node_id: NodeId,
availability: Option<NodeAvailability>,
scheduling: Option<NodeSchedulingPolicy>,
) -> Result<(), ApiError> {
{
let locked = self.inner.read().unwrap();
if let Some(op) = locked.ongoing_operation.as_ref().map(|op| op.operation) {
return Err(ApiError::PreconditionFailed(
format!("Ongoing background operation forbids configuring: {op}").into(),
));
}
}
self.node_configure(node_id, availability, scheduling).await
}
pub(crate) async fn start_node_drain(
self: &Arc<Self>,
node_id: NodeId,
@@ -4962,6 +4989,8 @@ impl Service {
cancel: cancel.clone(),
});
let span = tracing::info_span!(parent: None, "drain_node", %node_id);
tokio::task::spawn({
let service = self.clone();
let cancel = cancel.clone();
@@ -4978,21 +5007,21 @@ impl Service {
}
}
tracing::info!(%node_id, "Drain background operation starting");
tracing::info!("Drain background operation starting");
let res = service.drain_node(node_id, cancel).await;
match res {
Ok(()) => {
tracing::info!(%node_id, "Drain background operation completed successfully");
tracing::info!("Drain background operation completed successfully");
}
Err(OperationError::Cancelled) => {
tracing::info!(%node_id, "Drain background operation was cancelled");
tracing::info!("Drain background operation was cancelled");
}
Err(err) => {
tracing::error!(%node_id, "Drain background operation encountered: {err}")
tracing::error!("Drain background operation encountered: {err}")
}
}
}
});
}.instrument(span));
}
NodeSchedulingPolicy::Draining => {
return Err(ApiError::Conflict(format!(
@@ -5010,14 +5039,14 @@ impl Service {
}
pub(crate) async fn cancel_node_drain(&self, node_id: NodeId) -> Result<(), ApiError> {
let (node_available, node_policy) = {
let node_available = {
let locked = self.inner.read().unwrap();
let nodes = &locked.nodes;
let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
anyhow::anyhow!("Node {} not registered", node_id).into(),
))?;
(node.is_available(), node.get_scheduling())
node.is_available()
};
if !node_available {
@@ -5026,12 +5055,6 @@ impl Service {
));
}
if !matches!(node_policy, NodeSchedulingPolicy::Draining) {
return Err(ApiError::PreconditionFailed(
format!("Node {node_id} has no drain in progress").into(),
));
}
if let Some(op_handler) = self.inner.read().unwrap().ongoing_operation.as_ref() {
if let Operation::Drain(drain) = op_handler.operation {
if drain.node_id == node_id {
@@ -5097,6 +5120,8 @@ impl Service {
cancel: cancel.clone(),
});
let span = tracing::info_span!(parent: None, "fill_node", %node_id);
tokio::task::spawn({
let service = self.clone();
let cancel = cancel.clone();
@@ -5113,21 +5138,21 @@ impl Service {
}
}
tracing::info!(%node_id, "Fill background operation starting");
tracing::info!("Fill background operation starting");
let res = service.fill_node(node_id, cancel).await;
match res {
Ok(()) => {
tracing::info!(%node_id, "Fill background operation completed successfully");
tracing::info!("Fill background operation completed successfully");
}
Err(OperationError::Cancelled) => {
tracing::info!(%node_id, "Fill background operation was cancelled");
tracing::info!("Fill background operation was cancelled");
}
Err(err) => {
tracing::error!(%node_id, "Fill background operation encountered: {err}")
tracing::error!("Fill background operation encountered: {err}")
}
}
}
});
}.instrument(span));
}
NodeSchedulingPolicy::Filling => {
return Err(ApiError::Conflict(format!(
@@ -5145,14 +5170,14 @@ impl Service {
}
pub(crate) async fn cancel_node_fill(&self, node_id: NodeId) -> Result<(), ApiError> {
let (node_available, node_policy) = {
let node_available = {
let locked = self.inner.read().unwrap();
let nodes = &locked.nodes;
let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
anyhow::anyhow!("Node {} not registered", node_id).into(),
))?;
(node.is_available(), node.get_scheduling())
node.is_available()
};
if !node_available {
@@ -5161,12 +5186,6 @@ impl Service {
));
}
if !matches!(node_policy, NodeSchedulingPolicy::Filling) {
return Err(ApiError::PreconditionFailed(
format!("Node {node_id} has no fill in progress").into(),
));
}
if let Some(op_handler) = self.inner.read().unwrap().ongoing_operation.as_ref() {
if let Operation::Fill(fill) = op_handler.operation {
if fill.node_id == node_id {
@@ -5975,7 +5994,7 @@ impl Service {
.await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
.await;
failpoint_support::sleep_millis_async!("sleepy-drain-loop");
failpoint_support::sleep_millis_async!("sleepy-drain-loop", &cancel);
}
while !waiters.is_empty() {


@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
@@ -117,7 +117,7 @@ use refs::AncestorRefs;
// - Are there any refs to ancestor shards' layers?
#[derive(Default)]
struct TenantRefAccumulator {
shards_seen: HashMap<TenantId, Vec<ShardIndex>>,
shards_seen: HashMap<TenantId, BTreeSet<ShardIndex>>,
// For each shard that has refs to an ancestor's layers, the set of ancestor layers referred to
ancestor_ref_shards: AncestorRefs,
@@ -130,7 +130,7 @@ impl TenantRefAccumulator {
.shards_seen
.entry(ttid.tenant_shard_id.tenant_id)
.or_default())
.push(this_shard_idx);
.insert(this_shard_idx);
let mut ancestor_refs = Vec::new();
for (layer_name, layer_metadata) in &index_part.layer_metadata {
@@ -154,7 +154,7 @@ impl TenantRefAccumulator {
summary: &mut GcSummary,
) -> (Vec<TenantShardId>, AncestorRefs) {
let mut ancestors_to_gc = Vec::new();
for (tenant_id, mut shard_indices) in self.shards_seen {
for (tenant_id, shard_indices) in self.shards_seen {
// Find the highest shard count
let latest_count = shard_indices
.iter()
@@ -162,6 +162,7 @@ impl TenantRefAccumulator {
.max()
.expect("Always at least one shard");
let mut shard_indices = shard_indices.iter().collect::<Vec<_>>();
let (mut latest_shards, ancestor_shards) = {
let at =
itertools::partition(&mut shard_indices, |i| i.shard_count == latest_count);
@@ -174,7 +175,7 @@ impl TenantRefAccumulator {
// to scan the S3 bucket halfway through a shard split.
if latest_shards.len() != latest_count.count() as usize {
// This should be extremely rare, so we warn on it.
tracing::warn!(%tenant_id, "Missed some shards at count {:?}", latest_count);
tracing::warn!(%tenant_id, "Missed some shards at count {:?}: {latest_shards:?}", latest_count);
continue;
}
@@ -212,7 +213,7 @@ impl TenantRefAccumulator {
.iter()
.map(|s| s.tenant_shard_id.to_index())
.collect();
if controller_indices != latest_shards {
if !controller_indices.iter().eq(latest_shards.iter().copied()) {
tracing::info!(%tenant_id, "Latest shards seen in S3 ({latest_shards:?}) don't match controller state ({controller_indices:?})");
continue;
}


@@ -67,6 +67,7 @@ from fixtures.pageserver.utils import (
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import (
LocalFsStorage,
MockS3Server,
RemoteStorage,
RemoteStorageKind,
@@ -4425,14 +4426,32 @@ class Safekeeper(LogUtils):
def timeline_dir(self, tenant_id, timeline_id) -> Path:
return self.data_dir / str(tenant_id) / str(timeline_id)
def list_uploaded_segments(self, tenant_id: TenantId, timeline_id: TimelineId):
tline_path = (
self.env.repo_dir
/ "local_fs_remote_storage"
/ "safekeeper"
/ str(tenant_id)
/ str(timeline_id)
)
assert isinstance(self.env.safekeepers_remote_storage, LocalFsStorage)
return self._list_segments_in_dir(
tline_path, lambda name: ".metadata" not in name and ".___temp" not in name
)
def list_segments(self, tenant_id, timeline_id) -> List[str]:
"""
Get list of segment names of the given timeline.
"""
tli_dir = self.timeline_dir(tenant_id, timeline_id)
return self._list_segments_in_dir(
tli_dir, lambda name: not name.startswith("safekeeper.control")
)
def _list_segments_in_dir(self, path: Path, keep_filter: Callable[[str], bool]) -> list[str]:
segments = []
for _, _, filenames in os.walk(tli_dir):
segments.extend([f for f in filenames if not f.startswith("safekeeper.control")])
for _, _, filenames in os.walk(path):
segments.extend([f for f in filenames if keep_filter(f)])
segments.sort()
return segments
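The refactored helper above walks a directory tree and keeps only the file names accepted by a predicate. A minimal standalone sketch of that pattern follows, using only the standard library. The function name `list_files_matching`, the `demo` wrapper, and the sample file names are hypothetical and are not part of the fixture.

```python
import os
import tempfile
from pathlib import Path
from typing import Callable, List


def list_files_matching(path: Path, keep_filter: Callable[[str], bool]) -> List[str]:
    """Return the sorted names of all files under `path` accepted by `keep_filter`."""
    names: List[str] = []
    # os.walk yields (dirpath, dirnames, filenames); only file names matter here.
    for _, _, filenames in os.walk(path):
        names.extend(f for f in filenames if keep_filter(f))
    names.sort()
    return names


def demo() -> List[str]:
    # Hypothetical file names, chosen only to exercise the filter.
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "nested").mkdir()
        for name in (
            "000000010000000000000002",
            "safekeeper.control",
            "nested/000000010000000000000001.partial",
        ):
            (root / name).touch()
        # Same predicate shape as list_segments: drop the control file.
        return list_files_matching(root, lambda n: not n.startswith("safekeeper.control"))
```

Passing the predicate as an argument lets one directory walk serve both callers, each of which only differs in which names it wants to exclude.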


@@ -262,3 +262,85 @@ def test_publisher_restart(
sub_workload.terminate()
finally:
pub_workload.terminate()
@pytest.mark.remote_cluster
@pytest.mark.timeout(2 * 60 * 60)
def test_snap_files(
pg_bin: PgBin,
benchmark_project_pub: NeonApiEndpoint,
zenbenchmark: NeonBenchmarker,
):
"""
Creates a node with a replication slot. Generates pgbench into the replication slot,
then runs pgbench inserts while generating large numbers of snapfiles. Then restarts
the node and tries to peek the replication changes.
"""
test_duration_min = 60
test_interval_min = 5
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
env = benchmark_project_pub.pgbench_env
connstr = benchmark_project_pub.connstr
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=env)
with psycopg2.connect(connstr) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'")
is_super = cur.fetchall()[0][0]
assert is_super, "This benchmark won't work if we don't have superuser"
conn = psycopg2.connect(connstr)
conn.autocommit = True
cur = conn.cursor()
cur.execute("ALTER SYSTEM SET neon.logical_replication_max_snap_files = -1")
with psycopg2.connect(connstr) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT pg_reload_conf()")
with psycopg2.connect(connstr) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute(
"""
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM pg_replication_slots
WHERE slot_name = 'slotter'
) THEN
PERFORM pg_drop_replication_slot('slotter');
END IF;
END $$;
"""
)
cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')")
workload = pg_bin.run_nonblocking(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env)
try:
start = time.time()
prev_measurement = time.time()
while time.time() - start < test_duration_min * 60:
with psycopg2.connect(connstr) as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
)
check_pgbench_still_running(workload)
cur.execute(
"SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())"
)
# Measure storage
if time.time() - prev_measurement > test_interval_min * 60:
storage = benchmark_project_pub.get_synthetic_storage_size()
zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER)
prev_measurement = time.time()
time.sleep(test_interval_min * 60 / 3)
finally:
workload.terminate()


@@ -0,0 +1,139 @@
from fixtures.neon_fixtures import NeonEnvBuilder
def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"shared_buffers='1MB'",
],
)
conn = endpoint.connect()
cur = conn.cursor()
n_records = 1000
cur.execute("CREATE EXTENSION neon_test_utils")
cur.execute("create table t(id integer, val integer)")
cur.execute("begin")
cur.execute("insert into t values (1, 0)")
cur.execute("insert into t values (2, 0)")
cur.execute(f"insert into t select g, 0 from generate_series(3,{n_records}) g")
# Open a cursor and scroll it halfway through
cur.execute("DECLARE c1 NO SCROLL CURSOR WITHOUT HOLD FOR SELECT * FROM t")
cur.execute("fetch 500 from c1")
rows = cur.fetchall()
assert len(rows) == 500
# Perform specified operation
cur.execute(op)
# Clear the cache, so that we exercise reconstructing the pages
# from WAL
cur.execute("SELECT clear_buffer_cache()")
# Check that the cursor opened earlier still works. If the
# combocids are not restored correctly, it won't.
cur.execute("fetch all from c1")
rows = cur.fetchall()
assert len(rows) == 500
cur.execute("rollback")
def test_combocid_delete(neon_env_builder: NeonEnvBuilder):
do_combocid_op(neon_env_builder, "delete from t")
def test_combocid_update(neon_env_builder: NeonEnvBuilder):
do_combocid_op(neon_env_builder, "update t set val=val+1")
def test_combocid_lock(neon_env_builder: NeonEnvBuilder):
do_combocid_op(neon_env_builder, "select * from t for update")
def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"shared_buffers='1MB'",
],
)
conn = endpoint.connect()
cur = conn.cursor()
n_records = 1000
cur.execute("CREATE EXTENSION neon_test_utils")
cur.execute("create table t(id integer, val integer)")
file_path = f"{endpoint.pg_data_dir_path()}/t.csv"
cur.execute(f"insert into t select g, 0 from generate_series(1,{n_records}) g")
cur.execute(f"copy t to '{file_path}'")
cur.execute("truncate table t")
cur.execute("begin")
cur.execute(f"copy t from '{file_path}'")
# Open a cursor and scroll it halfway through
cur.execute("DECLARE c1 NO SCROLL CURSOR WITHOUT HOLD FOR SELECT * FROM t")
cur.execute("fetch 500 from c1")
rows = cur.fetchall()
assert len(rows) == 500
# Delete all the rows. Because all of the rows were inserted earlier in the
# same transaction, all the rows will get a combocid.
cur.execute("delete from t")
# Clear the cache, so that we exercise reconstructing the pages
# from WAL
cur.execute("SELECT clear_buffer_cache()")
# Check that the cursor opened earlier still works. If the
# combocids are not restored correctly, it won't.
cur.execute("fetch all from c1")
rows = cur.fetchall()
assert len(rows) == 500
cur.execute("rollback")
def test_combocid(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
conn = endpoint.connect()
cur = conn.cursor()
n_records = 100000
cur.execute("create table t(id integer, val integer)")
cur.execute(f"insert into t values (generate_series(1,{n_records}), 0)")
cur.execute("begin")
cur.execute("update t set val=val+1")
assert cur.rowcount == n_records
cur.execute("update t set val=val+1")
assert cur.rowcount == n_records
cur.execute("update t set val=val+1")
assert cur.rowcount == n_records
cur.execute("delete from t")
assert cur.rowcount == n_records
cur.execute("delete from t")
assert cur.rowcount == 0
cur.execute(f"insert into t values (generate_series(1,{n_records}), 0)")
cur.execute("update t set val=val+1")
assert cur.rowcount == n_records
cur.execute("update t set val=val+1")
assert cur.rowcount == n_records
cur.execute("update t set val=val+1")
assert cur.rowcount == n_records
cur.execute("rollback")


@@ -2091,3 +2091,47 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
)
== 0
)
def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder):
# single unsharded tenant, two locations
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_start()
env.storage_controller.tenant_policy_update(env.initial_tenant, {"placement": {"Attached": 1}})
env.storage_controller.reconcile_until_idle()
attached_id = int(env.storage_controller.locate(env.initial_tenant)[0]["node_id"])
attached = next((ps for ps in env.pageservers if ps.id == attached_id))
def attached_is_draining():
details = env.storage_controller.node_status(attached.id)
assert details["scheduling"] == "Draining"
env.storage_controller.configure_failpoints(("sleepy-drain-loop", "return(10000)"))
env.storage_controller.node_drain(attached.id)
wait_until(10, 0.5, attached_is_draining)
attached.restart()
# we are unable to reconfigure node while the operation is still ongoing
with pytest.raises(
StorageControllerApiException,
match="Precondition failed: Ongoing background operation forbids configuring: drain.*",
):
env.storage_controller.node_configure(attached.id, {"scheduling": "Pause"})
with pytest.raises(
StorageControllerApiException,
match="Precondition failed: Ongoing background operation forbids configuring: drain.*",
):
env.storage_controller.node_configure(attached.id, {"availability": "Offline"})
env.storage_controller.cancel_node_drain(attached.id)
def reconfigure_node_again():
env.storage_controller.node_configure(attached.id, {"scheduling": "Pause"})
# allow for a small delay between actually having cancelled and being able to reconfigure again
wait_until(4, 0.5, reconfigure_node_again)
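The test above relies on retrying a callable until it stops raising, because cancellation and the ability to reconfigure are not simultaneous. The real `wait_until` fixture is not shown in this diff, so the sketch below is only an assumption about that general pattern. The name `retry_until_ok` and its exact semantics are hypothetical.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def retry_until_ok(attempts: int, interval: float, func: Callable[[], T]) -> T:
    """Call `func` up to `attempts` times, sleeping `interval` seconds after each failure.

    Returns the first successful result. Raises RuntimeError, chained to the
    last exception seen, if every attempt fails.
    """
    last_exc: Optional[Exception] = None
    for _ in range(attempts):
        try:
            return func()
        except Exception as exc:  # any failure counts as "not ready yet"
            last_exc = exc
            time.sleep(interval)
    raise RuntimeError(f"gave up after {attempts} attempts") from last_exc
```

A bounded attempt count keeps a genuinely stuck operation from hanging the test, while the interval absorbs the short delay between the cancel request and the background task actually exiting.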


@@ -204,6 +204,11 @@ def test_scrubber_physical_gc_ancestors(
},
)
# Create an extra timeline, to ensure the scrubber isn't confused by multiple timelines
env.storage_controller.pageserver_api().timeline_create(
env.pg_version, tenant_id=tenant_id, new_timeline_id=TimelineId.generate()
)
# Make sure the original shard has some layers
workload = Workload(env, tenant_id, timeline_id)
workload.init()
@@ -214,6 +219,11 @@ def test_scrubber_physical_gc_ancestors(
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=new_shard_count)
env.storage_controller.reconcile_until_idle() # Move shards to their final locations immediately
# Create a timeline after split, to ensure scrubber can handle timelines that exist in child shards but not ancestors
env.storage_controller.pageserver_api().timeline_create(
env.pg_version, tenant_id=tenant_id, new_timeline_id=TimelineId.generate()
)
# Make sure child shards have some layers. Do not force upload, because the test helper calls checkpoint, which
# compacts, and we only want to do that explicitly later in the test.
workload.write_rows(100, upload=False)
@@ -305,10 +315,19 @@ def test_scrubber_physical_gc_timeline_deletion(neon_env_builder: NeonEnvBuilder
# Make sure the original shard has some layers
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(100)
workload.write_rows(100, upload=False)
workload.stop()
new_shard_count = 4
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=new_shard_count)
for shard in shards:
ps = env.get_tenant_pageserver(shard)
log.info(f"Waiting for shard {shard} on pageserver {ps.id}")
ps.http_client().timeline_checkpoint(
shard, timeline_id, compact=False, wait_until_uploaded=True
)
ps.http_client().deletion_queue_flush(execute=True)
# Create a second timeline so that when we delete the first one, child shards still have some content in S3.
#
@@ -319,15 +338,6 @@ def test_scrubber_physical_gc_timeline_deletion(neon_env_builder: NeonEnvBuilder
PgVersion.NOT_SET, tenant_id, other_timeline_id
)
# Write after split so that child shards have some indices in S3
workload.write_rows(100, upload=False)
for shard in shards:
ps = env.get_tenant_pageserver(shard)
log.info(f"Waiting for shard {shard} on pageserver {ps.id}")
ps.http_client().timeline_checkpoint(
shard, timeline_id, compact=False, wait_until_uploaded=True
)
# The timeline still exists in child shards and they reference its layers, so scrubbing
# now shouldn't delete anything.
gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=0, mode="full")


@@ -97,7 +97,7 @@ def test_ancestor_detach_branched_from(
client.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(8192, 16383) g(i);")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
flush_ep_to_pageserver(env, ep, env.initial_tenant, env.initial_timeline)
deltas = client.layer_map_info(env.initial_tenant, env.initial_timeline).delta_layers()
# there is also the in-mem layer, but ignore it for now
@@ -452,6 +452,9 @@ def test_compaction_induced_by_detaches_in_history(
}
)
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
env.pageserver.allowed_errors.append(
".*await_initial_logical_size: can't get semaphore cancel token, skipping"
)
client = env.pageserver.http_client()
def delta_layers(timeline_id: TimelineId):
@@ -524,6 +527,7 @@ def test_compaction_induced_by_detaches_in_history(
assert len([filter(lambda x: x.l0, delta_layers(branch_timeline_id))]) == 1
skip_main = branches[1:]
branch_lsn = client.timeline_detail(env.initial_tenant, branch_timeline_id)["ancestor_lsn"]
# take the fullbackup before and after inheriting the new L0s
@@ -532,6 +536,13 @@ def test_compaction_induced_by_detaches_in_history(
env.pageserver, env.initial_tenant, branch_timeline_id, branch_lsn, fullbackup_before
)
# force initial logical sizes, so we can evict all layers from all
# timelines and exercise on-demand download for copy lsn prefix
client.timeline_detail(
env.initial_tenant, env.initial_timeline, force_await_initial_logical_size=True
)
client.evict_all_layers(env.initial_tenant, env.initial_timeline)
for _, timeline_id in skip_main:
reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert reparented == set(), "we have no earlier branches at any level"
@@ -705,7 +716,7 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
log.info(f"stuck pageserver is id={stuck.id}")
stuck_http = stuck.http_client()
stuck_http.configure_failpoints(
("timeline-detach-ancestor::before_starting_after_locking_pausable", "pause")
("timeline-detach-ancestor::before_starting_after_locking-pausable", "pause")
)
restarted = pageservers[int(shards[1]["node_id"])]
@@ -716,7 +727,7 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
restarted_http = restarted.http_client()
restarted_http.configure_failpoints(
[
("timeline-detach-ancestor::before_starting_after_locking_pausable", "pause"),
("timeline-detach-ancestor::before_starting_after_locking-pausable", "pause"),
]
)
@@ -734,7 +745,7 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
target.detach_ancestor(env.initial_tenant, branch_timeline_id, timeout=1)
stuck_http.configure_failpoints(
("timeline-detach-ancestor::before_starting_after_locking_pausable", "off")
("timeline-detach-ancestor::before_starting_after_locking-pausable", "off")
)
barrier = threading.Barrier(2)
@@ -753,7 +764,7 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
# we have 10s, lets use 1/2 of that to help the shutdown start
time.sleep(5)
restarted_http.configure_failpoints(
("timeline-detach-ancestor::before_starting_after_locking_pausable", "off")
("timeline-detach-ancestor::before_starting_after_locking-pausable", "off")
)
fut.result()
@@ -806,7 +817,7 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(
after starting the detach.
What remains not tested by this:
- shutdown winning over complete
- shutdown winning over complete, see test_timeline_is_deleted_before_timeline_detach_ancestor_completes
"""
if sharded and mode == "delete_tenant":
@@ -833,7 +844,7 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(
detached_timeline = env.neon_cli.create_branch("detached soon", "main")
pausepoint = "timeline-detach-ancestor::before_starting_after_locking_pausable"
pausepoint = "timeline-detach-ancestor::before_starting_after_locking-pausable"
env.storage_controller.reconcile_until_idle()
shards = env.storage_controller.locate(env.initial_tenant)
@@ -931,7 +942,7 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(
_, offset = other.assert_log_contains(".* gc_loop.*: 1 timelines need GC", offset)
@pytest.mark.parametrize("mode", ["delete_reparentable_timeline"])
@pytest.mark.parametrize("mode", ["delete_reparentable_timeline", "create_reparentable_timeline"])
def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnvBuilder, mode: str):
"""
Technically possible storage controller concurrent interleaving timeline
@@ -943,12 +954,6 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
must be detached.
"""
assert (
mode == "delete_reparentable_timeline"
), "only one now, but creating reparentable timelines cannot be supported even with gc blocking"
# perhaps it could be supported by always doing this for the shard0 first, and after that for others.
# when we run shard0 to completion, we can use it's timelines to restrict which can be reparented.
shard_count = 2
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
@@ -980,14 +985,21 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
for ps, shard_id in [(pageservers[int(x["node_id"])], x["shard_id"]) for x in shards]:
ps.http_client().timeline_checkpoint(shard_id, env.initial_timeline)
first_branch = env.neon_cli.create_branch(
"first_branch", ancestor_branch_name="main", ancestor_start_lsn=first_branch_lsn
)
def create_reparentable_timeline() -> TimelineId:
return env.neon_cli.create_branch(
"first_branch", ancestor_branch_name="main", ancestor_start_lsn=first_branch_lsn
)
if mode == "delete_reparentable_timeline":
first_branch = create_reparentable_timeline()
else:
first_branch = None
detached_branch = env.neon_cli.create_branch(
"detached_branch", ancestor_branch_name="main", ancestor_start_lsn=detached_branch_lsn
)
pausepoint = "timeline-detach-ancestor::before_starting_after_locking_pausable"
pausepoint = "timeline-detach-ancestor::before_starting_after_locking-pausable"
stuck = pageservers[int(shards[0]["node_id"])]
stuck_http = stuck.http_client().without_status_retrying()
@@ -999,12 +1011,6 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
(pausepoint, "pause"),
)
# noticed a surprising 409 if the other one would fail instead
# victim_http.configure_failpoints([
# (pausepoint, "pause"),
# ("timeline-detach-ancestor::before_starting_after_locking", "return"),
# ])
# interleaving a create_timeline which could be reparented will produce two
# permanently different reparentings: one node has reparented, other has
# not
@@ -1023,6 +1029,7 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
assert detail.get("ancestor_lsn") is None
def first_branch_gone():
assert first_branch is not None
try:
env.storage_controller.pageserver_api().timeline_detail(
env.initial_tenant, first_branch
@@ -1043,42 +1050,178 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
stuck_http.configure_failpoints((pausepoint, "off"))
wait_until(10, 1.0, first_completed)
# if we would let victim fail, for some reason there'd be a 409 response instead of 500
# victim_http.configure_failpoints((pausepoint, "off"))
# with pytest.raises(PageserverApiException, match=".* 500 Internal Server Error failpoint: timeline-detach-ancestor::before_starting_after_locking") as exc:
# fut.result()
# assert exc.value.status_code == 409
env.storage_controller.pageserver_api().timeline_delete(
env.initial_tenant, first_branch
)
victim_http.configure_failpoints((pausepoint, "off"))
wait_until(10, 1.0, first_branch_gone)
if mode == "delete_reparentable_timeline":
assert first_branch is not None
env.storage_controller.pageserver_api().timeline_delete(
env.initial_tenant, first_branch
)
victim_http.configure_failpoints((pausepoint, "off"))
wait_until(10, 1.0, first_branch_gone)
elif mode == "create_reparentable_timeline":
first_branch = create_reparentable_timeline()
victim_http.configure_failpoints((pausepoint, "off"))
else:
raise RuntimeError(f"{mode}")
# it now passes, and we should get an error message about mixed reparenting as the stuck still had something to reparent
fut.result()
mixed_results = "pageservers returned mixed results for ancestor detach; manual intervention is required."
with pytest.raises(PageserverApiException, match=mixed_results):
fut.result()
msg, offset = env.storage_controller.assert_log_contains(
".*/timeline/\\S+/detach_ancestor.*: shards returned different results matching=0 .*"
)
log.info(f"expected error message: {msg}")
env.storage_controller.allowed_errors.append(
".*: shards returned different results matching=0 .*"
log.info(f"expected error message: {msg.rstrip()}")
env.storage_controller.allowed_errors.extend(
[
".*: shards returned different results matching=0 .*",
f".*: InternalServerError\\({mixed_results}",
]
)
detach_timeline()
if mode == "create_reparentable_timeline":
with pytest.raises(PageserverApiException, match=mixed_results):
detach_timeline()
else:
# it is a bit of a shame to flag it and then have it succeed, but most
# likely there would be a retry loop which would take care of
# this in cplane
detach_timeline()
# FIXME: perhaps the above should be automatically retried, if we get mixed results?
not_found = env.storage_controller.log_contains(
retried = env.storage_controller.log_contains(
".*/timeline/\\S+/detach_ancestor.*: shards returned different results matching=0 .*",
offset=offset,
offset,
)
assert not_found is None
if mode == "delete_reparentable_timeline":
assert (
retried is None
), "detaching should have converged after both nodes saw the deletion"
elif mode == "create_reparentable_timeline":
assert retried is not None, "detaching should not have converged"
_, offset = retried
finally:
stuck_http.configure_failpoints((pausepoint, "off"))
victim_http.configure_failpoints((pausepoint, "off"))
if mode == "create_reparentable_timeline":
assert first_branch is not None
# now we have mixed ancestry
assert (
TimelineId(
stuck_http.timeline_detail(shards[0]["shard_id"], first_branch)[
"ancestor_timeline_id"
]
)
== env.initial_timeline
)
assert (
TimelineId(
victim_http.timeline_detail(shards[-1]["shard_id"], first_branch)[
"ancestor_timeline_id"
]
)
== detached_branch
)
# make sure we are still able to repair this by detaching the ancestor on the storage controller in case it ever happens
# if the ancestor would be deleted, we would partially fail, making deletion stuck.
env.storage_controller.pageserver_api().detach_ancestor(env.initial_tenant, first_branch)
# and we should now have good results
not_found = env.storage_controller.log_contains(
".*/timeline/\\S+/detach_ancestor.*: shards returned different results matching=0 .*",
offset,
)
assert not_found is None
assert (
stuck_http.timeline_detail(shards[0]["shard_id"], first_branch)["ancestor_timeline_id"]
is None
)
assert (
victim_http.timeline_detail(shards[-1]["shard_id"], first_branch)[
"ancestor_timeline_id"
]
is None
)
def test_retryable_500_hit_through_storcon_during_timeline_detach_ancestor(
neon_env_builder: NeonEnvBuilder,
):
shard_count = 2
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
for ps in env.pageservers:
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
pageservers = dict((int(p.id), p) for p in env.pageservers)
env.storage_controller.reconcile_until_idle()
shards = env.storage_controller.locate(env.initial_tenant)
assert len(set(x["node_id"] for x in shards)) == shard_count
detached_branch = env.neon_cli.create_branch("detached_branch", ancestor_branch_name="main")
pausepoint = "timeline-detach-ancestor::before_starting_after_locking-pausable"
failpoint = "timeline-detach-ancestor::before_starting_after_locking"
stuck = pageservers[int(shards[0]["node_id"])]
stuck_http = stuck.http_client().without_status_retrying()
stuck_http.configure_failpoints(
(pausepoint, "pause"),
)
env.storage_controller.allowed_errors.append(
f".*Error processing HTTP request: .* failpoint: {failpoint}"
)
http = env.storage_controller.pageserver_api()
victim = pageservers[int(shards[-1]["node_id"])]
victim.allowed_errors.append(
f".*Error processing HTTP request: InternalServerError\\(failpoint: {failpoint}"
)
victim_http = victim.http_client().without_status_retrying()
victim_http.configure_failpoints([(pausepoint, "pause"), (failpoint, "return")])
def detach_timeline():
http.detach_ancestor(env.initial_tenant, detached_branch)
def paused_at_failpoint():
stuck.assert_log_contains(f"at failpoint {pausepoint}")
victim.assert_log_contains(f"at failpoint {pausepoint}")
def first_completed():
detail = stuck_http.timeline_detail(shards[0]["shard_id"], detached_branch)
log.info(detail)
assert detail.get("ancestor_lsn") is None
with ThreadPoolExecutor(max_workers=1) as pool:
try:
fut = pool.submit(detach_timeline)
wait_until(10, 1.0, paused_at_failpoint)
# let stuck complete
stuck_http.configure_failpoints((pausepoint, "off"))
wait_until(10, 1.0, first_completed)
victim_http.configure_failpoints((pausepoint, "off"))
with pytest.raises(
PageserverApiException,
match=f".*failpoint: {failpoint}",
) as exc:
fut.result()
assert exc.value.status_code == 500
finally:
stuck_http.configure_failpoints((pausepoint, "off"))
victim_http.configure_failpoints((pausepoint, "off"))
victim_http.configure_failpoints((failpoint, "off"))
detach_timeline()
def test_retried_detach_ancestor_after_failed_reparenting(neon_env_builder: NeonEnvBuilder):
"""
@@ -1169,7 +1312,7 @@ def test_retried_detach_ancestor_after_failed_reparenting(neon_env_builder: Neon
match=".*failed to reparent all candidate timelines, please retry",
) as exc:
http.detach_ancestor(env.initial_tenant, detached)
assert exc.value.status_code == 500
assert exc.value.status_code == 503
# first round -- do more checking to make sure the gc gets paused
try_detach()
@@ -1323,14 +1466,11 @@ def test_timeline_is_deleted_before_timeline_detach_ancestor_completes(
http.configure_failpoints((failpoint, "off"))
with pytest.raises(PageserverApiException) as exc:
with pytest.raises(
PageserverApiException, match="NotFound: Timeline .* was not found"
) as exc:
detach.result()
# FIXME: this should be 404 but because there is another Anyhow conversion it is 500
assert exc.value.status_code == 500
env.pageserver.allowed_errors.append(
".*Error processing HTTP request: InternalServerError\\(detached timeline was not found after restart"
)
assert exc.value.status_code == 404
finally:
http.configure_failpoints((failpoint, "off"))


@@ -49,7 +49,13 @@ from fixtures.remote_storage import (
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.utils import are_walreceivers_absent
from fixtures.utils import PropagatingThread, get_dir_size, query_scalar, start_in_background
from fixtures.utils import (
PropagatingThread,
get_dir_size,
query_scalar,
start_in_background,
wait_until,
)
def wait_lsn_force_checkpoint(
@@ -63,6 +69,18 @@ def wait_lsn_force_checkpoint(
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
log.info(f"pg_current_wal_flush_lsn is {lsn}, waiting for it on pageserver")
wait_lsn_force_checkpoint_at(lsn, tenant_id, timeline_id, ps, pageserver_conn_options)
def wait_lsn_force_checkpoint_at(
lsn: Lsn,
tenant_id: TenantId,
timeline_id: TimelineId,
ps: NeonPageserver,
pageserver_conn_options=None,
):
pageserver_conn_options = pageserver_conn_options or {}
auth_token = None
if "password" in pageserver_conn_options:
auth_token = pageserver_conn_options["password"]
@@ -2304,3 +2322,138 @@ def test_s3_eviction(
)
assert event_metrics_seen
def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilder):
"""
Verify that pulling timeline from a SK with an uploaded partial segment
does not lead to consistency issues:
1. Start 3 SKs - only use two
2. Ingest a bit of WAL
3. Wait for partial to be uploaded
4. Pull timeline to the third SK
5. Wait for source SK to evict timeline
6. Replace source with destination SK and start compute
7. Go back to initial compute SK config and validate that
source SK can unevict the timeline (S3 state is consistent)
"""
neon_env_builder.auth_enabled = True
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
neon_env_builder.safekeeper_extra_opts = [
"--enable-offload",
"--delete-offloaded-wal",
"--partial-backup-timeout",
"500ms",
"--control-file-save-interval",
"500ms",
"--eviction-min-resident=500ms",
]
env = neon_env_builder.init_start(initial_tenant_conf={"checkpoint_timeout": "100ms"})
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2])
log.info("use only first 2 safekeepers, 3rd will be seeded")
endpoint = env.endpoints.create("main")
endpoint.active_safekeepers = [1, 2]
endpoint.start()
endpoint.safe_psql("create table t(key int, value text)")
endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'")
endpoint.stop()
def source_partial_segment_uploaded():
first_segment_name = "000000010000000000000001"
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
candidate_seg = None
for seg in segs:
if "partial" in seg and "sk1" in seg and not seg.startswith(first_segment_name):
candidate_seg = seg
if candidate_seg is not None:
# The term might change, causing the segment to be gc-ed shortly after,
# so give it a bit of time to make sure it's stable.
time.sleep(2)
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
assert candidate_seg in segs
return candidate_seg
raise Exception("Partial segment not uploaded yet")
source_partial_segment = wait_until(15, 1, source_partial_segment_uploaded)
log.info(
f"Uploaded segments before pull are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
)
log.info(f"Tracking source partial segment: {source_partial_segment}")
src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id)
log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}")
pageserver_conn_options = {"password": env.auth_keys.generate_tenant_token(tenant_id)}
wait_lsn_force_checkpoint_at(
src_flush_lsn, tenant_id, timeline_id, env.pageserver, pageserver_conn_options
)
dst_sk.pull_timeline([src_sk], tenant_id, timeline_id)
def evicted():
evictions = src_sk.http_client().get_metric_value(
"safekeeper_eviction_events_completed_total", {"kind": "evict"}
)
if evictions is None or evictions == 0:
raise Exception("Eviction did not happen on source safekeeper yet")
wait_until(30, 1, evicted)
endpoint.start(safekeepers=[2, 3])
def new_partial_segment_uploaded():
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
for seg in segs:
if "partial" in seg and "sk3" in seg:
return seg
raise Exception("Partial segment not uploaded yet")
log.info(
f"Uploaded segments before post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
)
endpoint.safe_psql("insert into t select generate_series(1, 1000), 'pear'")
wait_until(15, 1, new_partial_segment_uploaded)
log.info(
f"Uploaded segments after post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
)
# Allow for some gc iterations to happen and assert that the original
# uploaded partial segment remains in place.
time.sleep(5)
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
assert source_partial_segment in segs
log.info(
f"Uploaded segments at the end are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
)
# Restart the endpoint in order to check that the source safekeeper
# can unevict the timeline
endpoint.stop()
endpoint.start(safekeepers=[1, 2])
def unevicted():
unevictions = src_sk.http_client().get_metric_value(
"safekeeper_eviction_events_completed_total", {"kind": "restore"}
)
if unevictions is None or unevictions == 0:
raise Exception("Uneviction did not happen on source safekeeper yet")
wait_until(10, 1, unevicted)
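
The tests above lean on a polling pattern: a check function raises while the condition is unmet, and returns a value (such as the segment name) once it holds. A minimal sketch of that pattern follows. `poll_until` and `segment_uploaded` are hypothetical names used for illustration only; this is not the implementation of the `wait_until` fixture, just one way such a helper could behave.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def poll_until(iterations: int, interval: float, func: Callable[[], T]) -> T:
    """Call func until it stops raising, then return its result.

    Hypothetical stand-in for a wait_until-style helper (assumption, not the
    real fixture). Sleeps `interval` seconds after each failed attempt.
    """
    last_exc: Optional[Exception] = None
    for _ in range(iterations):
        try:
            return func()
        except Exception as e:
            last_exc = e
            time.sleep(interval)
    # Chain the last failure so the caller can see why the condition was unmet.
    raise RuntimeError(f"condition not met after {iterations} iterations") from last_exc


# Usage: a check that fails twice before succeeding on the third attempt.
attempts = {"n": 0}


def segment_uploaded() -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise Exception("Partial segment not uploaded yet")
    return "example.partial"


result = poll_until(5, 0.01, segment_uploaded)
```

Returning the check's result lets a caller both wait for and capture a value in a single step, in the same way the test assigns `source_partial_segment` from its wait call.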


@@ -1,14 +1,14 @@
{
"v16": [
"16.3",
"5ea106b2583285849784e774b39d62eb2615bd5d"
"47a9122a5a150a3217fafd3f3d4fe8e020ea718a"
],
"v15": [
"15.7",
"39c51c33b383239c78b86afe561679f980e44842"
"46b4b235f38413ab5974bb22c022f9b829257674"
],
"v14": [
"14.12",
"a48faca1d9aef59649dd1bf34bc1b6303fa3489e"
"3fd7a45f8aae85c080df6329e3c85887b7f3a737"
]
}


@@ -30,13 +30,17 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
clap = { version = "4", features = ["derive", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8" }
crypto-bigint = { version = "0.5", features = ["generic-array", "zeroize"] }
der = { version = "0.7", default-features = false, features = ["oid", "pem", "std"] }
deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] }
digest = { version = "0.10", features = ["mac", "oid", "std"] }
either = { version = "1" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
futures-channel = { version = "0.3", features = ["sink"] }
futures-executor = { version = "0.3" }
futures-io = { version = "0.3" }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown = { version = "0.14", features = ["raw"] }
hex = { version = "0.4", features = ["serde"] }
@@ -44,6 +48,7 @@ hmac = { version = "0.12", default-features = false, features = ["reset"] }
hyper = { version = "0.14", features = ["full"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10" }
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
memchr = { version = "2" }
@@ -64,8 +69,10 @@ rustls = { version = "0.21", features = ["dangerous_configuration"] }
scopeguard = { version = "1" }
serde = { version = "1", features = ["alloc", "derive"] }
serde_json = { version = "1", features = ["raw_value"] }
sha2 = { version = "0.10", features = ["asm"] }
sha2 = { version = "0.10", features = ["asm", "oid"] }
signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] }
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
spki = { version = "0.7", default-features = false, features = ["pem", "std"] }
subtle = { version = "2" }
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
tikv-jemalloc-sys = { version = "0.5" }
@@ -81,7 +88,7 @@ tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["serde", "v4", "v7"] }
zeroize = { version = "1", features = ["derive"] }
zeroize = { version = "1", features = ["derive", "serde"] }
zstd = { version = "0.13" }
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }
@@ -97,6 +104,7 @@ getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown = { version = "0.14", features = ["raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10" }
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
memchr = { version = "2" }