Compare commits

...

41 Commits

Author SHA1 Message Date
Alexander Lakhin
eba6f85909 Add flaky tests to test how testing for flaky tests works 2025-05-14 11:31:45 +03:00
Alexander Lakhin
52888e06a0 Run testing of new tests within "Build and Test" workflow 2025-05-14 11:31:44 +03:00
Alexander Lakhin
2b51b7cbb1 Transform "Build and Run Selected Test" workflow into "Build and Test Tests" 2025-05-14 11:31:44 +03:00
Alexander Lakhin
5895abf495 Allow for adding optional postfix to allure report name 2025-05-14 11:31:44 +03:00
Alexander Lakhin
05559539f1 Adjust regress-tests step to pass list of selected tests 2025-05-14 11:31:43 +03:00
Alexander Lakhin
8f2808864b Allow for running multiple selected tests 2025-05-14 11:31:43 +03:00
Konstantin Knizhnik
4b9087651c Checked that stored LwLSN >= FirstNormalUnloggedLSN (#11750)
## Problem

Undo unintended change 60b9fb1baf

## Summary of changes

Add assert that we are not storing fake LSN in LwLSN.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-05-02 19:27:59 +00:00
Konstantin Knizhnik
79699aebc8 Reserve in file descriptor pool sockets used for connections to page servers (#11798)
## Problem

See https://github.com/neondatabase/neon/issues/11790

The neon extension opens extensions to the pageservers, which consumes
file descriptors. Postgres has a mechanism to count how many FDs are in
use, but it doesn't know about those FDs. We should call
ReserveExternalFD() or AcquireExternalFD() to account for them.

## Summary of changes

Call `ReserveExternalFD()` for each shard

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Mikhail Kot <mikhail@neon.tech>
2025-05-02 14:36:10 +00:00
Alexander Bayandin
22290eb7ba CI: notify relevant team about release deploy failures (#11797)
## Problem

We notify only Storage team about failed deploys, but Compute and Proxy
teams can also benefit from that

## Summary of changes
- Adjust `notify-storage-release-deploy-failure` to notify the relevant
team about failed deploy
2025-05-02 12:46:21 +00:00
Alex Chi Z.
bbc35e10b8 fix(test): increase timeouts for some tests (#11781)
## Problem

Those tests are timing out more frequently after
https://github.com/neondatabase/neon/pull/11585

## Summary of changes

Increase timeout for `test_pageserver_gc_compaction_smoke`

Increase rollback wait timeout for `test_tx_abort_with_many_relations`

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-01 18:36:26 +00:00
Alex Chi Z.
ae2c3ac12f test: revert relsizev2 config (#11759)
## Problem

part of https://github.com/neondatabase/neon/issues/9516

One thing I realized in the past few months is that "no-way-back" things
like this are scary to roll out without a fine-grained rollout infra.
The plan was to flip the flag in the repo and roll it out soon, but I
don't think rolling out would happen in the near future. So I'd rather
revert the flag to avoid creating a discrepancy between staging and the
regress tests.

## Summary of changes

Not using rel_size_v2 by default in unit tests; we still have a few
tests to explicitly test the new format so we still get some test
coverages.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-01 17:51:10 +00:00
Vlad Lazar
16d594b7b3 pagectl: list layers for given key in decreasing LSN order (#11799)
Adds an extra key CLI arg to `pagectl layer list-layer`. When provided,
only layers with key ranges containing the key will be listed in
decreasing LSN order (indices are preserved for `dump-layer`).
2025-05-01 15:56:43 +00:00
Suhas Thalanki
f999632327 Adding anon v2 support to the dockerfile (#11313)
## Problem

Removed `anon` v1 support as described here:
https://github.com/neondatabase/cloud/issues/22663

Adding `anon` v2 support to re-introduce the `pg_anon` extension. 
Related Issues: https://github.com/neondatabase/cloud/issues/20456


## Summary of changes

Adding `anon` v2 support by building it in the dockerfile
2025-05-01 15:22:01 +00:00
Shockingly Good
5bd850d15a Fix the leaked tracing context for the "compute_monitor:run". (#11791)
Removes the leaked tracing context for the "compute_monitor:run" log,
which either inherited the "start_compute" span or also the HTTP request
context.

## Problem

The problem is that the context of the monitor's trace is unnecessarily
populated with the span data inherited from previously within the same
thread.

## Summary of changes

The context is completely reset by moving the span from the thread
spawning the monitor into the thread where the monitor will actually
start working.

Addresses https://github.com/neondatabase/cloud/issues/28145

## Examples

### Before
```
2025-04-30T16:39:05.840298Z  INFO start_compute:compute_monitor:run: compute is not running, waiting before monitoring activity
```

### After

```
2025-04-30T16:39:05.840298Z  INFO compute_monitor:run: compute is not running, waiting before monitoring activity
```
2025-05-01 09:09:10 +00:00
Dmitrii Kovalkov
1b789e8d7c fix(pgxn/neon): Use proper member size in TermsCollectedMset and VotesCollectedMset (#11785)
## Problem
`TermsCollectedMset` and `VotesCollectedMset` accept a MemberSet
argument to find a quorum in. It may be either `wp->mconf.members` or
`wp->mconf.new_members`. But the loops inside always use
`wp->mconf.members.len`.

If the sizes of member sets are different, it may lead to these
functions not scanning all the safekeepers from `mset`.

We are not planning to change the member set size dynamically now, but
it's worth fixing anyway.

- Part of https://github.com/neondatabase/neon/issues/11669

## Summary of changes
- Use proper size of member set in `TermsCollectedMset` and
`VotesCollectedMset`
2025-04-30 16:50:21 +00:00
Arpad Müller
bec7427d9e pull_timeline and sk logging fixes (#11786)
This patch contains some fixes of issues I ran into for #11712:

* make `pull_timeline` return success for timeline that already exists.
This follows general API design of storage components: API endpoints are
retryable and converge to a status code, instead of starting to error.
We change the `pull_timeline`'s return type a little bit, because we
might not actually have a source sk to pull from. Note that the fix is
not enough, there is still a race when two `pull_timeline` instances
happen in parallel: we might try to enter both pulled timelines at the
same time. That can be fixed later.
* make `pull_timeline` support one safekeeper being down. In general, if
one safekeeper is down, that's not a problem. the added comment explains
a potential situation (found in the `test_lagging_sk` test for example)
* don't log very long errors when computes try to connect to safekeepers
that don't have the timeline yet, if `allow_timeline_creation` is false.
That flag is enabled when a sk connection string with generation numbers
is passed to the compute, so we'll hit this code path more often. E.g.
when a safekeeper missed a timeline creation, but the compute connects
to it first before the `pull_timeline` gets requested by the storcon
reconciler: this is a perfectly normal situation. So don't log the whole
error backtrace, and don't log it on the error log level, but only on
info.

part of #11670
2025-04-30 16:24:01 +00:00
Alex Chi Z.
e2db76b9be feat(pageserver): ondemand download reason observability (#11780)
## Problem

Part of https://github.com/neondatabase/neon/issues/11615

## Summary of changes

We don't understand the root cause of why we get resident size surge
every now and then. This patch adds observability for that, and in the
next week, we might have a better understanding of what's going on.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-30 16:04:00 +00:00
Alex Chi Z.
6b4b8e0d8b fix(pageserver): do not increase basebackup err counter when shutdown (#11778)
## Problem

We occasionally see basebackup errors alerts but there were no errors
logged. Looking at the code, the only codepath that will cause this is
shutting down.

## Summary of changes

Do not increase any counter (ok/err) when basebackup request gets
cancelled due to shutdowns.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-30 15:50:12 +00:00
Konstantin Knizhnik
1d68577fbd Check target slot state in prefetch_wait_for (#11779)
## Problem

See https://neondb.slack.com/archives/C04DGM6SMTM/p1745599814030679

Assume the following scenario: prefetch_wait_for is doing
`CHECK_FOR_INTERRUPTS` which tries to load prefetch responses.
In case of error is calls pageserver_disconnect which aborts all
in-flight requests. But such failure is not detected by
`prefetch_wait_for` which returns true. As a result
`communicator_read_at_lsnv` assumes that slot is received, but as far as
asserts are disables at prod, it is not actually checked.
Then it tries to interpret response and ... *SIGSEGV*

## Summary of changes

Check target slot state  in `prefetch_wait_for`.

Resolves https://github.com/neondatabase/cloud/issues/28258

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-04-30 12:44:59 +00:00
Arseny Sher
60f63c076f Make safekeeper proto version 3 default (#11518)
## Problem

We have been running compute <-> sk protocol version 3 for a while on
staging with no issues observed, and want to fully migrate to it
eventually.

## Summary of changes

Let's make v3 the default.

ref https://github.com/neondatabase/neon/issues/10326

---------

Co-authored-by: Arpad Müller <arpad@neon.tech>
2025-04-30 12:23:20 +00:00
Mikhail Kot
8da4ec9740 Postgres metrics for stuck getpage requests (#11710)
https://github.com/neondatabase/neon/issues/10327
Resolves: #11720 

New metrics:
- `compute_getpage_stuck_requests_total`
- `compute_getpage_max_inflight_stuck_time_ms`
2025-04-30 12:01:41 +00:00
Em Sharnoff
b48404952d Bump vm-builder: v0.42.2 -> v0.46.0 (#11782)
Bumped to pick up the changes from neondatabase/autoscaling#1366 —
specifically including `uname` in the logs.

Other changes included:

* neondatabase/autoscaling#1301
* neondatabase/autoscaling#1296
2025-04-30 11:32:25 +00:00
devin-ai-integration[bot]
1d06172d59 pageserver: remove resident size from billing metrics (#11699)
This is a rebase of PR #10739 by @henryliu2014 on the current main
branch.

## Problem

pageserver: remove resident size from billing metrics

Fixes #10388

## Summary of changes

The following changes have been made to remove resident size from
billing metrics:

* removed the metric "resident_size" and related codes in
consumption_metrics/metrics.rs
* removed the item of the description of metric "resident_size" in
consumption_metrics.md
* refactored the metric "resident_size" related test case

Requested by: John Spray (john@neon.tech)

---------

Co-authored-by: liuheqing <hq.liu@qq.com>
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: John Spray <john@neon.tech>
2025-04-29 18:34:56 +00:00
Elizabeth Murray
a08c1a23eb Upgrade the pgrag version in the compute Dockerfile. (#11687)
Update the compute Dockerfile to use a new version of pgrag. The new
version of pgrag uses the latest pgrx, and has a fix that terminates
background workers on postmaster exit.
2025-04-29 16:50:18 +00:00
John Spray
a2adc7dbd3 storcon: avoid multiple initdbs when shard 0 has stale locations (#11760)
## Problem

In #11727 I overlooked the case of multiple attached locations for shard
0.

I misread the code and thought `create_one` acts on one location, but it
actually acts on one _shard_, which is potentially multiple locations.

This was not a regression, but it meant that the fix was incomplete.

## Summary of changes

- In `create_one`, when updating shard zero, have any "other" locations
use the initdb from shard 0
2025-04-29 15:31:52 +00:00
Vlad Lazar
768a580373 pageserver: add not modified since lsn to get page span (#11774)
It's useful when debugging.
2025-04-29 14:07:23 +00:00
Folke Behrens
09247de8d5 proxy: Enable JSON logging by default (#11772)
This does not affect local_proxy.
2025-04-29 13:11:24 +00:00
Arpad Müller
0b35929211 Make SafekeeperReconciler parallel via semaphore (#11757)
Right now we only support running one reconciliation per safekeeper.
This is of course usually way below of what a safekeeper can do.
Therefore, introduce a semaphore and spawn the tasks asynchronously as
they come in.

Part of #11670
2025-04-29 12:46:15 +00:00
Ivan Efremov
b3db7f66ac fix(compute): Change the local_proxy log level (#11770)
Related to the INC-496
2025-04-29 11:49:16 +00:00
a-masterov
498d852bde Fix the empty region if run on schedule (#11764)
## Problem

When the workflow ran on a schedule, the `region_id` input was not set.
As a result, an empty region value was used, which caused errors during
execution.

## Summary of Changes

- Added fallback logic to set a default region (`aws-us-east-2`) when
`region_id` is not provided.
- Ensures the workflow works correctly both when triggered manually
(`workflow_dispatch`) and on schedule (`cron`).
2025-04-29 09:12:14 +00:00
Busra Kugler
7f8b1d79c0 Replace dorny/paths-filter with step-security maintained version (#11663)
## Problem
Our CI/CD security tool StepSecurity maintains safer forks of popular
GitHub Actions with low security scores. We're replacing
dorny/paths-filter with the maintained step-security/paths-filter
version to reduce risk of supply chain breaches and potential CVEs.

## Summary of changes
replace
```uses: dorny/paths-filter@de90cc6fb3 ```  with ```uses: step-security/paths-filter@v3```

This PR will fix: neondatabase/cloud#26141
2025-04-29 09:02:01 +00:00
JC Grünhage
d15f2ff57a fix(lint-release-pr): adjust lint and action to match (#11766)
## Problem
The `lint-release-pr` workflow run for
https://github.com/neondatabase/neon/pull/11763 failed, because the new
action did not match the lint.

## Summary of changes
Include time in expected merge message regex.
2025-04-29 08:56:44 +00:00
Konstantin Knizhnik
3593356c10 Prewarm sql api (#11742)
## Problem

Continue work on prewarm, see 
https://github.com/neondatabase/neon/pull/11740
https://github.com/neondatabase/neon/pull/11741

## Summary of changes

Add SQL API to prewarm

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-04-29 06:44:28 +00:00
Tristan Partin
9e8ab2ab4f Skip remote extensions WITH_LIB test when sanitizers are enabled (#11758)
In order for the test to work when sanitizers are enabled, we would need
to compile the dummy Postgres extension with the same sanitizer flags
that we compile Postgres and the neon extension with. Doing this work
would be a little more than trivial, so skipping is the best option, at
least for now.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-04-28 19:13:35 +00:00
Alex Chi Z.
c1ff7db187 fix(pageserver): consider tombstones in replorigin (#11752)
## Problem

We didn't consider tombstones in replorigin read path in the past. This
was fine because tombstones are stored as LSN::Invalid before we
universally define what the tombstone is for sparse keyspaces.

Now we remove non-inherited keys during detach ancestor and write the
universal tombstone "empty image". So we need to consider it across all
the read paths.

related: https://github.com/neondatabase/neon/pull/11299

## Summary of changes

Empty value gets ignored for replorigin scans.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-28 18:54:26 +00:00
Konstantin Knizhnik
6d6b83e737 Prewarm implementation (#11741)
## Problem

Continue work on prewarm started in PR
https://github.com/neondatabase/neon/pull/11740

## Summary of changes

Implement prewarm using prefetch

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-04-28 18:17:03 +00:00
John Spray
0482690534 pageserver: make control_plane_api & generations fully mandatory (#10715)
## Problem

We had retained the ability to run in a generation-less mode to support
test_generations_upgrade, which was replaced with a cleaner backward
compat test in https://github.com/neondatabase/neon/pull/10701

## Summary of changes

- Remove all the special cases for "if no generation" or "if no control
plane api"
- Make control_plane_api config mandatory

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2025-04-28 17:24:55 +00:00
Tristan Partin
a750026c2e Fix compiler warning in libpagestore.c when WITH_SANITIZERS=yes (#11755)
Postgres has a nice self-documenting macro called pg_unreachable() when
you want to assert that a location in code won't be hit.

Warning in question:

```
/home/tristan957/Projects/work/neon//pgxn/neon/libpagestore.c: In function ‘pageserver_connect’:
/home/tristan957/Projects/work/neon//pgxn/neon/libpagestore.c:739:1: warning: control reaches end of non-void function [-Wreturn-type]
  739 | }
      | ^
```

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-04-28 17:09:48 +00:00
John Spray
998d2c2ce9 storcon: use shard 0's initdb for timeline creation (#11727)
## Problem

In princple, pageservers with different postgres binaries might generate
different initdbs, resulting in inconsistency between shards. To avoid
that, we should have shard 0 generate the initdb and other shards re-use
it.

Fixes: https://github.com/neondatabase/neon/issues/11340

## Summary of changes

- For shards with index greater than zero, set
`existing_initdb_timeline_id` in timeline creation to consume the
existing initdb rather than creating a new one
2025-04-28 16:43:35 +00:00
JC Grünhage
b1fa68f659 impr(ci): switch release PR creation over to use python based action (#11679)
## Problem
Our different repositories had both had code to achieve very similar
results in terms of release PR creation, but they were structured
differently and had different extensions. This was likely to cause
maintainability problems in the long run.

## Summary of changes
Switch to a python cli based composite action for creating the release
PRs that will also be introduced in our other repos later.

## To Do
- [ ] Adjust our docs to reflect the changes from this.
2025-04-28 16:37:36 +00:00
devin-ai-integration[bot]
84bc3380cc Remove SAFEKEEPER_AUTH_TOKEN env var parsing from safekeeper (#11698)
# Remove SAFEKEEPER_AUTH_TOKEN env var parsing from safekeeper

This PR is a follow-up to #11443 that removes the parsing of the
`SAFEKEEPER_AUTH_TOKEN` environment variable from the safekeeper
codebase while keeping the `auth_token_path` CLI flag functionality.

## Changes:
- Removed code that checks for the `SAFEKEEPER_AUTH_TOKEN` environment
variable
- Updated comments to reflect that only the `auth_token_path` CLI flag
is now used

As mentioned in PR #11443, the environment variable approach was planned
to be deprecated and removed in favor of the file-based approach, which
is more secure since environment variables can be quite public in both
procfs and unit files.

Link to Devin run:
https://app.devin.ai/sessions/d6f56cf1b4164ea9880a9a06358a58ac

Requested by: arpad@neon.tech

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: arpad@neon.tech <arpad@neon.tech>
Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2025-04-28 15:34:47 +00:00
77 changed files with 1667 additions and 422 deletions

View File

@@ -33,9 +33,14 @@ config-variables:
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_CICD_CHANNEL_ID
- SLACK_COMPUTE_CHANNEL_ID
- SLACK_ON_CALL_DEVPROD_STREAM
- SLACK_ON_CALL_QA_STAGING_STREAM
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
- SLACK_ONCALL_COMPUTE_GROUP
- SLACK_ONCALL_PROXY_GROUP
- SLACK_ONCALL_STORAGE_GROUP
- SLACK_PROXY_CHANNEL_ID
- SLACK_RUST_CHANNEL_ID
- SLACK_STORAGE_CHANNEL_ID
- SLACK_UPCOMING_RELEASE_CHANNEL_ID

View File

@@ -43,7 +43,7 @@ runs:
BUCKET: neon-github-public-dev
run: |
if [ -n "${PR_NUMBER}" ]; then
BRANCH_OR_PR=pr-${PR_NUMBER}
BRANCH_OR_PR=pr-${PR_NUMBER}${REPORT_EXT-}
elif [ "${GITHUB_REF_NAME}" = "main" ] || [ "${GITHUB_REF_NAME}" = "release" ] || \
[ "${GITHUB_REF_NAME}" = "release-proxy" ] || [ "${GITHUB_REF_NAME}" = "release-compute" ]; then
# Shortcut for special branches

View File

@@ -23,7 +23,7 @@ runs:
REPORT_DIR: ${{ inputs.report-dir }}
run: |
if [ -n "${PR_NUMBER}" ]; then
BRANCH_OR_PR=pr-${PR_NUMBER}
BRANCH_OR_PR=pr-${PR_NUMBER}${REPORT_EXT-}
elif [ "${GITHUB_REF_NAME}" = "main" ] || [ "${GITHUB_REF_NAME}" = "release" ] || \
[ "${GITHUB_REF_NAME}" = "release-proxy" ] || [ "${GITHUB_REF_NAME}" = "release-compute" ]; then
# Shortcut for special branches

View File

@@ -12,6 +12,10 @@ inputs:
description: 'Arbitrary parameters to pytest. For example "-s" to prevent capturing stdout/stderr'
required: false
default: ''
extended_testing:
description: 'Set to true if the test results should be stored and processed separately'
required: false
default: 'false'
needs_postgres_source:
description: 'Set to true if the test suite requires postgres source checked out'
required: false
@@ -135,13 +139,15 @@ runs:
PERF_REPORT_DIR="$(realpath test_runner/perf-report-local)"
echo "PERF_REPORT_DIR=${PERF_REPORT_DIR}" >> ${GITHUB_ENV}
rm -rf $PERF_REPORT_DIR
TEST_SELECTION="test_runner/${{ inputs.test_selection }}"
EXTRA_PARAMS="${{ inputs.extra_params }}"
TEST_SELECTION="${{ inputs.test_selection }}"
if [ -z "$TEST_SELECTION" ]; then
echo "test_selection must be set"
exit 1
fi
if [[ $TEST_SELECTION != test_runner/* ]]; then
TEST_SELECTION="test_runner/$TEST_SELECTION"
fi
EXTRA_PARAMS="${{ inputs.extra_params }}"
if [[ "${{ inputs.run_in_parallel }}" == "true" ]]; then
# -n sets the number of parallel processes that pytest-xdist will run
EXTRA_PARAMS="-n12 $EXTRA_PARAMS"
@@ -244,3 +250,5 @@ runs:
report-dir: /tmp/test_output/allure/results
unique-key: ${{ inputs.build_type }}-${{ inputs.pg_version }}-${{ runner.arch }}
aws-oidc-role-arn: ${{ inputs.aws-oidc-role-arn }}
env:
REPORT_EXT: ${{ inputs.extended_testing == 'true' && '-ext' || '' }}

143
.github/scripts/detect-updated-pytests.py vendored Executable file
View File

@@ -0,0 +1,143 @@
import os
import re
import shutil
import subprocess
import sys
commit_sha = os.getenv("COMMIT_SHA")
base_sha = os.getenv("BASE_SHA")
cmd = ["git", "merge-base", base_sha, commit_sha]
print(f"Running: {' '.join(cmd)}...")
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0 or not (baseline := result.stdout.strip()):
print("Baseline commit for PR is not found, detection skipped.")
sys.exit(0)
print(f"Baseline commit: {baseline}")
cmd = ["git", "diff", "--name-only", f"{baseline}..{commit_sha}", "test_runner/regress/"]
print(f"Running: {' '.join(cmd)}...")
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
print(f"Git diff returned code {result.returncode}\n{result.stdout}\nDetection skipped.")
sys.exit(0)
def collect_tests(test_file_name):
cmd = ["./scripts/pytest", "--collect-only", "-q", test_file_name]
print(f"Running: {' '.join(cmd)}...")
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
print(
f"pytest --collect-only returned code {result.returncode}\n{result.stdout}\nDetection skipped."
)
sys.exit(0)
tests = []
for test_item in result.stdout.split("\n"):
if not test_item.startswith(test_file_name):
break
test_name = re.sub(r"(.*::)([^\[]+)(\[.*)", r"\2", test_item)
if test_name not in tests:
tests.append(test_name)
return tests
all_new_tests = []
all_updated_tests = []
temp_test_file = "test_runner/regress/__temp__.py"
temp_file = None
for test_file in result.stdout.split("\n"):
if not test_file:
continue
print(f"Test file modified: {test_file}.")
# Get and compare two lists of items collected by pytest to detect new tests in the PR
if temp_file:
temp_file.close()
temp_file = open(temp_test_file, "w")
cmd = ["git", "show", f"{baseline}:{test_file}"]
print(f"Running: {' '.join(cmd)}...")
result = subprocess.run(cmd, text=True, stdout=temp_file)
if result.returncode != 0:
tests0 = []
else:
tests0 = collect_tests(temp_test_file)
tests1 = collect_tests(test_file)
new_tests = set(tests1).difference(tests0)
for test_name in new_tests:
all_new_tests.append(f"{test_file}::{test_name}")
# Detect pre-existing test functions updated in the PR
cmd = ["git", "diff", f"{baseline}..{commit_sha}", test_file]
print(f"Running: {' '.join(cmd)}...")
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
print(f"Git diff returned code {result.returncode}\n{result.stdout}\nDetection skipped.")
sys.exit(0)
updated_funcs = []
for diff_line in result.stdout.split("\n"):
print(diff_line)
# TODO: detect functions with added/modified parameters
if not diff_line.startswith("@@"):
continue
# Extract names of functions with updated content relying on hunk header
m = re.match(r"^(@@[0-9, +-]+@@ def )([^(]+)(.*)", diff_line)
if not m:
continue
func_name = m.group(2)
print(func_name) ##
# Ignore functions not collected by pytest
if func_name not in tests1:
continue
if func_name not in updated_funcs:
updated_funcs.append(func_name)
for func_name in updated_funcs:
print(f"Function modified: {func_name}.")
# Extract changes within the function
cmd = ["git", "log", f"{baseline}..{commit_sha}", "-L", f":{func_name}:{test_file}"]
print(f"Running: {' '.join(cmd)}...")
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
continue
patch_contents = result.stdout
# Revert changes to get the file with only this function updated
# (applying the patch might fail if it contains a change for the next function declaraion)
shutil.copy(test_file, temp_test_file)
cmd = ["patch", "-R", "-p1", "--no-backup-if-mismatch", "-r", "/dev/null", temp_test_file]
print(f"Running: {' '.join(cmd)}...")
result = subprocess.run(
cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, input=patch_contents
)
print(f"result: {result.returncode}; {result.stdout}")
if result.returncode != 0:
continue
# Ignore whitespace-only changes
cmd = ["diff", "-w", test_file, temp_test_file]
print(f"Running: {' '.join(cmd)}...")
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode == 0:
continue
all_updated_tests.append(f"{test_file}::{func_name}")
if temp_file:
temp_file.close()
if os.path.exists(temp_test_file):
os.remove(temp_test_file)
if github_output := os.getenv("GITHUB_OUTPUT"):
with open(github_output, "a") as f:
if all_new_tests or all_updated_tests:
f.write("tests=")
f.write(" ".join(all_new_tests + all_updated_tests))
f.write("\n")

View File

@@ -41,7 +41,7 @@ echo "Merge base of ${MAIN_BRANCH} and ${RELEASE_BRANCH}: ${MERGE_BASE}"
LAST_COMMIT=$(git rev-parse HEAD)
MERGE_COMMIT_MESSAGE=$(git log -1 --format=%s "${LAST_COMMIT}")
EXPECTED_MESSAGE_REGEX="^$COMPONENT release [0-9]{4}-[0-9]{2}-[0-9]{2}$"
EXPECTED_MESSAGE_REGEX="^$COMPONENT release [0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2} UTC$"
if ! [[ "${MERGE_COMMIT_MESSAGE}" =~ ${EXPECTED_MESSAGE_REGEX} ]]; then
report_error "Merge commit message does not match expected pattern: '<component> release YYYY-MM-DD'

View File

@@ -266,7 +266,7 @@ jobs:
role-duration-seconds: 18000 # 5 hours
- name: Run rust tests
if: ${{ inputs.sanitizers != 'enabled' }}
if: ${{ inputs.sanitizers != 'enabled' && inputs.test-selection == '' }}
env:
NEXTEST_RETRIES: 3
run: |
@@ -386,7 +386,8 @@ jobs:
timeout-minutes: ${{ inputs.sanitizers != 'enabled' && 75 || 180 }}
with:
build_type: ${{ inputs.build-type }}
test_selection: regress
test_selection: ${{ inputs.test-selection != '' && inputs.test-selection || 'regress' }}
extended_testing: ${{ inputs.test-selection != '' && 'true' || 'false' }}
needs_postgres_source: true
run_with_real_s3: true
real_s3_bucket: neon-github-ci-tests
@@ -399,9 +400,7 @@ jobs:
# Attempt to stop tests gracefully to generate test reports
# until they are forcibly stopped by the stricter `timeout-minutes` limit.
extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }} --count=${{ inputs.test-run-count }}
${{ inputs.test-selection != '' && format('-k "{0}"', inputs.test-selection) || '' }}
env:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
BUILD_TAG: ${{ inputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring

View File

@@ -1,103 +0,0 @@
name: Create Release PR
on:
workflow_call:
inputs:
component-name:
description: 'Component name'
required: true
type: string
source-branch:
description: 'Source branch'
required: true
type: string
secrets:
ci-access-token:
description: 'CI access token'
required: true
defaults:
run:
shell: bash -euo pipefail {0}
permissions:
contents: read
jobs:
create-release-branch:
runs-on: ubuntu-22.04
permissions:
contents: write # for `git push`
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
ref: ${{ inputs.source-branch }}
fetch-depth: 0
- name: Set variables
id: vars
env:
COMPONENT_NAME: ${{ inputs.component-name }}
RELEASE_BRANCH: >-
${{
false
|| inputs.component-name == 'Storage' && 'release'
|| inputs.component-name == 'Proxy' && 'release-proxy'
|| inputs.component-name == 'Compute' && 'release-compute'
}}
run: |
now_date=$(date -u +'%Y-%m-%d')
now_time=$(date -u +'%H-%M-%Z')
{
echo "title=${COMPONENT_NAME} release ${now_date}"
echo "rc-branch=rc/${RELEASE_BRANCH}/${now_date}_${now_time}"
echo "release-branch=${RELEASE_BRANCH}"
} | tee -a ${GITHUB_OUTPUT}
- name: Configure git
run: |
git config user.name "github-actions[bot]"
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
- name: Create RC branch
env:
RELEASE_BRANCH: ${{ steps.vars.outputs.release-branch }}
RC_BRANCH: ${{ steps.vars.outputs.rc-branch }}
TITLE: ${{ steps.vars.outputs.title }}
run: |
git switch -c "${RC_BRANCH}"
# Manually create a merge commit on the current branch, keeping the
# tree and setting the parents to the current HEAD and the HEAD of the
# release branch. This commit is what we'll fast-forward the release
# branch to when merging the release branch.
# For details on why, look at
# https://docs.neon.build/overview/repositories/neon.html#background-on-commit-history-of-release-prs
current_tree=$(git rev-parse 'HEAD^{tree}')
release_head=$(git rev-parse "origin/${RELEASE_BRANCH}")
current_head=$(git rev-parse HEAD)
merge_commit=$(git commit-tree -p "${current_head}" -p "${release_head}" -m "${TITLE}" "${current_tree}")
# Fast-forward the current branch to the newly created merge_commit
git merge --ff-only ${merge_commit}
git push origin "${RC_BRANCH}"
- name: Create a PR into ${{ steps.vars.outputs.release-branch }}
env:
GH_TOKEN: ${{ secrets.ci-access-token }}
RC_BRANCH: ${{ steps.vars.outputs.rc-branch }}
RELEASE_BRANCH: ${{ steps.vars.outputs.release-branch }}
TITLE: ${{ steps.vars.outputs.title }}
run: |
gh pr create --title "${TITLE}" \
--body "" \
--head "${RC_BRANCH}" \
--base "${RELEASE_BRANCH}"

View File

@@ -69,7 +69,7 @@ jobs:
submodules: true
- name: Check for file changes
uses: dorny/paths-filter@de90cc6fb38fc0963ad72b210f1f284cd68cea36 # v3.0.2
uses: step-security/paths-filter@v3
id: files-changed
with:
token: ${{ secrets.GITHUB_TOKEN }}
@@ -199,6 +199,12 @@ jobs:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
secrets: inherit
build-and-test-new-tests:
needs: [ meta, build-build-tools-image ]
if: github.event_name == 'pull_request'
uses: ./.github/workflows/build_and_test_tests.yml
secrets: inherit
build-and-test-locally:
needs: [ meta, build-build-tools-image ]
# We do need to run this in `.*-rc-pr` because of hotfixes.
@@ -824,7 +830,7 @@ jobs:
- pg: v17
debian: bookworm
env:
VM_BUILDER_VERSION: v0.42.2
VM_BUILDER_VERSION: v0.46.0
steps:
- name: Harden the runner (Audit all outbound calls)
@@ -1434,10 +1440,10 @@ jobs:
;;
esac
notify-storage-release-deploy-failure:
needs: [ deploy ]
notify-release-deploy-failure:
needs: [ meta, deploy ]
# We want this to run even if (transitive) dependencies are skipped, because deploy should really be successful on release branch workflow runs.
if: github.ref_name == 'release' && needs.deploy.result != 'success' && always()
if: contains(fromJSON('["storage-release", "compute-release", "proxy-release"]'), needs.meta.outputs.run-kind) && needs.deploy.result != 'success' && always()
runs-on: ubuntu-22.04
steps:
- name: Harden the runner (Audit all outbound calls)
@@ -1445,15 +1451,40 @@ jobs:
with:
egress-policy: audit
- name: Post release-deploy failure to team-storage slack channel
- name: Post release-deploy failure to team slack channel
uses: slackapi/slack-github-action@485a9d42d3a73031f12ec201c457e2162c45d02d # v2.0.0
env:
TEAM_ONCALL: >-
${{
fromJSON(format('{
"storage-release": "<!subteam^{0}|@oncall-storage>",
"compute-release": "<!subteam^{1}|@oncall-compute>",
"proxy-release": "<!subteam^{2}|@oncall-proxy>"
}',
vars.SLACK_ONCALL_STORAGE_GROUP,
vars.SLACK_ONCALL_COMPUTE_GROUP,
vars.SLACK_ONCALL_PROXY_GROUP
))[needs.meta.outputs.run-kind]
}}
CHANNEL: >-
${{
fromJSON(format('{
"storage-release": "{0}",
"compute-release": "{1}",
"proxy-release": "{2}"
}',
vars.SLACK_STORAGE_CHANNEL_ID,
vars.SLACK_COMPUTE_CHANNEL_ID,
vars.SLACK_PROXY_CHANNEL_ID
))[needs.meta.outputs.run-kind]
}}
with:
method: chat.postMessage
token: ${{ secrets.SLACK_BOT_TOKEN }}
payload: |
channel: ${{ vars.SLACK_STORAGE_CHANNEL_ID }}
channel: ${{ env.CHANNEL }}
text: |
🔴 <!subteam^S06CJ87UMNY|@oncall-storage>: deploy job on release branch had unexpected status "${{ needs.deploy.result }}" <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>.
🔴 ${{ env.TEAM_ONCALL }}: deploy job on release branch had unexpected status "${{ needs.deploy.result }}" <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>.
# The job runs on `release` branch and copies compatibility data and Neon artifact from the last *release PR* to the latest directory
promote-compatibility-data:

View File

@@ -1,10 +1,10 @@
name: Build and Run Selected Test
name: Build and Run Selected Tests
on:
workflow_dispatch:
inputs:
test-selection:
description: 'Specification of selected test(s), as accepted by pytest -k'
description: 'Specification of selected test(s), e. g.: test_runner/regress/test_pg_regress.py::test_pg_regress'
required: true
type: string
run-count:
@@ -26,6 +26,8 @@ on:
default: '[{"pg_version":"v17"}]'
required: true
type: string
workflow_call:
pull_request: # TODO: remove before merge
defaults:
run:
@@ -42,26 +44,71 @@ jobs:
github-event-name: ${{ github.event_name }}
github-event-json: ${{ toJSON(github.event) }}
build-and-test-locally:
needs: [ meta ]
choose-test-parameters:
runs-on: [ self-hosted, small ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
outputs:
tests: ${{ inputs.test-selection != '' && inputs.test-selection || steps.detect_tests_to_test.outputs.tests }}
archs: ${{ inputs.test-selection != '' && inputs.archs || '["x64", "arm64"]' }}
build-types: ${{ inputs.test-selection != '' && inputs.build-types || '["release"]' }}
pg-versions: ${{ inputs.test-selection != '' && inputs.pg-versions || '[{"pg_version":"v14"}, {"pg_version":"v17"}]' }}
run-count: ${{ inputs.test-selection != '' && inputs.run-count || 5 }}
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
if: inputs.test-selection == ''
with:
submodules: false
clean: false
fetch-depth: 1000
- name: Cache poetry deps
if: inputs.test-selection == ''
uses: actions/cache@v4
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
if: inputs.test-selection == ''
shell: bash -euxo pipefail {0}
run: ./scripts/pysync
- name: Detect new and updated tests
id: detect_tests_to_test
if: github.event.pull_request.head.sha && inputs.test-selection == ''
env:
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
BASE_SHA: ${{ github.event.pull_request.base.sha || github.sha }}
run: python3 .github/scripts/detect-updated-pytests.py
build-and-test-tests:
needs: [ meta, choose-test-parameters ]
if: needs.choose-test-parameters.outputs.tests != ''
strategy:
fail-fast: false
matrix:
arch: ${{ fromJson(inputs.archs) }}
build-type: ${{ fromJson(inputs.build-types) }}
arch: ${{ fromJson(needs.choose-test-parameters.outputs.archs) }}
build-type: ${{ fromJson(needs.choose-test-parameters.outputs.build-types) }}
uses: ./.github/workflows/_build-and-test-locally.yml
with:
arch: ${{ matrix.arch }}
build-tools-image: ghcr.io/neondatabase/build-tools:pinned-bookworm
build-tag: ${{ needs.meta.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
test-cfg: ${{ inputs.pg-versions }}
test-selection: ${{ inputs.test-selection }}
test-run-count: ${{ fromJson(inputs.run-count) }}
test-cfg: ${{ needs.choose-test-parameters.outputs.pg-versions }}
test-selection: ${{ needs.choose-test-parameters.outputs.tests }}
test-run-count: ${{ fromJson(needs.choose-test-parameters.outputs.run-count) }}
secrets: inherit
create-test-report:
needs: [ build-and-test-locally ]
needs: [ build-and-test-tests ]
if: ${{ !cancelled() }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
@@ -96,6 +143,7 @@ jobs:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_DEV }}
REPORT_EXT: '-ext'
- uses: actions/github-script@v7
if: ${{ !cancelled() }}

View File

@@ -68,7 +68,7 @@ jobs:
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
region_id: ${{ inputs.region_id }}
region_id: ${{ inputs.region_id || 'aws-us-east-2' }}
postgres_version: ${{ matrix.pg-version }}
project_settings: ${{ steps.project-settings.outputs.settings }}
# We need these settings to get the expected output results.

View File

@@ -53,7 +53,7 @@ jobs:
submodules: true
- name: Check for Postgres changes
uses: dorny/paths-filter@1441771bbfdd59dcd748680ee64ebd8faab1a242 #v3
uses: step-security/paths-filter@v3
id: files_changed
with:
token: ${{ github.token }}

12
.github/workflows/release-compute.yml vendored Normal file
View File

@@ -0,0 +1,12 @@
name: Create compute release PR
on:
schedule:
- cron: '0 7 * * FRI'
jobs:
create-release-pr:
uses: ./.github/workflows/release.yml
with:
component: compute
secrets: inherit

12
.github/workflows/release-proxy.yml vendored Normal file
View File

@@ -0,0 +1,12 @@
name: Create proxy release PR
on:
schedule:
- cron: '0 6 * * TUE'
jobs:
create-release-pr:
uses: ./.github/workflows/release.yml
with:
component: proxy
secrets: inherit

12
.github/workflows/release-storage.yml vendored Normal file
View File

@@ -0,0 +1,12 @@
name: Create storage release PR
on:
schedule:
- cron: '0 6 * * FRI'
jobs:
create-release-pr:
uses: ./.github/workflows/release.yml
with:
component: storage
secrets: inherit

View File

@@ -1,25 +1,34 @@
name: Create Release Branch
name: Create release PR
on:
schedule:
# It should be kept in sync with if-condition in jobs
- cron: '0 6 * * TUE' # Proxy release
- cron: '0 6 * * FRI' # Storage release
- cron: '0 7 * * FRI' # Compute release
workflow_dispatch:
inputs:
create-storage-release-branch:
type: boolean
description: 'Create Storage release PR'
component:
description: "Component to release"
required: true
type: choice
options:
- compute
- proxy
- storage
cherry-pick:
description: "Commits to cherry-pick (space separated, makes this a hotfix based on previous release)"
required: false
create-proxy-release-branch:
type: boolean
description: 'Create Proxy release PR'
required: false
create-compute-release-branch:
type: boolean
description: 'Create Compute release PR'
type: string
default: ''
workflow_call:
inputs:
component:
description: "Component to release"
required: true
type: string
cherry-pick:
description: "Commits to cherry-pick (space separated, makes this a hotfix based on previous release)"
required: false
type: string
default: ''
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
permissions: {}
@@ -29,41 +38,31 @@ defaults:
shell: bash -euo pipefail {0}
jobs:
create-storage-release-branch:
if: ${{ github.event.schedule == '0 6 * * FRI' || inputs.create-storage-release-branch }}
create-release-pr:
runs-on: ubuntu-22.04
permissions:
contents: write
uses: ./.github/workflows/_create-release-pr.yml
with:
component-name: 'Storage'
source-branch: ${{ github.ref_name }}
secrets:
ci-access-token: ${{ secrets.CI_ACCESS_TOKEN }}
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
create-proxy-release-branch:
if: ${{ github.event.schedule == '0 6 * * TUE' || inputs.create-proxy-release-branch }}
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
permissions:
contents: write
- name: Configure git
run: |
git config user.name "github-actions[bot]"
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
uses: ./.github/workflows/_create-release-pr.yml
with:
component-name: 'Proxy'
source-branch: ${{ github.ref_name }}
secrets:
ci-access-token: ${{ secrets.CI_ACCESS_TOKEN }}
create-compute-release-branch:
if: ${{ github.event.schedule == '0 7 * * FRI' || inputs.create-compute-release-branch }}
permissions:
contents: write
uses: ./.github/workflows/_create-release-pr.yml
with:
component-name: 'Compute'
source-branch: ${{ github.ref_name }}
secrets:
ci-access-token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Create release PR
uses: neondatabase/dev-actions/release-pr@290dec821d86fa8a93f019e8c69720f5865b5677
with:
component: ${{ inputs.component }}
cherry-pick: ${{ inputs.cherry-pick }}
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}

View File

@@ -1083,6 +1083,34 @@ ARG PG_VERSION
RUN cargo install --locked --version 0.12.9 cargo-pgrx && \
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
USER root
#########################################################################################
#
# Layer "rust extensions pgrx14"
#
#########################################################################################
FROM pg-build-nonroot-with-cargo AS rust-extensions-build-pgrx14
ARG PG_VERSION
RUN cargo install --locked --version 0.14.1 cargo-pgrx && \
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
USER root
#########################################################################################
#
# Layer "rust extensions pgrx14"
#
# Version 14 is now required by a few
# This layer should be used as a base for new pgrx extensions,
# and eventually get merged with `rust-extensions-build`
#
#########################################################################################
FROM pg-build-nonroot-with-cargo AS rust-extensions-build-pgrx14
ARG PG_VERSION
RUN cargo install --locked --version 0.14.1 cargo-pgrx && \
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
USER root
#########################################################################################
@@ -1100,11 +1128,11 @@ RUN wget https://github.com/microsoft/onnxruntime/archive/refs/tags/v1.18.1.tar.
mkdir onnxruntime-src && cd onnxruntime-src && tar xzf ../onnxruntime.tar.gz --strip-components=1 -C . && \
echo "#nothing to test here" > neon-test.sh
RUN wget https://github.com/neondatabase-labs/pgrag/archive/refs/tags/v0.0.0.tar.gz -O pgrag.tar.gz && \
echo "2cbe394c1e74fc8bcad9b52d5fbbfb783aef834ca3ce44626cfd770573700bb4 pgrag.tar.gz" | sha256sum --check && \
RUN wget https://github.com/neondatabase-labs/pgrag/archive/refs/tags/v0.1.1.tar.gz -O pgrag.tar.gz && \
echo "087b2ecd11ba307dc968042ef2e9e43dc04d9ba60e8306e882c407bbe1350a50 pgrag.tar.gz" | sha256sum --check && \
mkdir pgrag-src && cd pgrag-src && tar xzf ../pgrag.tar.gz --strip-components=1 -C .
FROM rust-extensions-build-pgrx12 AS pgrag-build
FROM rust-extensions-build-pgrx14 AS pgrag-build
COPY --from=pgrag-src /ext-src/ /ext-src/
# Install build-time dependencies
@@ -1124,19 +1152,19 @@ RUN . venv/bin/activate && \
WORKDIR /ext-src/pgrag-src
RUN cd exts/rag && \
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.9", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
sed -i 's/pgrx = "0.14.1"/pgrx = { version = "0.14.1", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgrx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/rag.control
RUN cd exts/rag_bge_small_en_v15 && \
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.9", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
sed -i 's/pgrx = "0.14.1"/pgrx = { version = "0.14.1", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
ORT_LIB_LOCATION=/ext-src/onnxruntime-src/build/Linux \
REMOTE_ONNX_URL=http://pg-ext-s3-gateway/pgrag-data/bge_small_en_v15.onnx \
cargo pgrx install --release --features remote_onnx && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/rag_bge_small_en_v15.control
RUN cd exts/rag_jina_reranker_v1_tiny_en && \
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.9", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
sed -i 's/pgrx = "0.14.1"/pgrx = { version = "0.14.1", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
ORT_LIB_LOCATION=/ext-src/onnxruntime-src/build/Linux \
REMOTE_ONNX_URL=http://pg-ext-s3-gateway/pgrag-data/jina_reranker_v1_tiny_en.onnx \
cargo pgrx install --release --features remote_onnx && \
@@ -1319,6 +1347,39 @@ COPY --from=pg_session_jwt-src /ext-src/ /ext-src/
WORKDIR /ext-src/pg_session_jwt-src
RUN cargo pgrx install --release
#########################################################################################
#
# Layer "pg-anon-pg-build"
# compile anon extension
#
#########################################################################################
FROM pg-build AS pg_anon-src
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
WORKDIR /ext-src
COPY compute/patches/anon_v2.patch .
# This is an experimental extension, never got to real production.
# !Do not remove! It can be present in shared_preload_libraries and compute will fail to start if library is not found.
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/latest/postgresql_anonymizer-latest.tar.gz -O pg_anon.tar.gz && \
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C . && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt && \
sed -i 's/pgrx = "0.14.1"/pgrx = { version = "=0.14.1", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
patch -p1 < /ext-src/anon_v2.patch
FROM rust-extensions-build-pgrx14 AS pg-anon-pg-build
ARG PG_VERSION
COPY --from=pg_anon-src /ext-src/ /ext-src/
WORKDIR /ext-src
RUN cd pg_anon-src && \
make -j $(getconf _NPROCESSORS_ONLN) extension PG_CONFIG=/usr/local/pgsql/bin/pg_config PGVER=pg$(echo "$PG_VERSION" | sed 's/^v//') && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config PGVER=pg$(echo "$PG_VERSION" | sed 's/^v//') && \
chmod -R a+r ../pg_anon-src && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/anon.control;
########################################################################################
#########################################################################################
#
# Layer "wal2json-build"
@@ -1615,6 +1676,7 @@ COPY --from=pg_uuidv7-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_roaringbitmap-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_semver-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=wal2json-build /usr/local/pgsql /usr/local/pgsql
COPY --from=pg-anon-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/

View File

@@ -23,6 +23,8 @@
import 'sql_exporter/getpage_prefetch_requests_total.libsonnet',
import 'sql_exporter/getpage_prefetches_buffered.libsonnet',
import 'sql_exporter/getpage_sync_requests_total.libsonnet',
import 'sql_exporter/compute_getpage_stuck_requests_total.libsonnet',
import 'sql_exporter/compute_getpage_max_inflight_stuck_time_ms.libsonnet',
import 'sql_exporter/getpage_wait_seconds_bucket.libsonnet',
import 'sql_exporter/getpage_wait_seconds_count.libsonnet',
import 'sql_exporter/getpage_wait_seconds_sum.libsonnet',

View File

@@ -0,0 +1,9 @@
{
metric_name: 'compute_getpage_max_inflight_stuck_time_ms',
type: 'gauge',
help: 'Max wait time for stuck requests among all backends. Includes only active stuck requests, terminated or disconnected ones are not accounted for',
values: [
'compute_getpage_max_inflight_stuck_time_ms',
],
query_ref: 'neon_perf_counters',
}

View File

@@ -0,0 +1,9 @@
{
metric_name: 'compute_getpage_stuck_requests_total',
type: 'counter',
help: 'Total number of Getpage requests left without an answer for more than pageserver_response_log_timeout but less than pageserver_response_disconnect_timeout',
values: [
'compute_getpage_stuck_requests_total',
],
query_ref: 'neon_perf_counters',
}

View File

@@ -9,6 +9,8 @@ SELECT d.* FROM pg_catalog.jsonb_to_record((SELECT jb FROM c)) AS d(
getpage_wait_seconds_sum numeric,
getpage_prefetch_requests_total numeric,
getpage_sync_requests_total numeric,
compute_getpage_stuck_requests_total numeric,
compute_getpage_max_inflight_stuck_time_ms numeric,
getpage_prefetch_misses_total numeric,
getpage_prefetch_discards_total numeric,
getpage_prefetches_buffered numeric,

View File

@@ -0,0 +1,129 @@
diff --git a/sql/anon.sql b/sql/anon.sql
index 0cdc769..f6cc950 100644
--- a/sql/anon.sql
+++ b/sql/anon.sql
@@ -1141,3 +1141,8 @@ $$
-- TODO : https://en.wikipedia.org/wiki/L-diversity
-- TODO : https://en.wikipedia.org/wiki/T-closeness
+
+-- NEON Patches
+
+GRANT ALL ON SCHEMA anon to neon_superuser;
+GRANT ALL ON ALL TABLES IN SCHEMA anon TO neon_superuser;
diff --git a/sql/init.sql b/sql/init.sql
index 7da6553..9b6164b 100644
--- a/sql/init.sql
+++ b/sql/init.sql
@@ -74,50 +74,49 @@ $$
SECURITY LABEL FOR anon ON FUNCTION anon.load_csv IS 'UNTRUSTED';
--- load fake data from a given path
-CREATE OR REPLACE FUNCTION anon.init(
- datapath TEXT
-)
+CREATE OR REPLACE FUNCTION anon.load_fake_data()
RETURNS BOOLEAN
AS $$
DECLARE
- datapath_check TEXT;
success BOOLEAN;
+ sharedir TEXT;
+ datapath TEXT;
BEGIN
- IF anon.is_initialized() THEN
- RAISE NOTICE 'The anon extension is already initialized.';
- RETURN TRUE;
- END IF;
+ datapath := '/extension/anon/';
+ -- find the local extension directory
+ SELECT setting INTO sharedir
+ FROM pg_catalog.pg_config
+ WHERE name = 'SHAREDIR';
SELECT bool_or(results) INTO success
FROM unnest(array[
- anon.load_csv('anon.identifiers_category',datapath||'/identifiers_category.csv'),
- anon.load_csv('anon.identifier',datapath ||'/identifier.csv'),
- anon.load_csv('anon.address',datapath ||'/address.csv'),
- anon.load_csv('anon.city',datapath ||'/city.csv'),
- anon.load_csv('anon.company',datapath ||'/company.csv'),
- anon.load_csv('anon.country',datapath ||'/country.csv'),
- anon.load_csv('anon.email', datapath ||'/email.csv'),
- anon.load_csv('anon.first_name',datapath ||'/first_name.csv'),
- anon.load_csv('anon.iban',datapath ||'/iban.csv'),
- anon.load_csv('anon.last_name',datapath ||'/last_name.csv'),
- anon.load_csv('anon.postcode',datapath ||'/postcode.csv'),
- anon.load_csv('anon.siret',datapath ||'/siret.csv'),
- anon.load_csv('anon.lorem_ipsum',datapath ||'/lorem_ipsum.csv')
+ anon.load_csv('anon.identifiers_category',sharedir || datapath || '/identifiers_category.csv'),
+ anon.load_csv('anon.identifier',sharedir || datapath || '/identifier.csv'),
+ anon.load_csv('anon.address',sharedir || datapath || '/address.csv'),
+ anon.load_csv('anon.city',sharedir || datapath || '/city.csv'),
+ anon.load_csv('anon.company',sharedir || datapath || '/company.csv'),
+ anon.load_csv('anon.country',sharedir || datapath || '/country.csv'),
+ anon.load_csv('anon.email', sharedir || datapath || '/email.csv'),
+ anon.load_csv('anon.first_name',sharedir || datapath || '/first_name.csv'),
+ anon.load_csv('anon.iban',sharedir || datapath || '/iban.csv'),
+ anon.load_csv('anon.last_name',sharedir || datapath || '/last_name.csv'),
+ anon.load_csv('anon.postcode',sharedir || datapath || '/postcode.csv'),
+ anon.load_csv('anon.siret',sharedir || datapath || '/siret.csv'),
+ anon.load_csv('anon.lorem_ipsum',sharedir || datapath || '/lorem_ipsum.csv')
]) results;
RETURN success;
-
END;
$$
- LANGUAGE PLPGSQL
+ LANGUAGE plpgsql
VOLATILE
RETURNS NULL ON NULL INPUT
- PARALLEL UNSAFE -- because load_csv is unsafe
- SECURITY INVOKER
+ PARALLEL UNSAFE -- because of the EXCEPTION
+ SECURITY DEFINER
SET search_path=''
;
-SECURITY LABEL FOR anon ON FUNCTION anon.init(TEXT) IS 'UNTRUSTED';
+
+SECURITY LABEL FOR anon ON FUNCTION anon.load_fake_data IS 'UNTRUSTED';
-- People tend to forget the anon.init() step
-- This is a friendly notice for them
@@ -144,7 +143,7 @@ SECURITY LABEL FOR anon ON FUNCTION anon.notice_if_not_init IS 'UNTRUSTED';
CREATE OR REPLACE FUNCTION anon.load(TEXT)
RETURNS BOOLEAN AS
$$
- SELECT anon.init($1);
+ SELECT anon.init();
$$
LANGUAGE SQL
VOLATILE
@@ -159,16 +158,16 @@ SECURITY LABEL FOR anon ON FUNCTION anon.load(TEXT) IS 'UNTRUSTED';
CREATE OR REPLACE FUNCTION anon.init()
RETURNS BOOLEAN
AS $$
- WITH conf AS (
- -- find the local extension directory
- SELECT setting AS sharedir
- FROM pg_catalog.pg_config
- WHERE name = 'SHAREDIR'
- )
- SELECT anon.init(conf.sharedir || '/extension/anon/')
- FROM conf;
+BEGIN
+ IF anon.is_initialized() THEN
+ RAISE NOTICE 'The anon extension is already initialized.';
+ RETURN TRUE;
+ END IF;
+
+ RETURN anon.load_fake_data();
+END;
$$
- LANGUAGE SQL
+ LANGUAGE plpgsql
VOLATILE
PARALLEL UNSAFE -- because init is unsafe
SECURITY INVOKER

View File

@@ -22,7 +22,7 @@ commands:
- name: local_proxy
user: postgres
sysvInitAction: respawn
shell: 'RUST_LOG="info,proxy::serverless::sql_over_http=warn" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432'
shell: 'RUST_LOG="error" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432'
- name: postgres-exporter
user: nobody
sysvInitAction: respawn

View File

@@ -22,7 +22,7 @@ commands:
- name: local_proxy
user: postgres
sysvInitAction: respawn
shell: 'RUST_LOG="info,proxy::serverless::sql_over_http=warn" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432'
shell: 'RUST_LOG="error" /usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432'
- name: postgres-exporter
user: nobody
sysvInitAction: respawn

View File

@@ -424,10 +424,10 @@ pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
experimental,
};
let span = span!(Level::INFO, "compute_monitor");
thread::Builder::new()
.name("compute-monitor".into())
.spawn(move || {
let span = span!(Level::INFO, "compute_monitor");
let _enter = span.enter();
monitor.run();
})

View File

@@ -112,7 +112,7 @@ impl SafekeeperNode {
}
/// Initializes a safekeeper node by creating all necessary files,
/// e.g. SSL certificates.
/// e.g. SSL certificates and JWT token file.
pub fn initialize(&self) -> anyhow::Result<()> {
if self.env.generate_local_ssl_certs {
self.env.generate_ssl_cert(
@@ -120,6 +120,17 @@ impl SafekeeperNode {
&self.datadir_path().join("server.key"),
)?;
}
// Generate a token file for authentication with other safekeepers
if self.conf.auth_enabled {
let token = self
.env
.generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?;
let token_path = self.datadir_path().join("peer_jwt_token");
std::fs::write(token_path, token)?;
}
Ok(())
}
@@ -218,14 +229,26 @@ impl SafekeeperNode {
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
}
if self.conf.auth_enabled {
let token_path = self.datadir_path().join("peer_jwt_token");
let token_path_str = token_path
.to_str()
.with_context(|| {
format!("Token path {token_path:?} cannot be represented as a unicode string")
})?
.to_owned();
args.extend(["--auth-token-path".to_owned(), token_path_str]);
}
args.extend_from_slice(extra_opts);
let env_variables = Vec::new();
background_process::start_process(
&format!("safekeeper-{id}"),
&datadir,
&self.env.safekeeper_bin(),
&args,
self.safekeeper_env_variables()?,
env_variables,
background_process::InitialPidFile::Expect(self.pid_file()),
retry_timeout,
|| async {
@@ -239,18 +262,6 @@ impl SafekeeperNode {
.await
}
fn safekeeper_env_variables(&self) -> anyhow::Result<Vec<(String, String)>> {
// Generate a token to connect from safekeeper to peers
if self.conf.auth_enabled {
let token = self
.env
.generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?;
Ok(vec![("SAFEKEEPER_AUTH_TOKEN".to_owned(), token)])
} else {
Ok(Vec::new())
}
}
///
/// Stop the server.
///

View File

@@ -3,3 +3,5 @@ pg_distrib_dir='/usr/local/'
listen_pg_addr='0.0.0.0:6400'
listen_http_addr='0.0.0.0:9898'
remote_storage={ endpoint='http://minio:9000', bucket_name='neon', bucket_region='eu-north-1', prefix_in_bucket='/pageserver' }
control_plane_api='http://0.0.0.0:6666' # No storage controller in docker compose, specify a junk address
control_plane_emergency_mode=true

View File

@@ -38,11 +38,6 @@ Currently, the following metrics are collected:
Amount of WAL produced , by a timeline, i.e. last_record_lsn
This is an absolute, per-timeline metric.
- `resident_size`
Size of all the layer files in the tenant's directory on disk on the pageserver.
This is an absolute, per-tenant metric.
- `remote_storage_size`
Size of the remote storage (S3) directory.

View File

@@ -841,6 +841,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
let expected_end = match &end {
ServerInitiated(_) | CopyDone | CopyFail | Terminate | EOF | Cancelled => true,
// The timeline doesn't exist and we have been requested to not auto-create it.
// Compute requests for timelines that haven't been created yet
// might reach us before the storcon request to create those timelines.
TimelineNoCreate => true,
CopyStreamHandlerEnd::Disconnected(ConnectionError::Io(io_error))
if is_expected_io_error(io_error) =>
{
@@ -1059,6 +1063,8 @@ pub enum CopyStreamHandlerEnd {
Terminate,
#[error("EOF on COPY stream")]
EOF,
#[error("timeline not found, and allow_timeline_creation is false")]
TimelineNoCreate,
/// The connection was lost
#[error("connection error: {0}")]
Disconnected(#[from] ConnectionError),

View File

@@ -303,7 +303,8 @@ pub struct PullTimelineRequest {
#[derive(Debug, Serialize, Deserialize)]
pub struct PullTimelineResponse {
// Donor safekeeper host
pub safekeeper_host: String,
/// Donor safekeeper host.
/// None if no pull happened because the timeline already exists.
pub safekeeper_host: Option<String>,
// TODO: add more fields?
}

View File

@@ -10,6 +10,7 @@ use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer, delta_layer, ima
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use pageserver::virtual_file::api::IoMode;
use pageserver::{page_cache, virtual_file};
use pageserver_api::key::Key;
use utils::id::{TenantId, TimelineId};
use crate::layer_map_analyzer::parse_filename;
@@ -27,6 +28,7 @@ pub(crate) enum LayerCmd {
path: PathBuf,
tenant: String,
timeline: String,
key: Option<Key>,
},
/// Dump all information of a layer file
DumpLayer {
@@ -100,6 +102,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
path,
tenant,
timeline,
key,
} => {
let timeline_path = path
.join(TENANTS_SEGMENT_NAME)
@@ -107,21 +110,37 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
.join(TIMELINES_SEGMENT_NAME)
.join(timeline);
let mut idx = 0;
let mut to_print = Vec::default();
for layer in fs::read_dir(timeline_path)? {
let layer = layer?;
if let Ok(layer_file) = parse_filename(&layer.file_name().into_string().unwrap()) {
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
if let Some(key) = key {
if layer_file.key_range.start <= *key && *key < layer_file.key_range.end {
to_print.push((idx, layer_file));
}
} else {
to_print.push((idx, layer_file));
}
idx += 1;
}
}
if key.is_some() {
to_print
.sort_by_key(|(_idx, layer_file)| std::cmp::Reverse(layer_file.lsn_range.end));
}
for (idx, layer_file) in to_print {
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
}
Ok(())
}
LayerCmd::DumpLayer {

View File

@@ -504,7 +504,7 @@ fn start_pageserver(
// Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new(
remote_storage.clone(),
StorageControllerUpcallClient::new(conf, &shutdown_pageserver)?,
StorageControllerUpcallClient::new(conf, &shutdown_pageserver),
conf,
);
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());

View File

@@ -150,7 +150,7 @@ pub struct PageServerConf {
/// not terrible.
pub background_task_maximum_delay: Duration,
pub control_plane_api: Option<Url>,
pub control_plane_api: Url,
/// JWT token for use with the control plane API.
pub control_plane_api_token: Option<SecretString>,
@@ -438,7 +438,8 @@ impl PageServerConf {
test_remote_failures,
ondemand_download_behavior_treat_error_as_warn,
background_task_maximum_delay,
control_plane_api,
control_plane_api: control_plane_api
.ok_or_else(|| anyhow::anyhow!("`control_plane_api` must be set"))?,
control_plane_emergency_mode,
heatmap_upload_concurrency,
secondary_download_concurrency,
@@ -573,6 +574,7 @@ impl PageServerConf {
background_task_maximum_delay: Duration::ZERO,
load_previous_heatmap: Some(true),
generate_unarchival_heatmap: Some(true),
control_plane_api: Some(Url::parse("http://localhost:6666").unwrap()),
..Default::default()
};
PageServerConf::parse_and_validate(NodeId(0), config_toml, &repo_dir).unwrap()
@@ -641,9 +643,12 @@ mod tests {
use super::PageServerConf;
#[test]
fn test_empty_config_toml_is_valid() {
// we use Default impl of everything in this situation
fn test_minimal_config_toml_is_valid() {
// The minimal valid config for running a pageserver:
// - control_plane_api is mandatory, as pageservers cannot run in isolation
// - we use Default impl of everything else in this situation
let input = r#"
control_plane_api = "http://localhost:6666"
"#;
let config_toml = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(input)
.expect("empty config is valid");

View File

@@ -30,9 +30,6 @@ pub(super) enum Name {
/// Tenant remote size
#[serde(rename = "remote_storage_size")]
RemoteSize,
/// Tenant resident size
#[serde(rename = "resident_size")]
ResidentSize,
/// Tenant synthetic size
#[serde(rename = "synthetic_storage_size")]
SyntheticSize,
@@ -187,18 +184,6 @@ impl MetricsKey {
.absolute_values()
}
/// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
///
/// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: None,
metric: Name::ResidentSize,
}
.absolute_values()
}
/// [`TenantShard::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
///
/// [`TenantShard::cached_synthetic_size`]: crate::tenant::TenantShard::cached_synthetic_size
@@ -261,10 +246,7 @@ where
let mut tenants = std::pin::pin!(tenants);
while let Some((tenant_id, tenant)) = tenants.next().await {
let mut tenant_resident_size = 0;
let timelines = tenant.list_timelines();
let timelines_len = timelines.len();
for timeline in timelines {
let timeline_id = timeline.timeline_id;
@@ -287,16 +269,9 @@ where
continue;
}
}
tenant_resident_size += timeline.resident_physical_size();
}
if timelines_len == 0 {
// Force set it to 1 byte to avoid not being reported -- all timelines are offloaded.
tenant_resident_size = 1;
}
let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
let snap = TenantSnapshot::collect(&tenant);
snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
}
@@ -305,19 +280,14 @@ where
/// In-between abstraction to allow testing metrics without actual Tenants.
struct TenantSnapshot {
resident_size: u64,
remote_size: u64,
synthetic_size: u64,
}
impl TenantSnapshot {
/// Collect tenant status to have metrics created out of it.
///
/// `resident_size` is calculated of the timelines we had access to for other metrics, so we
/// cannot just list timelines here.
fn collect(t: &Arc<crate::tenant::TenantShard>, resident_size: u64) -> Self {
fn collect(t: &Arc<crate::tenant::TenantShard>) -> Self {
TenantSnapshot {
resident_size,
remote_size: t.remote_size(),
// Note that this metric is calculated in a separate bgworker
// Here we only use cached value, which may lag behind the real latest one
@@ -334,8 +304,6 @@ impl TenantSnapshot {
) {
let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
let synthetic_size = {
let factory = MetricsKey::synthetic_size(tenant_id);
let mut synthetic_size = self.synthetic_size;
@@ -355,11 +323,7 @@ impl TenantSnapshot {
}
};
metrics.extend(
[Some(remote_size), Some(resident_size), synthetic_size]
.into_iter()
.flatten(),
);
metrics.extend([Some(remote_size), synthetic_size].into_iter().flatten());
}
}

View File

@@ -224,7 +224,6 @@ fn post_restart_synthetic_size_uses_cached_if_available() {
let tenant_id = TenantId::generate();
let ts = TenantSnapshot {
resident_size: 1000,
remote_size: 1000,
// not yet calculated
synthetic_size: 0,
@@ -245,7 +244,6 @@ fn post_restart_synthetic_size_uses_cached_if_available() {
metrics,
&[
MetricsKey::remote_storage_size(tenant_id).at(now, 1000),
MetricsKey::resident_size(tenant_id).at(now, 1000),
MetricsKey::synthetic_size(tenant_id).at(now, 1000),
]
);
@@ -256,7 +254,6 @@ fn post_restart_synthetic_size_is_not_sent_when_not_cached() {
let tenant_id = TenantId::generate();
let ts = TenantSnapshot {
resident_size: 1000,
remote_size: 1000,
// not yet calculated
synthetic_size: 0,
@@ -274,7 +271,6 @@ fn post_restart_synthetic_size_is_not_sent_when_not_cached() {
metrics,
&[
MetricsKey::remote_storage_size(tenant_id).at(now, 1000),
MetricsKey::resident_size(tenant_id).at(now, 1000),
// no synthetic size here
]
);
@@ -295,14 +291,13 @@ pub(crate) const fn metric_examples_old(
timeline_id: TimelineId,
now: DateTime<Utc>,
before: DateTime<Utc>,
) -> [RawMetric; 6] {
) -> [RawMetric; 5] {
[
MetricsKey::written_size(tenant_id, timeline_id).at_old_format(now, 0),
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_until_old_format(before, now, 0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at_old_format(now, 0),
MetricsKey::remote_storage_size(tenant_id).at_old_format(now, 0),
MetricsKey::resident_size(tenant_id).at_old_format(now, 0),
MetricsKey::synthetic_size(tenant_id).at_old_format(now, 1),
]
}
@@ -312,13 +307,12 @@ pub(crate) const fn metric_examples(
timeline_id: TimelineId,
now: DateTime<Utc>,
before: DateTime<Utc>,
) -> [NewRawMetric; 6] {
) -> [NewRawMetric; 5] {
[
MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0),
MetricsKey::remote_storage_size(tenant_id).at(now, 0),
MetricsKey::resident_size(tenant_id).at(now, 0),
MetricsKey::synthetic_size(tenant_id).at(now, 1),
]
}

View File

@@ -521,10 +521,6 @@ mod tests {
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
),
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
),
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
@@ -564,7 +560,7 @@ mod tests {
assert_eq!(upgraded_samples, new_samples);
}
fn metric_samples_old() -> [RawMetric; 6] {
fn metric_samples_old() -> [RawMetric; 5] {
let tenant_id = TenantId::from_array([0; 16]);
let timeline_id = TimelineId::from_array([0xff; 16]);
@@ -576,7 +572,7 @@ mod tests {
super::super::metrics::metric_examples_old(tenant_id, timeline_id, now, before)
}
fn metric_samples() -> [NewRawMetric; 6] {
fn metric_samples() -> [NewRawMetric; 5] {
let tenant_id = TenantId::from_array([0; 16]);
let timeline_id = TimelineId::from_array([0xff; 16]);

View File

@@ -58,14 +58,8 @@ pub trait StorageControllerUpcallApi {
impl StorageControllerUpcallClient {
/// A None return value indicates that the input `conf` object does not have control
/// plane API enabled.
pub fn new(
conf: &'static PageServerConf,
cancel: &CancellationToken,
) -> Result<Option<Self>, reqwest::Error> {
let mut url = match conf.control_plane_api.as_ref() {
Some(u) => u.clone(),
None => return Ok(None),
};
pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Self {
let mut url = conf.control_plane_api.clone();
if let Ok(mut segs) = url.path_segments_mut() {
// This ensures that `url` ends with a slash if it doesn't already.
@@ -85,15 +79,17 @@ impl StorageControllerUpcallClient {
}
for cert in &conf.ssl_ca_certs {
client = client.add_root_certificate(Certificate::from_der(cert.contents())?);
client = client.add_root_certificate(
Certificate::from_der(cert.contents()).expect("Invalid certificate in config"),
);
}
Ok(Some(Self {
http_client: client.build()?,
Self {
http_client: client.build().expect("Failed to construct HTTP client"),
base_url: url,
node_id: conf.id,
cancel: cancel.clone(),
}))
}
}
#[tracing::instrument(skip_all)]

View File

@@ -585,7 +585,7 @@ impl DeletionQueue {
/// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
pub fn new<C>(
remote_storage: GenericRemoteStorage,
controller_upcall_client: Option<C>,
controller_upcall_client: C,
conf: &'static PageServerConf,
) -> (Self, DeletionQueueWorkers<C>)
where
@@ -701,7 +701,7 @@ mod test {
async fn restart(&mut self) {
let (deletion_queue, workers) = DeletionQueue::new(
self.storage.clone(),
Some(self.mock_control_plane.clone()),
self.mock_control_plane.clone(),
self.harness.conf,
);
@@ -821,11 +821,8 @@ mod test {
let mock_control_plane = MockStorageController::new();
let (deletion_queue, worker) = DeletionQueue::new(
storage.clone(),
Some(mock_control_plane.clone()),
harness.conf,
);
let (deletion_queue, worker) =
DeletionQueue::new(storage.clone(), mock_control_plane.clone(), harness.conf);
let worker_join = worker.spawn_with(&tokio::runtime::Handle::current());

View File

@@ -53,7 +53,7 @@ where
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
// Client for calling into control plane API for validation of deletes
controller_upcall_client: Option<C>,
controller_upcall_client: C,
// DeletionLists which are waiting generation validation. Not safe to
// execute until [`validate`] has processed them.
@@ -86,7 +86,7 @@ where
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
controller_upcall_client: Option<C>,
controller_upcall_client: C,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
cancel: CancellationToken,
) -> Self {
@@ -137,20 +137,16 @@ where
return Ok(());
}
let tenants_valid = if let Some(controller_upcall_client) = &self.controller_upcall_client {
match controller_upcall_client
.validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
.await
{
Ok(tenants) => tenants,
Err(RetryForeverError::ShuttingDown) => {
// The only way a validation call returns an error is when the cancellation token fires
return Err(DeletionQueueError::ShuttingDown);
}
let tenants_valid = match self
.controller_upcall_client
.validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
.await
{
Ok(tenants) => tenants,
Err(RetryForeverError::ShuttingDown) => {
// The only way a validation call returns an error is when the cancellation token fires
return Err(DeletionQueueError::ShuttingDown);
}
} else {
// Control plane API disabled. In legacy mode we consider everything valid.
tenant_generations.keys().map(|k| (*k, true)).collect()
};
let mut validated_sequence: Option<u64> = None;

View File

@@ -497,6 +497,24 @@ pub(crate) static WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS: Lazy<IntCounter> = Lazy::n
.expect("failed to define a metric")
});
pub(crate) static ONDEMAND_DOWNLOAD_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_ondemand_download_bytes_total",
"Total bytes of layers on-demand downloaded",
&["task_kind"]
)
.expect("failed to define a metric")
});
pub(crate) static ONDEMAND_DOWNLOAD_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_ondemand_download_count",
"Total count of layers on-demand downloaded",
&["task_kind"]
)
.expect("failed to define a metric")
});
pub(crate) mod wait_ondemand_download_time {
use super::*;
const WAIT_ONDEMAND_DOWNLOAD_TIME_BUCKETS: &[f64] = &[
@@ -2180,6 +2198,10 @@ impl BasebackupQueryTimeOngoingRecording<'_> {
// If you want to change categorize of a specific error, also change it in `log_query_error`.
let metric = match res {
Ok(_) => &self.parent.ok,
Err(QueryError::Shutdown) => {
// Do not observe ok/err for shutdown
return;
}
Err(QueryError::Disconnected(ConnectionError::Io(io_error)))
if is_expected_io_error(io_error) =>
{

View File

@@ -1035,10 +1035,25 @@ impl PageServerHandler {
// avoid a somewhat costly Span::record() by constructing the entire span in one go.
macro_rules! mkspan {
(before shard routing) => {{
tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn)
tracing::info_span!(
parent: &parent_span,
"handle_get_page_request",
rel = %req.rel,
blkno = %req.blkno,
req_lsn = %req.hdr.request_lsn,
not_modified_since_lsn = %req.hdr.not_modified_since
)
}};
($shard_id:expr) => {{
tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn, shard_id = %$shard_id)
tracing::info_span!(
parent: &parent_span,
"handle_get_page_request",
rel = %req.rel,
blkno = %req.blkno,
req_lsn = %req.hdr.request_lsn,
not_modified_since_lsn = %req.hdr.not_modified_since,
shard_id = %$shard_id
)
}};
}
@@ -1102,6 +1117,7 @@ impl PageServerHandler {
shard_id = %shard.get_shard_identity().shard_slug(),
timeline_id = %timeline_id,
lsn = %req.hdr.request_lsn,
not_modified_since_lsn = %req.hdr.not_modified_since,
request_id = %req.hdr.reqid,
key = %key,
)

View File

@@ -1084,8 +1084,17 @@ impl Timeline {
let mut result = HashMap::new();
for (k, v) in kv {
let v = v?;
if v.is_empty() {
// This is a tombstone -- we can skip it.
// Originally, the replorigin code uses `Lsn::INVALID` to represent a tombstone. However, as it part of
// the sparse keyspace and the sparse keyspace uses an empty image to universally represent a tombstone,
// we also need to consider that. Such tombstones might be written on the detach ancestor code path to
// avoid the value going into the child branch. (See [`crate::tenant::timeline::detach_ancestor::generate_tombstone_image_layer`] for more details.)
continue;
}
let origin_id = k.field6 as RepOriginId;
let origin_lsn = Lsn::des(&v).unwrap();
let origin_lsn = Lsn::des(&v)
.with_context(|| format!("decode replorigin value for {}: {v:?}", origin_id))?;
if origin_lsn != Lsn::INVALID {
result.insert(origin_id, origin_lsn);
}
@@ -2578,6 +2587,11 @@ impl DatadirModification<'_> {
}
}
#[cfg(test)]
pub fn put_for_unit_test(&mut self, key: Key, val: Value) {
self.put(key, val);
}
fn put(&mut self, key: Key, val: Value) {
if Self::is_data_key(&key) {
self.put_data(key.to_compact(), val)

View File

@@ -4254,9 +4254,7 @@ impl TenantShard {
deletion_queue_client: DeletionQueueClient,
l0_flush_global_state: L0FlushGlobalState,
) -> TenantShard {
debug_assert!(
!attached_conf.location.generation.is_none() || conf.control_plane_api.is_none()
);
assert!(!attached_conf.location.generation.is_none());
let (state, mut rx) = watch::channel(state);
@@ -5949,7 +5947,9 @@ mod tests {
use itertools::Itertools;
#[cfg(feature = "testing")]
use models::CompactLsnRange;
use pageserver_api::key::{AUX_KEY_PREFIX, Key, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX};
use pageserver_api::key::{
AUX_KEY_PREFIX, Key, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX, repl_origin_key,
};
use pageserver_api::keyspace::KeySpace;
#[cfg(feature = "testing")]
use pageserver_api::keyspace::KeySpaceRandomAccum;
@@ -8185,6 +8185,54 @@ mod tests {
assert_eq!(files.get("pg_logical/mappings/test2"), None);
}
#[tokio::test]
async fn test_repl_origin_tombstones() {
let harness = TenantHarness::create("test_repl_origin_tombstones")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let io_concurrency = IoConcurrency::spawn_for_test();
let mut lsn = Lsn(0x08);
let tline: Arc<Timeline> = tenant
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
let repl_lsn = Lsn(0x10);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification.put_for_unit_test(repl_origin_key(2), Value::Image(Bytes::new()));
modification.set_replorigin(1, repl_lsn).await.unwrap();
modification.commit(&ctx).await.unwrap();
}
// we can read everything from the storage
let repl_origins = tline
.get_replorigins(lsn, &ctx, io_concurrency.clone())
.await
.unwrap();
assert_eq!(repl_origins.len(), 1);
assert_eq!(repl_origins[&1], lsn);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification.put_for_unit_test(
repl_origin_key(3),
Value::Image(Bytes::copy_from_slice(b"cannot_decode_this")),
);
modification.commit(&ctx).await.unwrap();
}
let result = tline
.get_replorigins(lsn, &ctx, io_concurrency.clone())
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_metadata_image_creation() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_image_creation").await?;

View File

@@ -346,7 +346,8 @@ async fn init_load_generations(
"Emergency mode! Tenants will be attached unsafely using their last known generation"
);
emergency_generations(tenant_confs)
} else if let Some(client) = StorageControllerUpcallClient::new(conf, cancel)? {
} else {
let client = StorageControllerUpcallClient::new(conf, cancel);
info!("Calling {} API to re-attach tenants", client.base_url());
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
match client.re_attach(conf).await {
@@ -360,9 +361,6 @@ async fn init_load_generations(
anyhow::bail!("Shut down while waiting for control plane re-attach response")
}
}
} else {
info!("Control plane API not configured, tenant generations are disabled");
return Ok(None);
};
// The deletion queue needs to know about the startup attachment state to decide which (if any) stored
@@ -1153,17 +1151,8 @@ impl TenantManager {
// Testing hack: if we are configured with no control plane, then drop the generation
// from upserts. This enables creating generation-less tenants even though neon_local
// always uses generations when calling the location conf API.
let attached_conf = if cfg!(feature = "testing") {
let mut conf = AttachedTenantConf::try_from(new_location_config)
.map_err(UpsertLocationError::BadRequest)?;
if self.conf.control_plane_api.is_none() {
conf.location.generation = Generation::none();
}
conf
} else {
AttachedTenantConf::try_from(new_location_config)
.map_err(UpsertLocationError::BadRequest)?
};
let attached_conf = AttachedTenantConf::try_from(new_location_config)
.map_err(UpsertLocationError::BadRequest)?;
let tenant = tenant_spawn(
self.conf,

View File

@@ -4,6 +4,7 @@ use std::sync::{Arc, Weak};
use std::time::{Duration, SystemTime};
use crate::PERF_TRACE_TARGET;
use crate::metrics::{ONDEMAND_DOWNLOAD_BYTES, ONDEMAND_DOWNLOAD_COUNT};
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::keyspace::KeySpace;
@@ -1255,6 +1256,14 @@ impl LayerInner {
self.access_stats.record_residence_event();
let task_kind: &'static str = ctx.task_kind().into();
ONDEMAND_DOWNLOAD_BYTES
.with_label_values(&[task_kind])
.inc_by(self.desc.file_size);
ONDEMAND_DOWNLOAD_COUNT
.with_label_values(&[task_kind])
.inc();
Ok(self.initialize_after_layer_is_on_disk(permit))
}
Err(e) => {

View File

@@ -178,7 +178,7 @@ impl Attempt {
}
}
async fn generate_tombstone_image_layer(
pub(crate) async fn generate_tombstone_image_layer(
detached: &Arc<Timeline>,
ancestor: &Arc<Timeline>,
ancestor_lsn: Lsn,

View File

@@ -163,8 +163,7 @@ pub async fn doit(
// Ensure at-least-once delivery of the upcall to storage controller
// before we mark the task as done and never come here again.
//
let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel)?
.expect("storcon configured");
let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel);
storcon_client
.put_timeline_import_status(
timeline.tenant_shard_id,

View File

@@ -36,6 +36,8 @@ DATA = \
neon--1.2--1.3.sql \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.6.sql \
neon--1.6--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \
neon--1.3--1.2.sql \

View File

@@ -687,8 +687,14 @@ prefetch_wait_for(uint64 ring_index)
END_PREFETCH_RECEIVE_WORK();
CHECK_FOR_INTERRUPTS();
}
return result;
if (result)
{
/* Check that slot is actually received (srver can be disconnected in prefetch_pump_state called from CHECK_FOR_INTERRUPTS */
PrefetchRequest *slot = GetPrfSlot(ring_index);
return slot->status == PRFS_RECEIVED;
}
return false;
;
}
/*

View File

@@ -98,7 +98,6 @@
#define MB ((uint64)1024*1024)
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ >> lfc_chunk_size_log))
#define BLOCK_TO_CHUNK_OFF(blkno) ((blkno) & (lfc_blocks_per_chunk-1))
/*
@@ -135,6 +134,15 @@ typedef struct FileCacheEntry
#define N_COND_VARS 64
#define CV_WAIT_TIMEOUT 10
#define MAX_PREWARM_WORKERS 8
typedef struct PrewarmWorkerState
{
uint32 prewarmed_pages;
uint32 skipped_pages;
TimestampTz completed;
} PrewarmWorkerState;
typedef struct FileCacheControl
{
uint64 generation; /* generation is needed to handle correct hash
@@ -156,25 +164,43 @@ typedef struct FileCacheControl
dlist_head holes; /* double linked list of punched holes */
HyperLogLogState wss_estimation; /* estimation of working set size */
ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */
PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS];
size_t n_prewarm_workers;
size_t n_prewarm_entries;
size_t total_prewarm_pages;
size_t prewarm_batch;
bool prewarm_active;
bool prewarm_canceled;
dsm_handle prewarm_lfc_state_handle;
} FileCacheControl;
bool lfc_store_prefetch_result;
#define FILE_CACHE_STATE_MAGIC 0xfcfcfcfc
#define FILE_CACHE_STATE_BITMAP(fcs) ((uint8*)&(fcs)->chunks[(fcs)->n_chunks])
#define FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_chunks) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + (((n_chunks) * lfc_blocks_per_chunk)+7)/8)
#define FILE_CACHE_STATE_SIZE(fcs) (sizeof(FileCacheState) + (fcs->n_chunks)*sizeof(BufferTag) + (((fcs->n_chunks) << fcs->chunk_size_log)+7)/8)
static HTAB *lfc_hash;
static int lfc_desc = -1;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_prewarm_limit;
static int lfc_prewarm_batch;
static int lfc_chunk_size_log = MAX_BLOCKS_PER_CHUNK_LOG;
static int lfc_blocks_per_chunk = MAX_BLOCKS_PER_CHUNK;
static char *lfc_path;
static uint64 lfc_generation;
static FileCacheControl *lfc_ctl;
static bool lfc_do_prewarm;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
bool lfc_store_prefetch_result;
bool lfc_prewarm_update_ws_estimation;
#define LFC_ENABLED() (lfc_ctl->limit != 0)
/*
@@ -500,6 +526,17 @@ lfc_init(void)
NULL,
NULL);
DefineCustomBoolVariable("neon.prewarm_update_ws_estimation",
"Consider prewarmed pages for working set estimation",
NULL,
&lfc_prewarm_update_ws_estimation,
true,
PGC_SUSET,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.max_file_cache_size",
"Maximal size of Neon local file cache",
NULL,
@@ -550,6 +587,32 @@ lfc_init(void)
lfc_change_chunk_size,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_limit",
"Maximal number of prewarmed chunks",
NULL,
&lfc_prewarm_limit,
INT_MAX, /* no limit by default */
0,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_batch",
"Number of pages retrivied by prewarm from page server",
NULL,
&lfc_prewarm_batch,
64,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
if (lfc_max_size == 0)
return;
@@ -563,6 +626,317 @@ lfc_init(void)
#endif
}
FileCacheState*
lfc_get_state(size_t max_entries)
{
FileCacheState* fcs = NULL;
if (lfc_maybe_disabled() || max_entries == 0) /* fast exit if file cache is disabled */
return NULL;
LWLockAcquire(lfc_lock, LW_SHARED);
if (LFC_ENABLED())
{
dlist_iter iter;
size_t i = 0;
uint8* bitmap;
size_t n_pages = 0;
size_t n_entries = Min(max_entries, lfc_ctl->used - lfc_ctl->pinned);
size_t state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries);
fcs = (FileCacheState*)palloc0(state_size);
SET_VARSIZE(fcs, state_size);
fcs->magic = FILE_CACHE_STATE_MAGIC;
fcs->chunk_size_log = lfc_chunk_size_log;
fcs->n_chunks = n_entries;
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
dlist_reverse_foreach(iter, &lfc_ctl->lru)
{
FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur);
fcs->chunks[i] = entry->key;
for (int j = 0; j < lfc_blocks_per_chunk; j++)
{
if (GET_STATE(entry, j) != UNAVAILABLE)
{
BITMAP_SET(bitmap, i*lfc_blocks_per_chunk + j);
n_pages += 1;
}
}
if (++i == n_entries)
break;
}
Assert(i == n_entries);
fcs->n_pages = n_pages;
Assert(pg_popcount((char*)bitmap, ((n_entries << lfc_chunk_size_log) + 7)/8) == n_pages);
elog(LOG, "LFC: save state of %d chunks %d pages", (int)n_entries, (int)n_pages);
}
LWLockRelease(lfc_lock);
return fcs;
}
/*
* Prewarm LFC cache to the specified state. It uses lfc_prefetch function to load prewarmed page without hoilding shared buffer lock
* and avoid race conditions with other backends.
*/
void
lfc_prewarm(FileCacheState* fcs, uint32 n_workers)
{
size_t fcs_chunk_size_log;
size_t n_entries;
size_t prewarm_batch = Min(lfc_prewarm_batch, readahead_buffer_size);
size_t fcs_size;
dsm_segment *seg;
BackgroundWorkerHandle* bgw_handle[MAX_PREWARM_WORKERS];
if (!lfc_ensure_opened())
return;
if (prewarm_batch == 0 || lfc_prewarm_limit == 0 || n_workers == 0)
{
elog(LOG, "LFC: prewarm is disabled");
return;
}
if (n_workers > MAX_PREWARM_WORKERS)
{
elog(ERROR, "LFC: Too much prewarm workers, maximum is %d", MAX_PREWARM_WORKERS);
}
if (fcs == NULL || fcs->n_chunks == 0)
{
elog(LOG, "LFC: nothing to prewarm");
return;
}
if (fcs->magic != FILE_CACHE_STATE_MAGIC)
{
elog(ERROR, "LFC: Invalid file cache state magic: %X", fcs->magic);
}
fcs_size = VARSIZE(fcs);
if (FILE_CACHE_STATE_SIZE(fcs) != fcs_size)
{
elog(ERROR, "LFC: Invalid file cache state size: %u vs. %u", (unsigned)FILE_CACHE_STATE_SIZE(fcs), VARSIZE(fcs));
}
fcs_chunk_size_log = fcs->chunk_size_log;
if (fcs_chunk_size_log > MAX_BLOCKS_PER_CHUNK_LOG)
{
elog(ERROR, "LFC: Invalid chunk size log: %u", fcs->chunk_size_log);
}
n_entries = Min(fcs->n_chunks, lfc_prewarm_limit);
Assert(n_entries != 0);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
/* Do not prewarm more entries than LFC limit */
if (lfc_ctl->limit <= lfc_ctl->size)
{
elog(LOG, "LFC: skip prewarm because LFC is already filled");
LWLockRelease(lfc_lock);
return;
}
if (lfc_ctl->prewarm_active)
{
LWLockRelease(lfc_lock);
elog(ERROR, "LFC: skip prewarm because another prewarm is still active");
}
lfc_ctl->n_prewarm_entries = n_entries;
lfc_ctl->n_prewarm_workers = n_workers;
lfc_ctl->prewarm_active = true;
lfc_ctl->prewarm_canceled = false;
lfc_ctl->prewarm_batch = prewarm_batch;
memset(lfc_ctl->prewarm_workers, 0, n_workers*sizeof(PrewarmWorkerState));
LWLockRelease(lfc_lock);
/* Calculate total number of pages to be prewarmed */
lfc_ctl->total_prewarm_pages = fcs->n_pages;
seg = dsm_create(fcs_size, 0);
memcpy(dsm_segment_address(seg), fcs, fcs_size);
lfc_ctl->prewarm_lfc_state_handle = dsm_segment_handle(seg);
/* Spawn background workers */
for (uint32 i = 0; i < n_workers; i++)
{
BackgroundWorker worker = {0};
worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
worker.bgw_restart_time = BGW_NEVER_RESTART;
strcpy(worker.bgw_library_name, "neon");
strcpy(worker.bgw_function_name, "lfc_prewarm_main");
snprintf(worker.bgw_name, BGW_MAXLEN, "LFC prewarm worker %d", i+1);
strcpy(worker.bgw_type, "LFC prewarm worker");
worker.bgw_main_arg = Int32GetDatum(i);
/* must set notify PID to wait for shutdown */
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &bgw_handle[i]))
{
ereport(LOG,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("LFC: registering dynamic bgworker prewarm failed"),
errhint("Consider increasing the configuration parameter \"%s\".", "max_worker_processes")));
n_workers = i;
lfc_ctl->prewarm_canceled = true;
break;
}
}
for (uint32 i = 0; i < n_workers; i++)
{
bool interrupted;
do
{
interrupted = false;
PG_TRY();
{
BgwHandleStatus status = WaitForBackgroundWorkerShutdown(bgw_handle[i]);
if (status != BGWH_STOPPED && status != BGWH_POSTMASTER_DIED)
{
elog(LOG, "LFC: Unexpected status of prewarm worker termination: %d", status);
}
}
PG_CATCH();
{
elog(LOG, "LFC: cancel prewarm");
lfc_ctl->prewarm_canceled = true;
interrupted = true;
}
PG_END_TRY();
} while (interrupted);
if (!lfc_ctl->prewarm_workers[i].completed)
{
/* Background worker doesn't set completion time: it means that it was abnormally terminated */
elog(LOG, "LFC: prewarm worker %d failed", i+1);
/* Set completion time to prevent get_prewarm_info from considering this worker as active */
lfc_ctl->prewarm_workers[i].completed = GetCurrentTimestamp();
}
}
dsm_detach(seg);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
lfc_ctl->prewarm_active = false;
LWLockRelease(lfc_lock);
}
void
lfc_prewarm_main(Datum main_arg)
{
size_t snd_idx = 0, rcv_idx = 0;
size_t n_sent = 0, n_received = 0;
size_t fcs_chunk_size_log;
size_t max_prefetch_pages;
size_t prewarm_batch;
size_t n_workers;
dsm_segment *seg;
FileCacheState* fcs;
uint8* bitmap;
BufferTag tag;
PrewarmWorkerState* ws;
uint32 worker_id = DatumGetInt32(main_arg);
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
seg = dsm_attach(lfc_ctl->prewarm_lfc_state_handle);
if (seg == NULL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not map dynamic shared memory segment")));
fcs = (FileCacheState*) dsm_segment_address(seg);
prewarm_batch = lfc_ctl->prewarm_batch;
fcs_chunk_size_log = fcs->chunk_size_log;
n_workers = lfc_ctl->n_prewarm_workers;
max_prefetch_pages = lfc_ctl->n_prewarm_entries << fcs_chunk_size_log;
ws = &lfc_ctl->prewarm_workers[worker_id];
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
/* enable prefetch in LFC */
lfc_store_prefetch_result = true;
lfc_do_prewarm = true; /* Flag for lfc_prefetch preventing replacement of existed entries if LFC cache is full */
elog(LOG, "LFC: worker %d start prewarming", worker_id);
while (!lfc_ctl->prewarm_canceled)
{
if (snd_idx < max_prefetch_pages)
{
if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id)
{
/* If there are multiple workers, split chunks between them */
snd_idx += 1 << fcs_chunk_size_log;
}
else
{
if (BITMAP_ISSET(bitmap, snd_idx))
{
tag = fcs->chunks[snd_idx >> fcs_chunk_size_log];
tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1);
if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum))
{
(void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
n_sent += 1;
}
else
{
ws->skipped_pages += 1;
BITMAP_CLR(bitmap, snd_idx);
}
}
snd_idx += 1;
}
}
if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages)
{
if (n_received == n_sent && snd_idx == max_prefetch_pages)
{
break;
}
if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id)
{
/* Skip chunks processed by other workers */
rcv_idx += 1 << fcs_chunk_size_log;
continue;
}
/* Locate next block to prefetch */
while (!BITMAP_ISSET(bitmap, rcv_idx))
{
rcv_idx += 1;
}
tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log];
tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1);
if (communicator_prefetch_receive(tag))
{
ws->prewarmed_pages += 1;
}
else
{
ws->skipped_pages += 1;
}
rcv_idx += 1;
n_received += 1;
}
}
/* No need to perform prefetch cleanup here because prewarm worker will be terminated and
* connection to PS dropped just after return from this function.
*/
Assert(n_sent == n_received || lfc_ctl->prewarm_canceled);
elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received);
lfc_ctl->prewarm_workers[worker_id].completed = GetCurrentTimestamp();
}
/*
* Check if page is present in the cache.
* Returns true if page is found in local cache.
@@ -1001,8 +1375,11 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
* If we can't (e.g. because all other slots are being accessed)
* then we will remove this entry from the hash and continue
* on to the next chunk, as we may not exceed the limit.
*
* While prewarming LFC we do not want to replace existed entries,
* so we just stop prewarm is LFC cache is full.
*/
else if (!dlist_is_empty(&lfc_ctl->lru))
else if (!dlist_is_empty(&lfc_ctl->lru) && !lfc_do_prewarm)
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node,
@@ -1026,6 +1403,7 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
/* Can't add this chunk - we don't have the space for it */
hash_search_with_hash_value(lfc_hash, &entry->key, hash,
HASH_REMOVE, NULL);
lfc_ctl->prewarm_canceled = true; /* cancel prewarm if LFC limit is reached */
return false;
}
@@ -1112,9 +1490,11 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
if (lfc_prewarm_update_ws_estimation)
{
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
if (found)
{
state = GET_STATE(entry, chunk_offs);
@@ -1748,3 +2128,82 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
}
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(get_local_cache_state);
Datum
get_local_cache_state(PG_FUNCTION_ARGS)
{
size_t max_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
FileCacheState* fcs = lfc_get_state(max_entries);
if (fcs != NULL)
PG_RETURN_BYTEA_P((bytea*)fcs);
else
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(prewarm_local_cache);
Datum
prewarm_local_cache(PG_FUNCTION_ARGS)
{
bytea* state = PG_GETARG_BYTEA_PP(0);
uint32 n_workers = PG_GETARG_INT32(1);
FileCacheState* fcs = (FileCacheState*)state;
lfc_prewarm(fcs, n_workers);
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(get_prewarm_info);
Datum
get_prewarm_info(PG_FUNCTION_ARGS)
{
Datum values[4];
bool nulls[4];
TupleDesc tupdesc;
uint32 prewarmed_pages = 0;
uint32 skipped_pages = 0;
uint32 active_workers = 0;
uint32 total_pages;
size_t n_workers;
if (lfc_size_limit == 0)
PG_RETURN_NULL();
LWLockAcquire(lfc_lock, LW_SHARED);
if (!lfc_ctl || lfc_ctl->n_prewarm_workers == 0)
{
LWLockRelease(lfc_lock);
PG_RETURN_NULL();
}
n_workers = lfc_ctl->n_prewarm_workers;
total_pages = lfc_ctl->total_prewarm_pages;
for (size_t i = 0; i < n_workers; i++)
{
PrewarmWorkerState* ws = &lfc_ctl->prewarm_workers[i];
prewarmed_pages += ws->prewarmed_pages;
skipped_pages += ws->skipped_pages;
active_workers += ws->completed != 0;
}
LWLockRelease(lfc_lock);
tupdesc = CreateTemplateTupleDesc(4);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_pages", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "prewarmed_pages", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "skipped_pages", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "active_workers", INT4OID, -1, 0);
tupdesc = BlessTupleDesc(tupdesc);
MemSet(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(total_pages);
values[1] = Int32GetDatum(prewarmed_pages);
values[2] = Int32GetDatum(skipped_pages);
values[3] = Int32GetDatum(active_workers);
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}

View File

@@ -13,6 +13,17 @@
#include "neon_pgversioncompat.h"
typedef struct FileCacheState
{
int32 vl_len_; /* varlena header (do not touch directly!) */
uint32 magic;
uint32 n_chunks;
uint32 n_pages;
uint16 chunk_size_log;
BufferTag chunks[FLEXIBLE_ARRAY_MEMBER];
/* followed by bitmap */
} FileCacheState;
/* GUCs */
extern bool lfc_store_prefetch_result;
@@ -32,7 +43,10 @@ extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
extern void lfc_init(void);
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
const void* buffer, XLogRecPtr lsn);
extern FileCacheState* lfc_get_state(size_t max_entries);
extern void lfc_prewarm(FileCacheState* fcs, uint32 n_workers);
PGDLLEXPORT void lfc_prewarm_main(Datum main_arg);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

View File

@@ -26,6 +26,7 @@
#include "portability/instr_time.h"
#include "postmaster/interrupt.h"
#include "storage/buf_internals.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
@@ -79,6 +80,7 @@ int neon_protocol_version = 3;
static int neon_compute_mode = 0;
static int max_reconnect_attempts = 60;
static int stripe_size;
static int max_sockets;
static int pageserver_response_log_timeout = 10000;
/* 2.5 minutes. A bit higher than highest default TCP retransmission timeout */
@@ -336,6 +338,13 @@ load_shard_map(shardno_t shard_no, char *connstr_p, shardno_t *num_shards_p)
pageserver_disconnect(i);
}
pagestore_local_counter = end_update_counter;
/* Reserve file descriptors for sockets */
while (max_sockets < num_shards)
{
max_sockets += 1;
ReserveExternalFD();
}
}
if (num_shards_p)
@@ -736,8 +745,8 @@ pageserver_connect(shardno_t shard_no, int elevel)
default:
neon_shard_log(shard_no, ERROR, "libpagestore: invalid connection state %d", shard->state);
}
/* This shouldn't be hit */
Assert(false);
pg_unreachable();
}
static void
@@ -877,6 +886,7 @@ retry:
int port;
int sndbuf;
int recvbuf;
uint64* max_wait;
get_local_port(PQsocket(pageserver_conn), &port);
get_socket_stats(PQsocket(pageserver_conn), &sndbuf, &recvbuf);
@@ -887,7 +897,10 @@ retry:
shard->nrequests_sent, shard->nresponses_received, port, sndbuf, recvbuf,
pageserver_conn->inStart, pageserver_conn->inEnd);
shard->receive_last_log_time = now;
MyNeonCounters->compute_getpage_stuck_requests_total += !shard->receive_logged;
shard->receive_logged = true;
max_wait = &MyNeonCounters->compute_getpage_max_inflight_stuck_time_ms;
*max_wait = Max(*max_wait, INSTR_TIME_GET_MILLISEC(since_start));
}
/*
@@ -910,6 +923,7 @@ retry:
get_local_port(PQsocket(pageserver_conn), &port);
neon_shard_log(shard_no, LOG, "no response from pageserver for %0.3f s, disconnecting (socket port=%d)",
INSTR_TIME_GET_DOUBLE(since_start), port);
MyNeonCounters->compute_getpage_max_inflight_stuck_time_ms = 0;
pageserver_disconnect(shard_no);
return -1;
}
@@ -933,6 +947,7 @@ retry:
INSTR_TIME_SET_ZERO(shard->receive_start_time);
INSTR_TIME_SET_ZERO(shard->receive_last_log_time);
shard->receive_logged = false;
MyNeonCounters->compute_getpage_max_inflight_stuck_time_ms = 0;
return ret;
}

View File

@@ -0,0 +1,22 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. \quit
CREATE FUNCTION get_prewarm_info(out total_pages integer, out prewarmed_pages integer, out skipped_pages integer, out active_workers integer)
RETURNS record
AS 'MODULE_PATHNAME', 'get_prewarm_info'
LANGUAGE C STRICT
PARALLEL SAFE;
CREATE FUNCTION get_local_cache_state(max_chunks integer default null)
RETURNS bytea
AS 'MODULE_PATHNAME', 'get_local_cache_state'
LANGUAGE C
PARALLEL UNSAFE;
CREATE FUNCTION prewarm_local_cache(state bytea, n_workers integer default 1)
RETURNS void
AS 'MODULE_PATHNAME', 'prewarm_local_cache'
LANGUAGE C STRICT
PARALLEL UNSAFE;

View File

@@ -0,0 +1,7 @@
DROP FUNCTION IF EXISTS get_prewarm_info(out total_pages integer, out prewarmed_pages integer, out skipped_pages integer, out active_workers integer);
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer default 1);

View File

@@ -4,6 +4,7 @@
#include "miscadmin.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "storage/ipc.h"
#include "storage/shmem.h"
#include "storage/buf_internals.h"
@@ -396,9 +397,10 @@ SetLastWrittenLSNForBlockRangeInternal(XLogRecPtr lsn,
XLogRecPtr
neon_set_lwlsn_block_range(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum, BlockNumber from, BlockNumber n_blocks)
{
if (lsn < FirstNormalUnloggedLSN || n_blocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0)
if (lsn == InvalidXLogRecPtr || n_blocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0)
return lsn;
Assert(lsn >= WalSegMinSize);
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
lsn = SetLastWrittenLSNForBlockRangeInternal(lsn, rlocator, forknum, from, n_blocks);
LWLockRelease(LastWrittenLsnLock);
@@ -435,7 +437,6 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode,
NInfoGetRelNumber(relfilenode) == InvalidOid)
return InvalidXLogRecPtr;
BufTagInit(key, relNumber, forknum, blockno, spcOid, dbOid);
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
@@ -444,6 +445,10 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode,
{
XLogRecPtr lsn = lsns[i];
if (lsn == InvalidXLogRecPtr)
continue;
Assert(lsn >= WalSegMinSize);
key.blockNum = blockno + i;
entry = hash_search(lastWrittenLsnCache, &key, HASH_ENTER, &found);
if (found)

View File

@@ -148,7 +148,7 @@ histogram_to_metrics(IOHistogram histogram,
static metric_t *
neon_perf_counters_to_metrics(neon_per_backend_counters *counters)
{
#define NUM_METRICS ((2 + NUM_IO_WAIT_BUCKETS) * 3 + 10)
#define NUM_METRICS ((2 + NUM_IO_WAIT_BUCKETS) * 3 + 12)
metric_t *metrics = palloc((NUM_METRICS + 1) * sizeof(metric_t));
int i = 0;
@@ -166,6 +166,8 @@ neon_perf_counters_to_metrics(neon_per_backend_counters *counters)
APPEND_METRIC(getpage_prefetch_requests_total);
APPEND_METRIC(getpage_sync_requests_total);
APPEND_METRIC(compute_getpage_stuck_requests_total);
APPEND_METRIC(compute_getpage_max_inflight_stuck_time_ms);
APPEND_METRIC(getpage_prefetch_misses_total);
APPEND_METRIC(getpage_prefetch_discards_total);
APPEND_METRIC(pageserver_requests_sent_total);
@@ -294,6 +296,11 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
totals.file_cache_hits_total += counters->file_cache_hits_total;
histogram_merge_into(&totals.file_cache_read_hist, &counters->file_cache_read_hist);
histogram_merge_into(&totals.file_cache_write_hist, &counters->file_cache_write_hist);
totals.compute_getpage_stuck_requests_total += counters->compute_getpage_stuck_requests_total;
totals.compute_getpage_max_inflight_stuck_time_ms = Max(
totals.compute_getpage_max_inflight_stuck_time_ms,
counters->compute_getpage_max_inflight_stuck_time_ms);
}
metrics = neon_perf_counters_to_metrics(&totals);

View File

@@ -57,6 +57,18 @@ typedef struct
uint64 getpage_prefetch_requests_total;
uint64 getpage_sync_requests_total;
/*
* Total number of Getpage requests left without an answer for more than
* pageserver_response_log_timeout but less than pageserver_response_disconnect_timeout
*/
uint64 compute_getpage_stuck_requests_total;
/*
* Longest waiting time for active stuck requests. If a stuck request gets a
* response or disconnects, this metric is updated
*/
uint64 compute_getpage_max_inflight_stuck_time_ms;
/*
* Total number of readahead misses; consisting of either prefetches that
* don't satisfy the LSN bounds, or cases where no readahead was issued

View File

@@ -836,7 +836,7 @@ TermsCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInf
{
uint32 n_greeted = 0;
for (uint32 i = 0; i < wp->mconf.members.len; i++)
for (uint32 i = 0; i < mset->len; i++)
{
Safekeeper *sk = msk[i];
@@ -1106,7 +1106,7 @@ VotesCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInf
{
uint32 n_votes = 0;
for (uint32 i = 0; i < wp->mconf.members.len; i++)
for (uint32 i = 0; i < mset->len; i++)
{
Safekeeper *sk = msk[i];

View File

@@ -63,7 +63,7 @@
char *wal_acceptors_list = "";
int wal_acceptor_reconnect_timeout = 1000;
int wal_acceptor_connection_timeout = 10000;
int safekeeper_proto_version = 2;
int safekeeper_proto_version = 3;
/* Set to true in the walproposer bgw. */
static bool am_walproposer;
@@ -228,7 +228,7 @@ nwp_register_gucs(void)
"Version of compute <-> safekeeper protocol.",
"Used while migrating from 2 to 3.",
&safekeeper_proto_version,
2, 0, INT_MAX,
3, 0, INT_MAX,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);

View File

@@ -32,7 +32,7 @@ To play with it locally one may start proxy over a local postgres installation
(see end of this page on how to generate certs with openssl):
```
./target/debug/proxy -c server.crt -k server.key --auth-backend=postgres --auth-endpoint=postgres://stas@127.0.0.1:5432/stas --wss 0.0.0.0:4444
LOGFMT=text ./target/debug/proxy -c server.crt -k server.key --auth-backend=postgres --auth-endpoint=postgres://stas@127.0.0.1:5432/stas --wss 0.0.0.0:4444
```
If both postgres and proxy are running you may send a SQL query:
@@ -130,7 +130,7 @@ openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key
Then we need to build proxy with 'testing' feature and run, e.g.:
```sh
RUST_LOG=proxy cargo run -p proxy --bin proxy --features testing -- --auth-backend postgres --auth-endpoint 'postgresql://postgres:proxy-postgres@127.0.0.1:5432/postgres' -c server.crt -k server.key
RUST_LOG=proxy LOGFMT=text cargo run -p proxy --bin proxy --features testing -- --auth-backend postgres --auth-endpoint 'postgresql://postgres:proxy-postgres@127.0.0.1:5432/postgres' -c server.crt -k server.key
```
Now from client you can start a new session:

View File

@@ -132,11 +132,10 @@ impl Drop for LoggingGuard {
}
}
// TODO: make JSON the default
#[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
enum LogFormat {
Text,
#[default]
Text = 1,
Json,
}

View File

@@ -1,7 +1,6 @@
//
// Main entry point for the safekeeper executable
//
use std::env::{VarError, var};
use std::fs::{self, File};
use std::io::{ErrorKind, Write};
use std::str::FromStr;
@@ -354,29 +353,13 @@ async fn main() -> anyhow::Result<()> {
};
// Load JWT auth token to connect to other safekeepers for pull_timeline.
// First check if the env var is present, then check the arg with the path.
// We want to deprecate and remove the env var method in the future.
let sk_auth_token = match var("SAFEKEEPER_AUTH_TOKEN") {
Ok(v) => {
info!("loaded JWT token for authentication with safekeepers");
Some(SecretString::from(v))
}
Err(VarError::NotPresent) => {
if let Some(auth_token_path) = args.auth_token_path.as_ref() {
info!(
"loading JWT token for authentication with safekeepers from {auth_token_path}"
);
let auth_token = tokio::fs::read_to_string(auth_token_path).await?;
Some(SecretString::from(auth_token.trim().to_owned()))
} else {
info!("no JWT token for authentication with safekeepers detected");
None
}
}
Err(_) => {
warn!("JWT token for authentication with safekeepers is not unicode");
None
}
let sk_auth_token = if let Some(auth_token_path) = args.auth_token_path.as_ref() {
info!("loading JWT token for authentication with safekeepers from {auth_token_path}");
let auth_token = tokio::fs::read_to_string(auth_token_path).await?;
Some(SecretString::from(auth_token.trim().to_owned()))
} else {
info!("no JWT token for authentication with safekeepers detected");
None
};
let ssl_ca_certs = match args.ssl_ca_file.as_ref() {

View File

@@ -401,7 +401,10 @@ pub async fn handle_request(
request.timeline_id,
));
if existing_tli.is_ok() {
bail!("Timeline {} already exists", request.timeline_id);
info!("Timeline {} already exists", request.timeline_id);
return Ok(PullTimelineResponse {
safekeeper_host: None,
});
}
let mut http_client = reqwest::Client::builder();
@@ -425,8 +428,25 @@ pub async fn handle_request(
let mut statuses = Vec::new();
for (i, response) in responses.into_iter().enumerate() {
let status = response.context(format!("fetching status from {}", http_hosts[i]))?;
statuses.push((status, i));
match response {
Ok(status) => {
statuses.push((status, i));
}
Err(e) => {
info!("error fetching status from {}: {e}", http_hosts[i]);
}
}
}
// Allow missing responses from up to one safekeeper (say due to downtime)
// e.g. if we created a timeline on PS A and B, with C being offline. Then B goes
// offline and C comes online. Then we want a pull on C with A and B as hosts to work.
let min_required_successful = (http_hosts.len() - 1).max(1);
if statuses.len() < min_required_successful {
bail!(
"only got {} successful status responses. required: {min_required_successful}",
statuses.len()
)
}
// Find the most advanced safekeeper
@@ -536,6 +556,6 @@ async fn pull_timeline(
.await?;
Ok(PullTimelineResponse {
safekeeper_host: host,
safekeeper_host: Some(host),
})
}

View File

@@ -32,7 +32,7 @@ use crate::metrics::{
WAL_RECEIVERS,
};
use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage};
use crate::timeline::WalResidentTimeline;
use crate::timeline::{TimelineError, WalResidentTimeline};
const DEFAULT_FEEDBACK_CAPACITY: usize = 8;
@@ -357,9 +357,14 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
.await
.context("create timeline")?
} else {
self.global_timelines
.get(self.ttid)
.context("get timeline")?
let timeline_res = self.global_timelines.get(self.ttid);
match timeline_res {
Ok(tl) => tl,
Err(TimelineError::NotFound(_)) => {
return Err(CopyStreamHandlerEnd::TimelineNoCreate);
}
other => other.context("get_timeline")?,
}
};
tli.wal_residence_guard().await?
}

View File

@@ -19,7 +19,8 @@ use storage_controller::service::chaos_injector::ChaosInjector;
use storage_controller::service::{
Config, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT,
MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT,
PRIORITY_RECONCILER_CONCURRENCY_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT, Service,
PRIORITY_RECONCILER_CONCURRENCY_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT, Service,
};
use tokio::signal::unix::SignalKind;
use tokio_util::sync::CancellationToken;
@@ -132,6 +133,10 @@ struct Cli {
#[arg(long)]
priority_reconciler_concurrency: Option<usize>,
/// Maximum number of safekeeper reconciliations that may run in parallel (per safekeeper)
#[arg(long)]
safekeeper_reconciler_concurrency: Option<usize>,
/// Tenant API rate limit, as requests per second per tenant.
#[arg(long, default_value = "10")]
tenant_rate_limit: NonZeroU32,
@@ -403,6 +408,9 @@ async fn async_main() -> anyhow::Result<()> {
priority_reconciler_concurrency: args
.priority_reconciler_concurrency
.unwrap_or(PRIORITY_RECONCILER_CONCURRENCY_DEFAULT),
safekeeper_reconciler_concurrency: args
.safekeeper_reconciler_concurrency
.unwrap_or(SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT),
tenant_rate_limit: args.tenant_rate_limit,
split_threshold: args.split_threshold,
max_split_shards: args.max_split_shards,

View File

@@ -194,6 +194,7 @@ pub(crate) enum LeadershipStatus {
pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256;
pub const SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT: usize = 32;
// Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately.
// This channel is finite-size to avoid using excessive memory if we get into a state where reconciles are finishing more slowly
@@ -382,6 +383,9 @@ pub struct Config {
/// How many high-priority Reconcilers may be spawned concurrently
pub priority_reconciler_concurrency: usize,
/// How many safekeeper reconciles may happen concurrently (per safekeeper)
pub safekeeper_reconciler_concurrency: usize,
/// How many API requests per second to allow per tenant, across all
/// tenant-scoped API endpoints. Further API requests queue until ready.
pub tenant_rate_limit: NonZeroU32,
@@ -3659,7 +3663,7 @@ impl Service {
locations: ShardMutationLocations,
http_client: reqwest::Client,
jwt: Option<String>,
create_req: TimelineCreateRequest,
mut create_req: TimelineCreateRequest,
) -> Result<TimelineInfo, ApiError> {
let latest = locations.latest.node;
@@ -3678,6 +3682,15 @@ impl Service {
.await
.map_err(|e| passthrough_api_error(&latest, e))?;
// If we are going to create the timeline on some stale locations for shard 0, then ask them to re-use
// the initdb generated by the latest location, rather than generating their own. This avoids racing uploads
// of initdb to S3 which might not be binary-identical if different pageservers have different postgres binaries.
if tenant_shard_id.is_shard_zero() {
if let models::TimelineCreateRequestMode::Bootstrap { existing_initdb_timeline_id, .. } = &mut create_req.mode {
*existing_initdb_timeline_id = Some(create_req.new_timeline_id);
}
}
// We propagate timeline creations to all attached locations such that a compute
// for the new timeline is able to start regardless of the current state of the
// tenant shard reconciliation.
@@ -3720,6 +3733,10 @@ impl Service {
// Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
// use whatever LSN that shard picked when creating on subsequent shards. We arbitrarily use shard zero as the shard
// that will get the first creation request, and propagate the LSN to all the >0 shards.
//
// This also enables non-zero shards to use the initdb that shard 0 generated and uploaded to S3, rather than
// independently generating their own initdb. This guarantees that shards cannot end up with different initial
// states if e.g. they have different postgres binary versions.
let timeline_info = create_one(
shard_zero_tid,
shard_zero_locations,
@@ -3729,11 +3746,16 @@ impl Service {
)
.await?;
// Propagate the LSN that shard zero picked, if caller didn't provide one
// Update the create request for shards >= 0
match &mut create_req.mode {
models::TimelineCreateRequestMode::Branch { ancestor_start_lsn, .. } if ancestor_start_lsn.is_none() => {
// Propagate the LSN that shard zero picked, if caller didn't provide one
*ancestor_start_lsn = timeline_info.ancestor_lsn;
},
models::TimelineCreateRequestMode::Bootstrap { existing_initdb_timeline_id, .. } => {
// For shards >= 0, do not run initdb: use the one that shard 0 uploaded to S3
*existing_initdb_timeline_id = Some(create_req.new_timeline_id)
}
_ => {}
}

View File

@@ -3,7 +3,10 @@ use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
use clashmap::{ClashMap, Entry};
use safekeeper_api::models::PullTimelineRequest;
use safekeeper_client::mgmt_api;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::{
Semaphore,
mpsc::{self, UnboundedReceiver, UnboundedSender},
};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{
@@ -206,18 +209,27 @@ impl ReconcilerHandle {
}
pub(crate) struct SafekeeperReconciler {
service: Arc<Service>,
inner: SafekeeperReconcilerInner,
concurrency_limiter: Arc<Semaphore>,
rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>,
cancel: CancellationToken,
}
/// Thin wrapper over `Service` to not clutter its inherent functions
#[derive(Clone)]
struct SafekeeperReconcilerInner {
service: Arc<Service>,
}
impl SafekeeperReconciler {
fn spawn(cancel: CancellationToken, service: Arc<Service>) -> ReconcilerHandle {
// We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking.
let (tx, rx) = mpsc::unbounded_channel();
let concurrency = service.config.safekeeper_reconciler_concurrency;
let mut reconciler = SafekeeperReconciler {
service,
inner: SafekeeperReconcilerInner { service },
rx,
concurrency_limiter: Arc::new(Semaphore::new(concurrency)),
cancel: cancel.clone(),
};
let handle = ReconcilerHandle {
@@ -230,31 +242,44 @@ impl SafekeeperReconciler {
}
async fn run(&mut self) {
loop {
// TODO add parallelism with semaphore here
let req = tokio::select! {
req = self.rx.recv() => req,
_ = self.cancel.cancelled() => break,
};
let Some((req, req_cancel)) = req else { break };
let permit_res = tokio::select! {
req = self.concurrency_limiter.clone().acquire_owned() => req,
_ = self.cancel.cancelled() => break,
};
let Ok(_permit) = permit_res else { return };
let inner = self.inner.clone();
if req_cancel.is_cancelled() {
continue;
}
let kind = req.kind;
let tenant_id = req.tenant_id;
let timeline_id = req.timeline_id;
let node_id = req.safekeeper.skp.id;
self.reconcile_one(req, req_cancel)
.instrument(tracing::info_span!(
"reconcile_one",
?kind,
%tenant_id,
?timeline_id,
%node_id,
))
.await;
tokio::task::spawn(async move {
let kind = req.kind;
let tenant_id = req.tenant_id;
let timeline_id = req.timeline_id;
let node_id = req.safekeeper.skp.id;
inner
.reconcile_one(req, req_cancel)
.instrument(tracing::info_span!(
"reconcile_one",
?kind,
%tenant_id,
?timeline_id,
%node_id,
))
.await;
});
}
}
}
impl SafekeeperReconcilerInner {
async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) {
let req_host = req.safekeeper.skp.host.clone();
match req.kind {
@@ -281,10 +306,11 @@ impl SafekeeperReconciler {
req,
async |client| client.pull_timeline(&pull_req).await,
|resp| {
tracing::info!(
"pulled timeline from {} onto {req_host}",
resp.safekeeper_host,
);
if let Some(host) = resp.safekeeper_host {
tracing::info!("pulled timeline from {host} onto {req_host}");
} else {
tracing::info!("timeline already present on safekeeper on {req_host}");
}
},
req_cancel,
)

View File

@@ -1194,8 +1194,7 @@ class NeonEnv:
else:
cfg["broker"]["listen_addr"] = self.broker.listen_addr()
if self.control_plane_api is not None:
cfg["control_plane_api"] = self.control_plane_api
cfg["control_plane_api"] = self.control_plane_api
if self.control_plane_hooks_api is not None:
cfg["control_plane_hooks_api"] = self.control_plane_hooks_api
@@ -1280,7 +1279,8 @@ class NeonEnv:
)
tenant_config = ps_cfg.setdefault("tenant_config", {})
tenant_config["rel_size_v2_enabled"] = True # Enable relsize_v2 by default in tests
# This feature is pending rollout.
# tenant_config["rel_size_v2_enabled"] = True
if self.pageserver_remote_storage is not None:
ps_cfg["remote_storage"] = remote_storage_to_toml_dict(

View File

@@ -186,7 +186,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
"type": "interpreted",
"args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
},
"rel_size_v2_enabled": False, # test suite enables it by default as of https://github.com/neondatabase/neon/issues/11081, so, custom config means disabling it
"rel_size_v2_enabled": True,
"gc_compaction_enabled": True,
"gc_compaction_verification": False,
"gc_compaction_initial_threshold_kb": 1024000,

View File

@@ -229,7 +229,7 @@ def test_pageserver_gc_compaction_preempt(
@skip_in_debug_build("only run with release build")
@pytest.mark.timeout(600) # This test is slow with sanitizers enabled, especially on ARM
@pytest.mark.timeout(900) # This test is slow with sanitizers enabled, especially on ARM
@pytest.mark.parametrize(
"with_branches",
["with_branches", "no_branches"],

View File

@@ -14,7 +14,7 @@ from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.paths import BASE_DIR
from fixtures.pg_config import PgConfigKey
from fixtures.utils import subprocess_capture
from fixtures.utils import WITH_SANITIZERS, subprocess_capture
from werkzeug.wrappers.response import Response
if TYPE_CHECKING:
@@ -148,6 +148,15 @@ def test_remote_extensions(
pg_config: PgConfig,
extension: RemoteExtension,
):
if WITH_SANITIZERS and extension is RemoteExtension.WITH_LIB:
pytest.skip(
"""
For this test to work with sanitizers enabled, we would need to
compile the dummy Postgres extension with the same CFLAGS that we
compile Postgres and the neon extension with to link the sanitizers.
"""
)
# Setup a mock nginx S3 gateway which will return our test extension.
(host, port) = httpserver_listen_address
extensions_endpoint = f"http://{host}:{port}/pg-ext-s3-gateway"

View File

@@ -0,0 +1,11 @@
"""Test for detecting new flaky tests"""
import random
def test_flaky1():
assert random.random() > 0.05
def no_test_flaky2():
assert random.random() > 0.05

View File

@@ -0,0 +1,147 @@
import random
import threading
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import USE_LFC
def check_pinned_entries(cur):
# some LFC buffer can be temporary locked by autovacuum or background writer
for _ in range(10):
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_chunks_pinned'")
n_pinned = cur.fetchall()[0][0]
if n_pinned == 0:
break
time.sleep(1)
assert n_pinned == 0
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 1000000
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_limit=1000",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
cur.execute("select get_local_cache_state()")
lfc_state = cur.fetchall()[0][0]
endpoint.stop()
endpoint.start()
conn = endpoint.connect()
cur = conn.cursor()
time.sleep(1) # wait until compute_ctl complete downgrade of extension to default version
cur.execute("alter extension neon update to '1.6'")
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
lfc_used_pages = cur.fetchall()[0][0]
log.info(f"Used LFC size: {lfc_used_pages}")
cur.execute("select * from get_prewarm_info()")
prewarm_info = cur.fetchall()[0]
log.info(f"Prewarm info: {prewarm_info}")
log.info(f"Prewarm progress: {(prewarm_info[1] + prewarm_info[2]) * 100 // prewarm_info[0]}%")
assert lfc_used_pages > 10000
assert (
prewarm_info[0] > 0
and prewarm_info[1] > 0
and prewarm_info[0] == prewarm_info[1] + prewarm_info[2]
)
cur.execute("select sum(pk) from t")
assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
check_pinned_entries(cur)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 10000
n_threads = 4
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"shared_buffers=1MB",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_limit=1000000",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
cur.execute(
"create table accounts(id integer primary key, balance bigint default 0, payload text default repeat('?', 1000)) with (fillfactor=10)"
)
cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))")
cur.execute("select get_local_cache_state()")
lfc_state = cur.fetchall()[0][0]
running = True
def workload():
conn = endpoint.connect()
cur = conn.cursor()
n_transfers = 0
while running:
src = random.randint(1, n_records)
dst = random.randint(1, n_records)
cur.execute("update accounts set balance=balance-100 where id=%s", (src,))
cur.execute("update accounts set balance=balance+100 where id=%s", (dst,))
n_transfers += 1
log.info(f"Number of transfers: {n_transfers}")
def prewarm():
conn = endpoint.connect()
cur = conn.cursor()
n_prewarms = 0
while running:
cur.execute("alter system set neon.file_cache_size_limit='1MB'")
cur.execute("select pg_reload_conf()")
cur.execute("alter system set neon.file_cache_size_limit='1GB'")
cur.execute("select pg_reload_conf()")
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
n_prewarms += 1
log.info(f"Number of prewarms: {n_prewarms}")
workload_threads = []
for _ in range(n_threads):
t = threading.Thread(target=workload)
workload_threads.append(t)
t.start()
prewarm_thread = threading.Thread(target=prewarm)
prewarm_thread.start()
time.sleep(20)
running = False
for t in workload_threads:
t.join()
prewarm_thread.join()
cur.execute("select sum(balance) from accounts")
total_balance = cur.fetchall()[0][0]
assert total_balance == 0
check_pinned_entries(cur)

View File

@@ -3,7 +3,7 @@
Tests in this module exercise the pageserver's behavior around generation numbers,
as defined in docs/rfcs/025-generation-numbers.md. Briefly, the behaviors we require
of the pageserver are:
- Do not start a tenant without a generation number if control_plane_api is set
- Do not start a tenant without a generation number
- Remote objects must be suffixed with generation
- Deletions may only be executed after validating generation
- Updates to remote_consistent_lsn may only be made visible after validating generation

View File

@@ -506,7 +506,6 @@ class SyntheticSizeVerifier:
PER_METRIC_VERIFIERS = {
"remote_storage_size": CannotVerifyAnything,
"resident_size": CannotVerifyAnything,
"written_size": WrittenDataVerifier,
"written_data_bytes_delta": WrittenDataDeltaVerifier,
"timeline_logical_size": CannotVerifyAnything,

View File

@@ -471,7 +471,7 @@ def test_tx_abort_with_many_relations(
try:
# Rollback phase should be fast: this is one WAL record that we should process efficiently
fut = exec.submit(rollback_and_wait)
fut.result(timeout=15)
fut.result(timeout=15 if reldir_type == "v1" else 30)
except:
exec.shutdown(wait=False, cancel_futures=True)
raise

View File

@@ -11,6 +11,9 @@ if TYPE_CHECKING:
# Test that pageserver and safekeeper can restart quickly.
# This is a regression test, see https://github.com/neondatabase/neon/issues/2247
def test_fixture_restart(neon_env_builder: NeonEnvBuilder):
import random
assert random.random() > 0.05
env = neon_env_builder.init_start()
for _ in range(3):
@@ -20,3 +23,9 @@ def test_fixture_restart(neon_env_builder: NeonEnvBuilder):
for _ in range(3):
env.safekeepers[0].stop()
env.safekeepers[0].start()
def test_flaky3():
import random
assert random.random() > 0.05