Compare commits


134 Commits

Author SHA1 Message Date
Alexey Masterov
6572a291cf confused the files 2025-05-14 11:39:31 +02:00
Alexey Masterov
ae3b0bf963 do not run on pgv16 2025-05-14 11:22:56 +02:00
Alexey Masterov
c79c7e4314 change region 2025-05-13 15:57:42 +02:00
Alexey Masterov
6a7f49e61f do not use pooled 2025-05-13 15:50:01 +02:00
Alexey Masterov
8e25ddec54 Add mask 2025-05-13 15:44:43 +02:00
Alexey Masterov
6b4d3ca9c1 add -r 2025-05-13 15:35:40 +02:00
Alexey Masterov
32ab6a617c use another project, fail if cannot get connect_uri 2025-05-13 15:29:00 +02:00
Alexey Masterov
4d930029c7 skip jq 2025-05-13 15:19:36 +02:00
Alexey Masterov
b264422779 Add debug 2025-05-13 15:11:48 +02:00
Alexey Masterov
1649030340 Add debug 2025-05-13 15:08:47 +02:00
Alexey Masterov
8d00865923 Add fail 2025-05-13 14:58:48 +02:00
Alexey Masterov
0a73cb0238 use uri from project 2025-05-13 14:33:54 +02:00
Alexey Masterov
22403105b3 force x64 2025-05-09 16:07:31 +02:00
Alexey Masterov
9e25f1aad4 force x64 2025-05-09 15:45:27 +02:00
Alexey Masterov
ce9f6f7146 exclude plpgsql_check-src 2025-05-09 14:25:55 +02:00
Alexey Masterov
dc98dc6dc2 change image 2025-05-09 14:24:39 +02:00
Alexey Masterov
eaf8f905a2 Merge branch 'refs/heads/amasterov/add-postgis-test-2' into amasterov/run-postgis-test 2025-05-09 13:40:37 +02:00
Alexey Masterov
2774687a08 print postgis diffs, if found 2025-05-09 13:40:15 +02:00
Alexey Masterov
deac7ae80f print postgis diffs, if found 2025-05-09 13:38:26 +02:00
Alexey Masterov
8e63c8c035 change the image 2025-05-09 12:57:46 +02:00
Alexey Masterov
7b3cc75bd3 Merge branch 'refs/heads/amasterov/add-postgis-test-2' into amasterov/run-postgis-test 2025-05-09 12:00:53 +02:00
Alexey Masterov
27f23d8e6f add a patch for v16 2025-05-09 12:00:29 +02:00
Alexey Masterov
a4c9f4483e change runs-on 2025-05-09 11:50:18 +02:00
Alexey Masterov
a2af701010 Prepare to run on staging 2025-05-09 10:53:28 +02:00
a-masterov
e2fcfb9b2a Merge branch 'main' into amasterov/add-postgis-test-2 2025-05-09 08:48:21 +02:00
Christian Schwarz
b37bb7d7ed pageserver: timeline shutdown: fully quiesce ingest path before freeze_and_flush (#11851)
# Problem 

Before this PR, timeline shutdown would
- cancel the walreceiver cancellation token subtree (child token of `Timeline::cancel`)
- call `freeze_and_flush`
- `Timeline::cancel.cancel()`
- ... bunch of waiting for things ...
- `Timeline::gate.close()`

As noted by the comment that is deleted by this PR, this left a window where, after freeze_and_flush, walreceiver could still be running and ingest data into a new InMemoryLayer.

This presents a potential source of log noise during Timeline shutdown, where the InMemoryLayer created after the freeze_and_flush observes that Timeline::cancel is cancelled, failing the ingest with some anyhow::Error wrapping (deeply) a `FlushTaskError::Cancelled` instance (`flush task cancelled` error message).

# Solution

It turns out that it is quite easy to shut down, not just cancel, walreceiver completely, because the only subtask spawned by the walreceiver connection manager is the `handle_walreceiver_connection` task, which is properly shut down and waited upon when the manager task observes cancellation and exits its retry loop.

The alternative is to replace all the usage of `anyhow` on the ingest path with differentiated error types. A lot of busywork for little gain to fix a potential logging-noise nuisance, so, not doing that for now.

# Correctness / Risk

We do not risk leaking walreceiver child tasks because existing discipline is to hold a gate guard.

We will prolong `Timeline::shutdown` to the degree that we're no longer making progress with the rest of shutdown while the walreceiver task hasn't yet observed cancellation. In practice, this should be negligible.

`Timeline::shutdown` could fail to complete if there is a hidden dependency of walreceiver shutdown on some subsystem. The code certainly suggests there isn't, and I'm not aware of any such dependency. Anyway, impact will be low because we only shut down Timeline instances that are obsolete, either because there is a newer attachment at a different location, or because the timeline got deleted by the user. We would learn about this through stuck cplane operations or stuck storcon reconciliations. We would be able to mitigate by cancelling such stuck operations/reconciliations and/or by rolling back pageserver.

# Refs
- identified this while investigating https://github.com/neondatabase/neon/issues/11762
- PR that _does_ fix a bunch of _real_ `flush task cancelled` noise on the compaction path: https://github.com/neondatabase/neon/pull/11853
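To make the new ordering concrete, here is a minimal sketch (hypothetical field and type names, not the actual pageserver code) of fully shutting down walreceiver before freeze_and_flush:

```rust
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

/// Hypothetical stand-in for Timeline, holding just enough state to
/// illustrate the revised shutdown ordering.
struct TimelineShutdownSketch {
    walreceiver_cancel: CancellationToken, // child token of `cancel`
    walreceiver_task: Option<JoinHandle<()>>,
    cancel: CancellationToken,
}

impl TimelineShutdownSketch {
    async fn shutdown(&mut self) {
        // 1. Cancel the walreceiver token subtree AND wait for the manager
        //    task; it joins its only child, `handle_walreceiver_connection`,
        //    before exiting its retry loop.
        self.walreceiver_cancel.cancel();
        if let Some(task) = self.walreceiver_task.take() {
            let _ = task.await;
        }
        // 2. Only now freeze_and_flush: nothing can create a new
        //    InMemoryLayer behind our back anymore.
        //    self.freeze_and_flush().await;
        // 3. The rest of shutdown proceeds unchanged.
        self.cancel.cancel();
        // ... bunch of waiting for things, then gate.close() ...
    }
}
```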
2025-05-08 18:48:24 +00:00
Conrad Ludgate
bef5954fd7 feat(proxy): track SNI usage by protocol, including for http (#11863)
## Problem

We want to see how many users of the legacy serverless driver are still
using the old URL for SQL-over-HTTP traffic.

## Summary of changes

Adds a protocol field to the connections_by_sni metric. Ensures it's
incremented for sql-over-http.
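As a sketch (label names illustrative; this uses the plain prometheus crate rather than proxy's actual metrics machinery), the metric now looks roughly like:

```rust
use once_cell::sync::Lazy;
use prometheus::{IntCounterVec, register_int_counter_vec};

// Illustrative shape after this PR: `connections_by_sni` carries a
// `protocol` label, so SQL-over-HTTP traffic is counted too.
static CONNECTIONS_BY_SNI: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "connections_by_sni",
        "Number of accepted connections by SNI kind and protocol",
        &["kind", "protocol"]
    )
    .expect("metric registration")
});

fn on_connection(sni_kind: &str, protocol: &str) {
    // e.g. on_connection("hostname", "http") for sql-over-http
    CONNECTIONS_BY_SNI
        .with_label_values(&[sni_kind, protocol])
        .inc();
}
```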
2025-05-08 16:46:57 +00:00
Christian Schwarz
8477d15f95 feat(direct IO): remove special case in test suite for compat tests (#11864)
PR
- https://github.com/neondatabase/neon/pull/11558
adds special treatment for compat snapshot binaries which don't
understand the `direct-rw` mode.

A new compat snapshot has been published since, so we can remove the special case.

refs:
- fixes https://github.com/neondatabase/neon/issues/11598
2025-05-08 16:11:45 +00:00
Arpad Müller
622b3b2993 Fixes for enabling --timelines-onto-safekeepers in tests (#11854)
Second PR with fixes extracted from #11712, relating to
`--timelines-onto-safekeepers`. Does the following:

* Moves safekeeper registration to `neon_local` instead of the test fixtures
* Pass safekeeper JWT token if `--timelines-onto-safekeepers` is enabled
* Allow some warnings related to offline safekeepers (similarly to how we allow them for offline pageservers)
* Enable generations on the compute's config if `--timelines-onto-safekeepers` is enabled
* fix parallel `pull_timeline` race condition (the one that #11786 put off for later)

Fixes #11424
Part of #11670
2025-05-08 15:13:11 +00:00
Alexey Masterov
bd8428cb28 Add a workaround for pgv16 2025-05-08 16:35:54 +02:00
Santosh Pingale
659366060d Reuse remote_client from the SnapshotDownloader instead of recreating in download function (#11812)
## Problem
At the moment, remote_client and target are recreated in the download function. We could reuse them from the SnapshotDownloader instance. This isn't a problem per se, just a quality-of-life improvement, but it caught my attention when we were trying out snapshot downloading in one of the older versions and ran into a curious case of s3 clients behaving in two different manners: one client used `force_path_style` and the other one didn't.

**Logs from this run:**
```
2025-05-02T12:56:22.384626Z DEBUG /data/snappie/2739e7da34e625e3934ef0b76fa12483/timelines/d44b831adb0a6ba96792dc3a5cc30910/000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014E8F20-00000000014E8F99-00000001 requires download...
2025-05-02T12:56:22.384689Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:apply_configuration: timeout settings for this operation: TimeoutConfig { connect_timeout: Set(3.1s), read_timeout: Disabled, operation_timeout: Disabled, operation_attempt_timeout: Disabled }
2025-05-02T12:56:22.384730Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op: entering 'serialization' phase
2025-05-02T12:56:22.384784Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op: entering 'before transmit' phase
2025-05-02T12:56:22.384813Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op: retry strategy has OKed initial request
2025-05-02T12:56:22.384841Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op: beginning attempt #1
2025-05-02T12:56:22.384870Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op:try_attempt: resolving endpoint endpoint_params=EndpointResolverParams(TypeErasedBox[!Clone]:Params { bucket: Some("bucket"), region: Some("eu-north-1"), use_fips: false, use_dual_stack: false, endpoint: Some("https://s3.self-hosted.company.com"), force_path_style: false, accelerate: false, use_global_endpoint: false, use_object_lambda_endpoint: None, key: None, prefix: Some("/pageserver/tenants/2739e7da34e625e3934ef0b76fa12483/timelines/d44b831adb0a6ba96792dc3a5cc30910/000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014E8F20-00000000014E8F99-00000001"), copy_source: None, disable_access_points: None, disable_multi_region_access_points: false, use_arn_region: None, use_s3_express_control_endpoint: None, disable_s3_express_session_auth: None }) endpoint_prefix=None
2025-05-02T12:56:22.384979Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op:try_attempt: will use endpoint Endpoint { url: "https://neon.s3.self-hosted.company.com", headers: {}, properties: {"authSchemes": Array([Object({"signingRegion": String("eu-north-1"), "disableDoubleEncoding": Bool(true), "name": String("sigv4"), "signingName": String("s3")})])} }
2025-05-02T12:56:22.385042Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op:try_attempt:lazy_load_identity:provide_credentials{provider=default_chain}: loaded credentials provider=Environment
2025-05-02T12:56:22.385066Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op:try_attempt:lazy_load_identity: identity cache miss occurred; added new identity (took 35.958µs) new_expiration=2025-05-02T13:11:22.385028Z valid_for=899.999961437s partition=IdentityCachePartition(5)
2025-05-02T12:56:22.385090Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op:try_attempt: loaded identity
2025-05-02T12:56:22.385162Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op:try_attempt: entering 'transmit' phase
2025-05-02T12:56:22.385211Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op:try_attempt: new TCP connector created in 361ns
2025-05-02T12:56:22.385288Z DEBUG resolving host="neon.s3.self-hosted.company.com"
2025-05-02T12:56:22.390796Z DEBUG invoke{service=s3 operation=ListObjectVersions sdk_invocation_id=7315885}:try_op:try_attempt: encountered orchestrator error; halting
```
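A minimal sketch of the refactor (stand-in types, not the actual scrubber code): construct the client once and reuse it, so every download sees the same configuration:

```rust
// Stand-in for the real S3 client type.
struct RemoteClient;

struct SnapshotDownloader {
    remote_client: RemoteClient,
}

impl SnapshotDownloader {
    fn new(remote_client: RemoteClient) -> Self {
        Self { remote_client }
    }

    fn download(&self, _key: &str) {
        // Before: a fresh client was constructed here, which could be
        // configured differently (e.g. `force_path_style`).
        // After: reuse the instance built once at startup.
        let _client = &self.remote_client;
    }
}
```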
2025-05-08 14:09:15 +00:00
Christian Schwarz
42d93031a1 fixup(#11819): broken macOS build (#11861)
refs
- fixes https://github.com/neondatabase/neon/issues/11860
2025-05-08 11:48:29 +00:00
Mark Novikov
d22377c754 Skip event triggers in dump-restore (#11794)
## Problem

Data import fails if the src db has any event triggers, because those can only be restored by a superuser. Specifically, imports from Heroku and Supabase are guaranteed to fail.

Closes https://github.com/neondatabase/cloud/issues/27353

## Summary of changes

Depends on `pg_dump` patches for each supported PostgreSQL version:
- https://github.com/neondatabase/postgres/pull/630
- https://github.com/neondatabase/postgres/pull/629
- https://github.com/neondatabase/postgres/pull/627
- https://github.com/neondatabase/postgres/pull/628
2025-05-08 11:04:28 +00:00
Erik Grinaker
6c70789cfd storcon: increase drain+fill secondary warmup timeout from 20 to 30 seconds (#11848)
## Problem

During deployment drains/fills, we often see the storage controller
giving up on warmups after 20 seconds, when the warmup is nearly
complete (~90%). This can cause latency spikes for migrated tenants if
they block on layer downloads.

Touches https://github.com/neondatabase/cloud/issues/26193.

## Summary of changes

Increase the drain and fill secondary warmup timeout from 20 to 30
seconds.
2025-05-08 10:14:41 +00:00
Dmitrii Kovalkov
7e55497e13 tests: flush wal before waiting for last record lsn (#11726)
## Problem
Compute may flush WAL on page boundaries, leaving some records partially flushed for a long time. This leads to `wait_for_last_flush_lsn` getting stuck waiting for this partial LSN.
- Closes: https://github.com/neondatabase/cloud/issues/27876

## Summary of changes
- Flush WAL via CHECKPOINT after requesting current_wal_lsn to make sure that the record we point to is flushed in full (see the sketch below)
- Use proper endpoint in `test_timeline_detach_with_aux_files_with_detach_v1`
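A sketch of the first point (shown here in Rust with tokio_postgres, while the actual fix lives in the Python test fixtures): read the LSN first, then CHECKPOINT, so everything up to that LSN is known to be flushed in full:

```rust
// Sketch only: CHECKPOINT after reading the LSN guarantees the record we
// will wait for is flushed in full, even if it straddles a page boundary.
async fn flushed_wal_lsn(client: &tokio_postgres::Client) -> anyhow::Result<String> {
    let row = client
        .query_one("SELECT pg_current_wal_lsn()::text", &[])
        .await?;
    let lsn: String = row.get(0);
    // Force the WAL up to (at least) `lsn` to be flushed in full.
    client.batch_execute("CHECKPOINT").await?;
    Ok(lsn) // the caller then waits for the pageserver to reach `lsn`
}
```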
2025-05-08 10:00:45 +00:00
Vlad Lazar
40f32ea326 pageserver: refactor import flow and add job concurrency limiting (#11816)
## Problem

Import code is one big block. Separating planning and execution will help with reporting progress of import to storcon (a building block for resuming imports).

## Summary of changes

Split up the import into planning and execution.
A concurrency limit driven by PS config is also added.
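The limiting could look roughly like this sketch (hypothetical `ImportJob`; the real limit comes from pageserver config):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

struct ImportJob;
impl ImportJob {
    async fn run(self) { /* execute one planned import job */ }
}

// Run planned jobs with at most `max_concurrent` executing at once.
async fn run_import_jobs(jobs: Vec<ImportJob>, max_concurrent: usize) {
    let semaphore = Arc::new(Semaphore::new(max_concurrent));
    let mut handles = Vec::new();
    for job in jobs {
        // Each task holds a permit for its whole runtime; acquiring
        // blocks once `max_concurrent` jobs are in flight.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let _permit = permit;
            job.run().await;
        }));
    }
    for handle in handles {
        let _ = handle.await;
    }
}
```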
2025-05-08 09:19:14 +00:00
Christian Schwarz
1d1502bc16 fix(pageserver): flush task cancelled errors during timeline shutdown (#11853)
# Refs
- fixes https://github.com/neondatabase/neon/issues/11762

# Problem

PR #10993 introduced internal retries for BufferedWriter flushes.
PR #11052 added cancellation sensitivity to that retry loop.
That cancellation sensitivity is an error path that didn't exist before.

The result is that during timeline shutdown, after we `Timeline::cancel`, compaction can now fail with error `flush task cancelled`.
The problem with that:
1. We mis-classify this as an `error!`-worthy event.
2. This causes tests to become flaky because the error is not in the global `allowed_errors`.

Technically we also trip the `compaction_circuit_breaker` because the resulting `CompactionError` is variant `::Other`.
But since this is Timeline shutdown, it doesn't matter practically speaking.

# Solution / Changes

- Log the anyhow stack trace when classifying a compaction error as `error!`.
  This was helpful to identify sources of `flush task cancelled` errors.
  We only log at `error!` level in exceptional circumstances, so it's ok for the logs to be a bit verbose.
- Introduce typed errors along the `BufferedWriter::write_*` => `BlobWriter::write_blob` => `{Delta,Image}LayerWriter::put_*` => `Split{Delta,Image}LayerWriter::put_{value,image}` chain.
- Proper mapping to `CompactionError`/`CreateImageLayersError` via new `From` impls.

I am usually opposed to any magic `From` impls, but it's how most of the compaction code works today.

# Testing

The symptoms are most prevalent in `test_runner/regress/test_branch_and_gc.py::test_branch_and_gc`.
Before this PR, I was able to reproduce locally 1 or 2 times per 400 runs using `DEFAULT_PG_VERSION=15 BUILD_TYPE=release poetry run pytest --count 400 -n 8`.
After this PR, it doesn't reproduce anymore after 2000 runs.

# Future Work

Technically the ingest path is also exposed to this new source of errors because `InMemoryLayer` is backed by `BufferedWriter`.
But we haven't seen it occur in flaky tests yet.
Details and a fix in
- https://github.com/neondatabase/neon/pull/11851
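In sketch form (simplified names; the real chain spans several writer types, and the real `CompactionError` has more variants), the typed error plus `From` mapping amounts to:

```rust
use thiserror::Error;

#[derive(Debug, Error)]
enum FlushTaskError {
    #[error("flush task cancelled")]
    Cancelled,
    #[error(transparent)]
    Io(#[from] std::io::Error),
}

#[derive(Debug, Error)]
enum CompactionError {
    #[error("shutting down")]
    ShuttingDown,
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

// The point of the typed chain: cancellation maps to a dedicated variant
// instead of disappearing into `::Other` (which would be logged at
// `error!` and trip the compaction circuit breaker).
impl From<FlushTaskError> for CompactionError {
    fn from(e: FlushTaskError) -> Self {
        match e {
            FlushTaskError::Cancelled => CompactionError::ShuttingDown,
            other => CompactionError::Other(anyhow::anyhow!(other)),
        }
    }
}
```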
2025-05-08 06:57:53 +00:00
Christian Schwarz
7eb85c56ac tokio-epoll-uring: avoid warn! noise due to ECANCELED during shutdowns (#11819)
# Problem

Before this PR, `test_pageserver_catchup_while_compute_down` would
occasionally fail due to scary-looking WARN log line

```
WARN ephemeral_file_buffered_writer{...}:flush_attempt{attempt=1}: \
 error flushing buffered writer buffer to disk, retrying after backoff err=Operation canceled (os error 125)
```

After lengthy investigation, the conclusion is that this is likely due to a kernel bug related to io_uring async workers (io-wq) and signals.
The main indicator is that the error only ever happens in correlation with pageserver shutdown, when SIGTERM is received.
There is a fix that is merged in 6.14 kernels (`io-wq: backoff when retrying worker creation`).
However, even when I revert that patch, the issue is not reproducible on 6.14, so it remains a speculation.

It was ruled out that the ECANCELED is due to the executor thread exiting before the async worker starts processing the operation.

# Solution

The workaround in this PR is to retry the operation on ECANCELED once.
Retries are safe because the low-level io_engine operations are idempotent.
(We don't use O_APPEND and I can't think of another flag that would make the APIs covered by this patch not idempotent.)

# Testing

With this PR, the warn! log no longer happens on [my reproducer setup](https://github.com/neondatabase/neon/issues/11446#issuecomment-2843015111).
And the new rate-limited `info!`-level log line informing about the internal retry shows up instead, as expected.

# Refs
- fixes https://github.com/neondatabase/neon/issues/11446
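The retry-once logic amounts to roughly this sketch (a hypothetical wrapper, not the actual tokio-epoll-uring change):

```rust
use std::future::Future;

// Retry an idempotent operation exactly once if it fails with ECANCELED
// (os error 125). Safe here because the covered io_engine operations are
// idempotent.
async fn with_ecanceled_retry<F, Fut, T>(mut op: F) -> std::io::Result<T>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = std::io::Result<T>>,
{
    match op().await {
        Err(e) if e.raw_os_error() == Some(libc::ECANCELED) => {
            // The real code logs this via a rate-limited info! line.
            tracing::info!("operation failed with ECANCELED, retrying once");
            op().await
        }
        res => res,
    }
}
```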
2025-05-08 06:33:29 +00:00
Dmitrii Kovalkov
24d62c647f storcon: add missing switch_timeline_membership method to sk client (#11850)
## Problem

`switch_timeline_membership` is implemented on the safekeeper's server side, but it is missing in the client.

- Part of https://github.com/neondatabase/neon/issues/11823

## Summary of changes
- Add `switch_timeline_membership` method to `SafekeeperClient`
2025-05-07 17:00:41 +00:00
Shockingly Good
4d2e4b19c3 fix(compute) Correct the PGXN s3 gateway URL. (#11796)
Corrects the postgres extension s3 gateway address to be not just a domain name but a full base URL.

To make the code more readable, the option is renamed to "remote_ext_base_url", while keeping the old name accessible via a clap argument alias.

Also provides a very simple and, perhaps, even redundant unit test to confirm the logic behind parsing the corresponding CLI argument.

## Problem

As clearly stated in https://github.com/neondatabase/cloud/issues/26005, using the short version of the domain name might work for now, but in the future we should get rid of using the `default` namespace, and this is where it will most likely break down.

## Summary of changes

The changes adjust the extension s3 gateway setting to use the proper base URL format instead of just a domain name (which assumed the "default" namespace), and add a new CLI argument name to reflect the change.
2025-05-07 16:34:08 +00:00
Alexey Kondratov
0691b73f53 fix(compute): Enforce cloud_admin role in compute_ctl connections (#11827)
## Problem

Users can override some configuration parameters on the DB level with
`ALTER DATABASE ... SET ...`. Some of these overrides, like `role` or
`default_transaction_read_only`, affect `compute_ctl`'s ability to
configure the DB schema properly.

## Summary of changes

Enforce `role=cloud_admin`, `statement_timeout=0`, and move
`default_transaction_read_only=off` override from control plane [1] to
`compute_ctl`. Also, enforce `search_path=public` just in case, although
we do not call any functions in user databases.

[1]:
133dd8c4db/goapp/controlplane/internal/pkg/compute/provisioner/provisioner_common.go (L70)

Fixes https://github.com/neondatabase/cloud/issues/28532
2025-05-07 12:14:24 +00:00
Alexey Masterov
86826669f9 Make postgis test succeed on pg v17 2025-05-07 13:41:21 +02:00
Vlad Lazar
3cf5e1386c pageserver: fix rough edges of pageserver tracing (#11842)
## Problem

There's a few rough edges around PS tracing.

## Summary of changes

* include compute request id in pageserver trace
* use the get page specific context for GET_REL_SIZE and GET_BATCH
* fix assertion in download layer trace


![image](https://github.com/user-attachments/assets/2ff6779c-7c2d-4102-8013-ada8203aa42f)
2025-05-07 10:13:26 +00:00
Alex Chi Z.
608afc3055 fix(scrubber): log download error (#11833)
## Problem

We use `head_object` to determine whether an object exists or not.
However, when it errors, the cause is not always a missing object.

## Summary of changes

Log the error so that we can have a better idea what's going on with the
scrubber errors in prod.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-07 09:21:17 +00:00
Alexey Masterov
3392122824 Create a patch in order to fix problems running without superuser privileges 2025-05-05 13:41:21 +02:00
Alexey Masterov
a594cfa1e2 Add a script to run as a regular user 2025-04-29 08:56:55 +02:00
Alexey Masterov
35d58a92ab Use patch instead of sed 2025-04-28 13:59:33 +02:00
Alexey Masterov
eb30bd4799 Change the timeout 2025-04-28 13:50:05 +02:00
Alexey Masterov
d809743687 ensure that the directory is copied, not its content 2025-04-28 12:17:54 +02:00
Alexey Masterov
f7c69b7b84 exclude the final slash 2025-04-28 11:26:07 +02:00
Alexey Masterov
08d7d8dfc3 Do not copy sfcgal-src 2025-04-28 10:39:05 +02:00
Alexey Masterov
91139a28e5 Merge branch 'refs/heads/main' into amasterov/add-postgis-test-2
# Conflicts:
#	.github/actions/neon-project-create/action.yml
2025-04-28 10:26:10 +02:00
Alexey Masterov
a82dcbbcc8 Fix the problem with the trap in the script 2025-04-28 10:01:01 +02:00
Alexey Masterov
24ccfd89ab do not remove computed_columns on v16 2025-04-28 08:00:55 +02:00
Alexey Masterov
ab5c7d4365 increase the timeout 2025-04-25 16:53:58 +02:00
Alexey Masterov
968c87ccac fix the errors 2025-04-25 15:59:57 +02:00
Alexey Masterov
756f833269 Merge branch 'refs/heads/amasterov/cloud-extensions-test' into amasterov/add-postgis-test-2
# Conflicts:
#	docker-compose/docker_compose_test.sh
2025-04-25 14:06:51 +02:00
Alexey Masterov
0eee26a083 Do not use TTY 2025-04-25 14:04:48 +02:00
Alexey Masterov
c8d8dfc765 Merge branch 'refs/heads/main' into amasterov/cloud-extensions-test
# Conflicts:
#	docker-compose/docker_compose_test.sh
2025-04-25 12:09:21 +02:00
Alexey Masterov
2727b79a8c try to move string 2025-04-25 11:12:12 +02:00
Alexey Masterov
b6afe92bd5 add a staged install 2025-04-25 10:54:41 +02:00
Alexey Masterov
127247a63c Merge branch 'refs/heads/amasterov/refresh-codestyle-docker-compose-script' into amasterov/add-postgis-test-2 2025-04-25 10:05:16 +02:00
Alexey Masterov
bcfce1af3e add a command 2025-04-25 10:05:00 +02:00
Alexey Masterov
ce2f606995 Merge branch 'refs/heads/amasterov/refresh-codestyle-docker-compose-script' into amasterov/add-postgis-test-2 2025-04-25 10:01:53 +02:00
Alexey Masterov
57637f0ed8 fix codestyle 2025-04-25 10:01:47 +02:00
Alexey Masterov
90b9e90fa5 add a command 2025-04-25 10:00:36 +02:00
Alexey Masterov
9151a29c2a Merge branch 'refs/heads/amasterov/refresh-codestyle-docker-compose-script' into amasterov/add-postgis-test-2
# Conflicts:
#	docker-compose/docker_compose_test.sh
2025-04-25 09:39:48 +02:00
Alexey Masterov
c6120a44f8 Change the comment 2025-04-25 09:28:47 +02:00
Alexey Masterov
849c8a1356 Fix the useless cat 2025-04-25 09:12:56 +02:00
Alexey Masterov
b6741457ea Refresh the codestyle 2025-04-25 09:06:53 +02:00
Alexey Masterov
f1b98f83a9 Refactor patches 2025-04-24 17:42:33 +02:00
Alexey Masterov
0d956bea71 Add more support 2025-04-24 17:02:19 +02:00
Alexey Masterov
e7bba6457d Add required libs 2025-04-24 11:26:36 +02:00
Alexey Masterov
996202d4f9 Add postgis contribs 2025-04-24 10:12:42 +02:00
Devin AI
4ff7787496 Simplify workflow descriptions in README
Co-Authored-By: alexeymasterov@neon.tech <alexeymasterov@neon.tech>
2025-04-23 12:20:47 +00:00
Devin AI
f611af797e Add detailed information about CI workflows to README
Co-Authored-By: alexeymasterov@neon.tech <alexeymasterov@neon.tech>
2025-04-23 12:11:45 +00:00
Devin AI
d1c461f529 Add information about patching extension sources to README
Co-Authored-By: alexeymasterov@neon.tech <alexeymasterov@neon.tech>
2025-04-23 12:08:02 +00:00
Devin AI
ea9d987cad Clarify pg_regress limitation for regular users in README
Co-Authored-By: alexeymasterov@neon.tech <alexeymasterov@neon.tech>
2025-04-23 12:04:11 +00:00
Devin AI
1ca11382e1 Update script name to test-upgrade.sh in README
Co-Authored-By: alexeymasterov@neon.tech <alexeymasterov@neon.tech>
2025-04-23 12:02:03 +00:00
Devin AI
3bee41c80b Update README to mention database dropping in regular-test.sh
Co-Authored-By: alexeymasterov@neon.tech <alexeymasterov@neon.tech>
2025-04-23 11:57:29 +00:00
Devin AI
bb1e7d79c2 Update README.md with correct extension addition instructions
Co-Authored-By: alexeymasterov@neon.tech <alexeymasterov@neon.tech>
2025-04-23 11:55:06 +00:00
Devin AI
29565a7ca2 Add README.md for docker-compose/ext-src directory
Co-Authored-By: alexeymasterov@neon.tech <alexeymasterov@neon.tech>
2025-04-23 11:53:19 +00:00
Alexey Masterov
b0b7ccb1ba Add a comment 2025-04-23 13:09:30 +02:00
Alexey Masterov
be51f997b7 Merge remote-tracking branch 'origin/amasterov/cloud-extensions-test' into amasterov/cloud-extensions-test 2025-04-23 12:48:23 +02:00
Alexey Masterov
0a3fc85f2c Break the long line 2025-04-23 12:48:10 +02:00
a-masterov
13d080d6d2 Update .github/workflows/cloud-extensions.yml
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2025-04-23 12:25:22 +02:00
a-masterov
67b9e0f34d Update .github/workflows/cloud-extensions.yml
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2025-04-23 12:24:51 +02:00
Alexey Masterov
f39f45164c Add a link to issue 2025-04-23 12:17:40 +02:00
Alexey Masterov
f81baf42f1 fix docker-compose 2025-04-23 12:15:23 +02:00
Alexey Masterov
1838bd5603 sfcgal too 2025-04-23 09:49:54 +02:00
Alexey Masterov
ca21d0bdcc sfcgal too 2025-04-23 09:47:29 +02:00
Alexey Masterov
3d73e5c642 Add postgis to the test image 2025-04-23 09:42:31 +02:00
a-masterov
ffb6cb3456 Merge branch 'main' into amasterov/cloud-extensions-test 2025-04-23 08:04:32 +02:00
Alexey Masterov
f78c92ca52 Remove error printing from the main script as it is printed in the container now 2025-04-16 11:38:56 +02:00
Alexey Masterov
6a5fc86743 Cleanup 2025-04-16 11:38:44 +02:00
Alexey Masterov
8db3bf992f remove unnecessary allure report generation 2025-04-16 10:29:04 +02:00
Alexey Masterov
fef9bd9073 remove unnecessary allure report generation 2025-04-16 10:05:46 +02:00
Alexey Masterov
7dceb61d3d adapt for regular user 2025-04-16 10:04:12 +02:00
Alexey Masterov
fa94e9f625 update tag 2025-04-15 16:46:05 +02:00
a-masterov
d1b96d5c0d Merge branch 'main' into amasterov/cloud-extensions-test 2025-04-15 16:14:24 +02:00
Alexey Masterov
627c0bed85 Skip pg_semver for PGv16 2025-04-15 16:13:49 +02:00
Alexey Masterov
d1286ab935 Change Tag 2025-04-15 15:03:56 +02:00
Alexey Masterov
f2c74a64b6 Fix rag_jina_reranker_v1_tiny_en-src 2025-04-15 14:45:00 +02:00
Alexey Masterov
ae326a6df3 Fix pgtap, skip plpgsql_check 2025-04-15 13:51:26 +02:00
Alexey Masterov
02896e3d09 Change the Tag 2025-04-14 17:24:43 +02:00
Alexey Masterov
cae480d62f Fix some tests 2025-04-14 17:04:27 +02:00
Alexey Masterov
2ec7630252 change tag 2025-04-14 14:46:50 +02:00
Alexey Masterov
d904e56e80 Add default_endpoint_settings 2025-04-14 14:07:27 +02:00
Alexey Masterov
d0d49d2985 Change default_endpoint_settings 2025-04-14 13:50:51 +02:00
Alexey Masterov
a0fa73d508 Change settings 2025-04-14 13:11:09 +02:00
Alexey Masterov
9a89a8e9ee Add ADMIN API KEY 2025-04-14 10:40:08 +02:00
Alexey Masterov
6ebc6e3994 Using admin API 2025-04-14 09:59:06 +02:00
Alexey Masterov
307899408f Fix pg_graphql 2025-04-11 15:35:28 +02:00
Alexey Masterov
ecb059d5f7 Add skip 2025-04-08 14:47:19 +02:00
Alexey Masterov
d912f05e36 Add print for regression diffs 2025-04-08 13:55:26 +02:00
Alexey Masterov
8378b1c603 Fix the shellcheck warning 2025-04-07 14:48:32 +02:00
Alexey Masterov
154d215c78 Fix the shellcheck warning 2025-04-07 14:46:51 +02:00
Alexey Masterov
0b10a5336c Workflows refactoring 2025-04-07 14:35:02 +02:00
Alexey Masterov
5c36f3786d Add project settings 2025-04-04 15:28:25 +02:00
Alexey Masterov
f3e928d781 Do not use cloud-regress.yml in this branch 2025-04-04 12:44:29 +02:00
Alexey Masterov
2eaaa0495a Add tests for a cloud instance 2025-04-04 12:28:38 +02:00
Alexey Masterov
d6ed3eb557 Add some tests 2025-04-03 18:35:55 +02:00
Alexey Masterov
4b4ce9e60a Add support for BENCHMARK_CONNSTR 2025-04-01 15:12:27 +02:00
Alexey Masterov
3f775d2df1 Change the script 2025-04-01 12:06:04 +02:00
Alexey Masterov
d6de25f85f add an env var 2025-04-01 11:52:33 +02:00
Alexey Masterov
e43968bc95 add a tag for debug only 2025-04-01 11:41:58 +02:00
Alexey Masterov
5c8b9457c1 add a jq to neon-extensions-test 2025-04-01 11:22:27 +02:00
Alexey Masterov
27d6ebabb3 add checkout 2025-04-01 11:17:28 +02:00
Alexey Masterov
fa6b4ed659 fix pg-version 2025-04-01 11:14:41 +02:00
Alexey Masterov
3c7a526971 change runs-on 2025-04-01 11:13:09 +02:00
Alexey Masterov
da01a57759 fix 2025-04-01 11:02:11 +02:00
Alexey Masterov
9c2e296af7 Debug only 2025-04-01 11:00:12 +02:00
Alexey Masterov
c92dc09104 create a project in the workflow 2025-04-01 10:53:16 +02:00
Alexey Masterov
b41bca9f58 Add a workflow 2025-03-31 17:01:02 +02:00
65 changed files with 1461 additions and 388 deletions


@@ -965,7 +965,7 @@ jobs:
fi
- name: Verify docker-compose example and test extensions
timeout-minutes: 20
timeout-minutes: 40
env:
TAG: >-
${{


@@ -10,10 +10,10 @@ on:
- cron: '45 1 * * *' # run once a day, timezone is utc
workflow_dispatch: # adds ability to run this manually
inputs:
region_id:
project_id:
description: 'Project region id. If not set, the default region will be used'
required: false
default: 'aws-us-east-2'
default: 'raspy-voice-43157743'
defaults:
run:
@@ -33,12 +33,12 @@ jobs:
strategy:
fail-fast: false
matrix:
pg-version: [16, 17]
pg-version: [17]
runs-on: [ self-hosted, small ]
runs-on: us-east-2
container:
# We use the neon-test-extensions image here as it contains the source code for the extensions.
image: ghcr.io/neondatabase/neon-test-extensions-v${{ matrix.pg-version }}:latest
image: ghcr.io/neondatabase/neon-test-extensions-v${{ matrix.pg-version }}:14928114423
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
@@ -61,36 +61,37 @@ jobs:
ULID=pgx_ulid
fi
LIBS=timescaledb:rag_bge_small_en_v15,rag_jina_reranker_v1_tiny_en:$ULID
settings=$(jq -c -n --arg libs $LIBS '{preload_libraries:{use_defaults:false,enabled_libraries:($libs| split(":"))}}')
settings=$(jq -c -n --arg libs $LIBS '{preload_libraries:{use_defaults:false,enabled_libraries:($libs| split(":"))}, compute_flags:{target_architecture:"x64"}}')
echo settings=$settings >> $GITHUB_OUTPUT
- name: Create Neon Project
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
region_id: ${{ inputs.region_id || 'aws-us-east-2' }}
postgres_version: ${{ matrix.pg-version }}
project_settings: ${{ steps.project-settings.outputs.settings }}
# We need these settings to get the expected output results.
# We cannot use the environment variables e.g. PGTZ due to
# https://github.com/neondatabase/neon/issues/1287
default_endpoint_settings: >
{
"pg_settings": {
"DateStyle": "Postgres,MDY",
"TimeZone": "America/Los_Angeles",
"compute_query_id": "off",
"neon.allow_unstable_extensions": "on"
}
}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
admin_api_key: ${{ secrets.NEON_STAGING_ADMIN_API_KEY }}
- name: Get the connection URI
id: connect-uri
run: |
res=$(curl -w "%{http_code}" -X GET \
"https://console-stage.neon.build/api/v2/projects/${PROJECT_ID}/connection_uri?database_name=neondb&role_name=neondb_owner&pooled=false" \
-H 'accept: application/json' \
-H "Authorization: Bearer ${API_KEY}"
)
code=${res: -3}
echo ${code}
if [[ ${code} -ge 400 ]]; then
echo ${res::-3}
exit 1
fi
URI=$(echo ${res::-3} | jq -r .uri)
set +x
password=$(echo "$URI" | sed -n 's|.*://[^:]*:\([^@]*\)@.*|\1|p')
echo ::add-mask::$password
echo uri=${URI} >> ${GITHUB_OUTPUT}
env:
PROJECT_ID: ${{ inputs.project_id }}
API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Run the regression tests
run: /run-tests.sh -r /ext-src
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
SKIP: "pg_hint_plan-src,pg_repack-src,pg_cron-src,plpgsql_check-src"
BENCHMARK_CONNSTR: ${{ steps.connect-uri.outputs.uri }}
SKIP: "hll-src,hypopg-src,ip4r-src,pg_cron-src,pg_graphql-src,pg_hint_plan-src,pg_ivm-src,pg_jsonschema-src,pg_repack-src,pg_roaringbitmap-src,pg_semver-src,pg_session_jwt-src,pg_tiktoken-src,pg_uuidv7-src,pgjwt-src,pgrag-src,pgtap-src,pgvector-src,pgx_ulid-src,plv8-src,postgresql-unit-src,prefix-src,rag_bge_small_en_v15-src,rag_jina_reranker_v1_tiny_en-src,rum-src,plpgsql_check-src"
- name: Delete Neon Project
if: ${{ always() }}


@@ -297,6 +297,7 @@ RUN ./autogen.sh && \
./configure --with-sfcgal=/usr/local/bin/sfcgal-config && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
make staged-install && \
cd extensions/postgis && \
make clean && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
@@ -1809,10 +1810,25 @@ RUN make PG_VERSION="${PG_VERSION:?}" -C compute
FROM pg-build AS extension-tests
ARG PG_VERSION
# This is required for the PostGIS test
RUN apt-get update && case $DEBIAN_VERSION in \
bullseye) \
apt-get install -y libproj19 libgdal28 time; \
;; \
bookworm) \
apt-get install -y libgdal32 libproj25 time; \
;; \
*) \
echo "Unknown Debian version ${DEBIAN_VERSION}" && exit 1 \
;; \
esac
COPY docker-compose/ext-src/ /ext-src/
COPY --from=pg-build /postgres /postgres
#COPY --from=postgis-src /ext-src/ /ext-src/
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=postgis-build /ext-src/postgis-src /ext-src/postgis-src
COPY --from=postgis-build /sfcgal/* /usr
COPY --from=plv8-src /ext-src/ /ext-src/
#COPY --from=h3-pg-src /ext-src/ /ext-src/
COPY --from=postgresql-unit-src /ext-src/ /ext-src/


@@ -60,12 +60,16 @@ use utils::failpoint_support;
// Compatibility hack: if the control plane specified any remote-ext-config
// use the default value for extension storage proxy gateway.
// Remove this once the control plane is updated to pass the gateway URL
fn parse_remote_ext_config(arg: &str) -> Result<String> {
if arg.starts_with("http") {
Ok(arg.trim_end_matches('/').to_string())
fn parse_remote_ext_base_url(arg: &str) -> Result<String> {
const FALLBACK_PG_EXT_GATEWAY_BASE_URL: &str =
"http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local";
Ok(if arg.starts_with("http") {
arg
} else {
Ok("http://pg-ext-s3-gateway".to_string())
FALLBACK_PG_EXT_GATEWAY_BASE_URL
}
.to_owned())
}
#[derive(Parser)]
@@ -74,8 +78,10 @@ struct Cli {
#[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
pub pgbin: String,
#[arg(short = 'r', long, value_parser = parse_remote_ext_config)]
pub remote_ext_config: Option<String>,
/// The base URL for the remote extension storage proxy gateway.
/// Should be in the form of `http(s)://<gateway-hostname>[:<port>]`.
#[arg(short = 'r', long, value_parser = parse_remote_ext_base_url, alias = "remote-ext-config")]
pub remote_ext_base_url: Option<String>,
/// The port to bind the external listening HTTP server to. Clients running
/// outside the compute will talk to the compute through this port. Keep
@@ -164,7 +170,7 @@ fn main() -> Result<()> {
pgversion: get_pg_version_string(&cli.pgbin),
external_http_port: cli.external_http_port,
internal_http_port: cli.internal_http_port,
ext_remote_storage: cli.remote_ext_config.clone(),
remote_ext_base_url: cli.remote_ext_base_url.clone(),
resize_swap_on_bind: cli.resize_swap_on_bind,
set_disk_quota_for_fs: cli.set_disk_quota_for_fs,
#[cfg(target_os = "linux")]
@@ -265,4 +271,18 @@ mod test {
fn verify_cli() {
Cli::command().debug_assert()
}
#[test]
fn parse_pg_ext_gateway_base_url() {
let arg = "http://pg-ext-s3-gateway2";
let result = super::parse_remote_ext_base_url(arg).unwrap();
assert_eq!(result, arg);
let arg = "pg-ext-s3-gateway";
let result = super::parse_remote_ext_base_url(arg).unwrap();
assert_eq!(
result,
"http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local"
);
}
}


@@ -348,6 +348,7 @@ async fn run_dump_restore(
"--no-security-labels".to_string(),
"--no-subscriptions".to_string(),
"--no-tablespaces".to_string(),
"--no-event-triggers".to_string(),
// format
"--format".to_string(),
"directory".to_string(),


@@ -95,7 +95,7 @@ pub struct ComputeNodeParams {
pub internal_http_port: u16,
/// the address of extension storage proxy gateway
pub ext_remote_storage: Option<String>,
pub remote_ext_base_url: Option<String>,
}
/// Compute node info shared across several `compute_ctl` threads.
@@ -329,11 +329,39 @@ struct StartVmMonitorResult {
impl ComputeNode {
pub fn new(params: ComputeNodeParams, config: ComputeConfig) -> Result<Self> {
let connstr = params.connstr.as_str();
let conn_conf = postgres::config::Config::from_str(connstr)
let mut conn_conf = postgres::config::Config::from_str(connstr)
.context("cannot build postgres config from connstr")?;
let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr)
let mut tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr)
.context("cannot build tokio postgres config from connstr")?;
// Users can set some configuration parameters per database with
// ALTER DATABASE ... SET ...
//
// There are at least these parameters:
//
// - role=some_other_role
// - default_transaction_read_only=on
// - statement_timeout=1, i.e., 1ms, which will cause most of the queries to fail
// - search_path=non_public_schema, this should be actually safe because
// we don't call any functions in user databases, but better to always reset
// it to public.
//
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
// Unset them via connection string options before connecting to the database.
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
//
// TODO(ololobus): we currently pass `-c default_transaction_read_only=off` from control plane
// as well. After rolling out this code, we can remove this parameter from control plane.
// In the meantime, double-passing is fine, the last value is applied.
// See: <https://github.com/neondatabase/cloud/blob/133dd8c4dbbba40edfbad475bf6a45073ca63faf/goapp/controlplane/internal/pkg/compute/provisioner/provisioner_common.go#L70>
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
let options = match conn_conf.get_options() {
Some(options) => format!("{} {}", options, EXTRA_OPTIONS),
None => EXTRA_OPTIONS.to_string(),
};
conn_conf.options(&options);
tokio_conn_conf.options(&options);
let mut new_state = ComputeState::new();
if let Some(spec) = config.spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
@@ -1449,15 +1477,20 @@ impl ComputeNode {
Err(e) => match e.code() {
Some(&SqlState::INVALID_PASSWORD)
| Some(&SqlState::INVALID_AUTHORIZATION_SPECIFICATION) => {
// Connect with zenith_admin if cloud_admin could not authenticate
// Connect with `zenith_admin` if `cloud_admin` could not authenticate
info!(
"cannot connect to postgres: {}, retrying with `zenith_admin` username",
"cannot connect to Postgres: {}, retrying with 'zenith_admin' username",
e
);
let mut zenith_admin_conf = postgres::config::Config::from(conf.clone());
zenith_admin_conf.application_name("compute_ctl:apply_config");
zenith_admin_conf.user("zenith_admin");
// It doesn't matter what were the options before, here we just want
// to connect and create a new superuser role.
const ZENITH_OPTIONS: &str = "-c role=zenith_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
zenith_admin_conf.options(ZENITH_OPTIONS);
let mut client =
zenith_admin_conf.connect(NoTls)
.context("broken cloud_admin credential: tried connecting with cloud_admin but could not authenticate, and zenith_admin does not work either")?;
@@ -1623,9 +1656,7 @@ impl ComputeNode {
self.pg_reload_conf()?;
if spec.mode == ComputeMode::Primary {
let mut conf =
tokio_postgres::Config::from_str(self.params.connstr.as_str()).unwrap();
conf.application_name("apply_config");
let conf = self.get_tokio_conn_conf(Some("compute_ctl:reconfigure"));
let conf = Arc::new(conf);
let spec = Arc::new(spec.clone());
@@ -1865,9 +1896,9 @@ LIMIT 100",
real_ext_name: String,
ext_path: RemotePath,
) -> Result<u64, DownloadError> {
let ext_remote_storage =
let remote_ext_base_url =
self.params
.ext_remote_storage
.remote_ext_base_url
.as_ref()
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
"Remote extensions storage is not configured",
@@ -1929,7 +1960,7 @@ LIMIT 100",
let download_size = extension_server::download_extension(
&real_ext_name,
&ext_path,
ext_remote_storage,
remote_ext_base_url,
&self.params.pgbin,
)
.await
@@ -2038,7 +2069,7 @@ LIMIT 100",
&self,
spec: &ComputeSpec,
) -> Result<RemoteExtensionMetrics> {
if self.params.ext_remote_storage.is_none() {
if self.params.remote_ext_base_url.is_none() {
return Ok(RemoteExtensionMetrics {
num_ext_downloaded: 0,
largest_ext_size: 0,


@@ -158,14 +158,14 @@ fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
pub async fn download_extension(
ext_name: &str,
ext_path: &RemotePath,
ext_remote_storage: &str,
remote_ext_base_url: &str,
pgbin: &str,
) -> Result<u64> {
info!("Download extension {:?} from {:?}", ext_name, ext_path);
// TODO add retry logic
let download_buffer =
match download_extension_tar(ext_remote_storage, &ext_path.to_string()).await {
match download_extension_tar(remote_ext_base_url, &ext_path.to_string()).await {
Ok(buffer) => buffer,
Err(error_message) => {
return Err(anyhow::anyhow!(
@@ -272,8 +272,8 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
// Do request to extension storage proxy, e.g.,
// curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
// using HTTP GET and return the response body as bytes.
async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> {
let uri = format!("{}/{}", ext_remote_storage, ext_path);
async fn download_extension_tar(remote_ext_base_url: &str, ext_path: &str) -> Result<Bytes> {
let uri = format!("{}/{}", remote_ext_base_url, ext_path);
let filename = Path::new(ext_path)
.file_name()
.unwrap_or_else(|| std::ffi::OsStr::new("unknown"))


@@ -22,7 +22,7 @@ pub(in crate::http) async fn download_extension(
State(compute): State<Arc<ComputeNode>>,
) -> Response {
// Don't even try to download extensions if no remote storage is configured
if compute.params.ext_remote_storage.is_none() {
if compute.params.remote_ext_base_url.is_none() {
return JsonResponse::error(
StatusCode::PRECONDITION_FAILED,
"remote storage is not configured",


@@ -644,9 +644,10 @@ struct EndpointStartCmdArgs {
#[clap(
long,
help = "Configure the remote extensions storage proxy gateway to request for extensions."
help = "Configure the remote extensions storage proxy gateway URL to request for extensions.",
alias = "remote-ext-config"
)]
remote_ext_config: Option<String>,
remote_ext_base_url: Option<String>,
#[clap(
long,
@@ -1414,9 +1415,16 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
EndpointCmd::Start(args) => {
let endpoint_id = &args.endpoint_id;
let pageserver_id = args.endpoint_pageserver_id;
let remote_ext_config = &args.remote_ext_config;
let remote_ext_base_url = &args.remote_ext_base_url;
let safekeepers_generation = args.safekeepers_generation.map(SafekeeperGeneration::new);
let default_generation = env
.storage_controller
.timelines_onto_safekeepers
.then_some(1);
let safekeepers_generation = args
.safekeepers_generation
.or(default_generation)
.map(SafekeeperGeneration::new);
// If --safekeepers argument is given, use only the listed
// safekeeper nodes; otherwise all from the env.
let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)? {
@@ -1510,7 +1518,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
safekeepers_generation,
safekeepers,
pageservers,
remote_ext_config.as_ref(),
remote_ext_base_url.as_ref(),
stripe_size.0 as usize,
args.create_test_user,
args.start_timeout,


@@ -655,7 +655,7 @@ impl Endpoint {
safekeepers_generation: Option<SafekeeperGeneration>,
safekeepers: Vec<NodeId>,
pageservers: Vec<(Host, u16)>,
remote_ext_config: Option<&String>,
remote_ext_base_url: Option<&String>,
shard_stripe_size: usize,
create_test_user: bool,
start_timeout: Duration,
@@ -825,8 +825,8 @@ impl Endpoint {
.stderr(logfile.try_clone()?)
.stdout(logfile);
if let Some(remote_ext_config) = remote_ext_config {
cmd.args(["--remote-ext-config", remote_ext_config]);
if let Some(remote_ext_base_url) = remote_ext_base_url {
cmd.args(["--remote-ext-base-url", remote_ext_base_url]);
}
let child = cmd.spawn()?;


@@ -10,7 +10,8 @@ use camino::{Utf8Path, Utf8PathBuf};
use hyper0::Uri;
use nix::unistd::Pid;
use pageserver_api::controller_api::{
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest,
SafekeeperSchedulingPolicyRequest, SkSchedulingPolicy, TenantCreateRequest,
TenantCreateResponse, TenantLocateResponse,
};
use pageserver_api::models::{
@@ -20,7 +21,7 @@ use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use pem::Pem;
use postgres_backend::AuthType;
use reqwest::Method;
use reqwest::{Method, Response};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
@@ -570,6 +571,11 @@ impl StorageController {
let peer_jwt_token = encode_from_key_file(&peer_claims, private_key)
.expect("failed to generate jwt token");
args.push(format!("--peer-jwt-token={peer_jwt_token}"));
let claims = Claims::new(None, Scope::SafekeeperData);
let jwt_token =
encode_from_key_file(&claims, private_key).expect("failed to generate jwt token");
args.push(format!("--safekeeper-jwt-token={jwt_token}"));
}
if let Some(public_key) = &self.public_key {
@@ -614,6 +620,10 @@ impl StorageController {
self.env.base_data_dir.display()
));
if self.env.safekeepers.iter().any(|sk| sk.auth_enabled) && self.private_key.is_none() {
anyhow::bail!("Safekeeper set up for auth but no private key specified");
}
if self.config.timelines_onto_safekeepers {
args.push("--timelines-onto-safekeepers".to_string());
}
@@ -640,6 +650,10 @@ impl StorageController {
)
.await?;
if self.config.timelines_onto_safekeepers {
self.register_safekeepers().await?;
}
Ok(())
}
@@ -743,6 +757,23 @@ impl StorageController {
where
RQ: Serialize + Sized,
RS: DeserializeOwned + Sized,
{
let response = self.dispatch_inner(method, path, body).await?;
Ok(response
.json()
.await
.map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
}
/// Simple HTTP request wrapper for calling into storage controller
async fn dispatch_inner<RQ>(
&self,
method: reqwest::Method,
path: String,
body: Option<RQ>,
) -> anyhow::Result<Response>
where
RQ: Serialize + Sized,
{
// In the special case of the `storage_controller start` subcommand, we wish
// to use the API endpoint of the newly started storage controller in order
@@ -785,10 +816,31 @@ impl StorageController {
let response = builder.send().await?;
let response = response.error_from_body().await?;
Ok(response
.json()
.await
.map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
Ok(response)
}
/// Register the safekeepers in the storage controller
#[instrument(skip(self))]
async fn register_safekeepers(&self) -> anyhow::Result<()> {
for sk in self.env.safekeepers.iter() {
let sk_id = sk.id;
let body = serde_json::json!({
"id": sk_id,
"created_at": "2023-10-25T09:11:25Z",
"updated_at": "2024-08-28T11:32:43Z",
"region_id": "aws-us-east-2",
"host": "127.0.0.1",
"port": sk.pg_port,
"http_port": sk.http_port,
"https_port": sk.https_port,
"version": 5957,
"availability_zone_id": format!("us-east-2b-{sk_id}"),
});
self.upsert_safekeeper(sk_id, body).await?;
self.safekeeper_scheduling_policy(sk_id, SkSchedulingPolicy::Active)
.await?;
}
Ok(())
}
/// Call into the attach_hook API, for use before handing out attachments to pageservers
@@ -816,6 +868,42 @@ impl StorageController {
Ok(response.generation)
}
#[instrument(skip(self))]
pub async fn upsert_safekeeper(
&self,
node_id: NodeId,
request: serde_json::Value,
) -> anyhow::Result<()> {
let resp = self
.dispatch_inner::<serde_json::Value>(
Method::POST,
format!("control/v1/safekeeper/{node_id}"),
Some(request),
)
.await?;
if !resp.status().is_success() {
anyhow::bail!(
"setting scheduling policy unsuccessful for safekeeper {node_id}: {}",
resp.status()
);
}
Ok(())
}
#[instrument(skip(self))]
pub async fn safekeeper_scheduling_policy(
&self,
node_id: NodeId,
scheduling_policy: SkSchedulingPolicy,
) -> anyhow::Result<()> {
self.dispatch::<SafekeeperSchedulingPolicyRequest, ()>(
Method::POST,
format!("control/v1/safekeeper/{node_id}/scheduling_policy"),
Some(SafekeeperSchedulingPolicyRequest { scheduling_policy }),
)
.await
}
#[instrument(skip(self))]
pub async fn inspect(
&self,


@@ -13,6 +13,6 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
jq \
netcat-openbsd
#This is required for the pg_hintplan test
RUN mkdir -p /ext-src/pg_hint_plan-src /postgres/contrib/file_fdw && chown postgres /ext-src/pg_hint_plan-src /postgres/contrib/file_fdw
RUN mkdir -p /ext-src/pg_hint_plan-src /postgres/contrib/file_fdw /ext-src/postgis-src/ && chown postgres /ext-src/pg_hint_plan-src /postgres/contrib/file_fdw /ext-src/postgis-src
USER postgres


@@ -186,13 +186,14 @@ services:
neon-test-extensions:
profiles: ["test-extensions"]
image: ${REPOSITORY:-ghcr.io/neondatabase}/neon-test-extensions-v${PG_TEST_VERSION:-16}:${TEST_EXTENSIONS_TAG:-${TAG:-latest}}
image: ${REPOSITORY:-ghcr.io/neondatabase}/neon-test-extensions-v${PG_TEST_VERSION:-${PG_VERSION:-16}}:${TEST_EXTENSIONS_TAG:-${TAG:-latest}}
environment:
- PGPASSWORD=cloud_admin
- PGUSER=${PGUSER:-cloud_admin}
- PGPASSWORD=${PGPASSWORD:-cloud_admin}
entrypoint:
- "/bin/bash"
- "-c"
command:
- sleep 1800
- sleep 360000
depends_on:
- compute


@@ -54,6 +54,15 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
# It cannot be moved to Dockerfile now because the database directory is created after the start of the container
echo Adding dummy config
docker compose exec compute touch /var/db/postgres/compute/compute_ctl_temp_override.conf
# Prepare for the PostGIS test
docker compose exec compute mkdir -p /tmp/pgis_reg/pgis_reg_tmp
TMPDIR=$(mktemp -d)
docker compose cp neon-test-extensions:/ext-src/postgis-src/raster/test "${TMPDIR}"
docker compose cp neon-test-extensions:/ext-src/postgis-src/regress/00-regress-install "${TMPDIR}"
docker compose exec compute mkdir -p /ext-src/postgis-src/raster /ext-src/postgis-src/regress /ext-src/postgis-src/regress/00-regress-install
docker compose cp "${TMPDIR}/test" compute:/ext-src/postgis-src/raster/test
docker compose cp "${TMPDIR}/00-regress-install" compute:/ext-src/postgis-src/regress
rm -rf "${TMPDIR}"
# The following block copies the files for the pg_hintplan test to the compute node for the extension test in an isolated docker-compose environment
TMPDIR=$(mktemp -d)
docker compose cp neon-test-extensions:/ext-src/pg_hint_plan-src/data "${TMPDIR}/data"
@@ -68,7 +77,7 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
docker compose exec -T neon-test-extensions bash -c "(cd /postgres && patch -p1)" <"../compute/patches/contrib_pg${pg_version}.patch"
# We are running tests now
rm -f testout.txt testout_contrib.txt
docker compose exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src,rag_jina_reranker_v1_tiny_en-src,rag_bge_small_en_v15-src \
docker compose exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src,rag_jina_reranker_v1_tiny_en-src,rag_bge_small_en_v15-src \
neon-test-extensions /run-tests.sh /ext-src | tee testout.txt && EXT_SUCCESS=1 || EXT_SUCCESS=0
docker compose exec -e SKIP=start-scripts,postgres_fdw,ltree_plpython,jsonb_plpython,jsonb_plperl,hstore_plpython,hstore_plperl,dblink,bool_plperl \
neon-test-extensions /run-tests.sh /postgres/contrib | tee testout_contrib.txt && CONTRIB_SUCCESS=1 || CONTRIB_SUCCESS=0


@@ -0,0 +1,9 @@
#!/bin/bash
set -ex
cd "$(dirname "$0")"
if [[ ${PG_VERSION} = v17 ]]; then
sed -i '/computed_columns/d' regress/core/tests.mk
fi
patch -p1 <postgis-no-upgrade-test.patch
trap 'echo Cleaning up; patch -R -p1 <postgis-no-upgrade-test.patch' EXIT
make installcheck-base


@@ -0,0 +1,21 @@
diff --git a/regress/runtest.mk b/regress/runtest.mk
index c051f03..010e493 100644
--- a/regress/runtest.mk
+++ b/regress/runtest.mk
@@ -24,16 +24,6 @@ check-regress:
POSTGIS_TOP_BUILD_DIR=$(abs_top_builddir) $(PERL) $(top_srcdir)/regress/run_test.pl $(RUNTESTFLAGS) $(RUNTESTFLAGS_INTERNAL) $(TESTS)
- @if echo "$(RUNTESTFLAGS)" | grep -vq -- --upgrade; then \
- echo "Running upgrade test as RUNTESTFLAGS did not contain that"; \
- POSTGIS_TOP_BUILD_DIR=$(abs_top_builddir) $(PERL) $(top_srcdir)/regress/run_test.pl \
- --upgrade \
- $(RUNTESTFLAGS) \
- $(RUNTESTFLAGS_INTERNAL) \
- $(TESTS); \
- else \
- echo "Skipping upgrade test as RUNTESTFLAGS already requested upgrades"; \
- fi
check-long:
$(PERL) $(top_srcdir)/regress/run_test.pl $(RUNTESTFLAGS) $(TESTS) $(TESTS_SLOW)


@@ -0,0 +1,198 @@
diff --git a/raster/test/regress/tests.mk b/raster/test/regress/tests.mk
index 00918e1..7e2b6cd 100644
--- a/raster/test/regress/tests.mk
+++ b/raster/test/regress/tests.mk
@@ -17,9 +17,7 @@ override RUNTESTFLAGS_INTERNAL := \
$(RUNTESTFLAGS_INTERNAL) \
--after-upgrade-script $(top_srcdir)/raster/test/regress/hooks/hook-after-upgrade-raster.sql
-RASTER_TEST_FIRST = \
- $(top_srcdir)/raster/test/regress/check_gdal \
- $(top_srcdir)/raster/test/regress/loader/load_outdb
+RASTER_TEST_FIRST =
RASTER_TEST_LAST = \
$(top_srcdir)/raster/test/regress/clean
@@ -33,9 +31,7 @@ RASTER_TEST_IO = \
RASTER_TEST_BASIC_FUNC = \
$(top_srcdir)/raster/test/regress/rt_bytea \
- $(top_srcdir)/raster/test/regress/rt_wkb \
$(top_srcdir)/raster/test/regress/box3d \
- $(top_srcdir)/raster/test/regress/rt_addband \
$(top_srcdir)/raster/test/regress/rt_band \
$(top_srcdir)/raster/test/regress/rt_tile
@@ -73,16 +69,10 @@ RASTER_TEST_BANDPROPS = \
$(top_srcdir)/raster/test/regress/rt_neighborhood \
$(top_srcdir)/raster/test/regress/rt_nearestvalue \
$(top_srcdir)/raster/test/regress/rt_pixelofvalue \
- $(top_srcdir)/raster/test/regress/rt_polygon \
- $(top_srcdir)/raster/test/regress/rt_setbandpath
+ $(top_srcdir)/raster/test/regress/rt_polygon
RASTER_TEST_UTILITY = \
$(top_srcdir)/raster/test/regress/rt_utility \
- $(top_srcdir)/raster/test/regress/rt_fromgdalraster \
- $(top_srcdir)/raster/test/regress/rt_asgdalraster \
- $(top_srcdir)/raster/test/regress/rt_astiff \
- $(top_srcdir)/raster/test/regress/rt_asjpeg \
- $(top_srcdir)/raster/test/regress/rt_aspng \
$(top_srcdir)/raster/test/regress/rt_reclass \
$(top_srcdir)/raster/test/regress/rt_gdalwarp \
$(top_srcdir)/raster/test/regress/rt_gdalcontour \
@@ -120,21 +110,13 @@ RASTER_TEST_SREL = \
RASTER_TEST_BUGS = \
$(top_srcdir)/raster/test/regress/bug_test_car5 \
- $(top_srcdir)/raster/test/regress/permitted_gdal_drivers \
$(top_srcdir)/raster/test/regress/tickets
RASTER_TEST_LOADER = \
$(top_srcdir)/raster/test/regress/loader/Basic \
$(top_srcdir)/raster/test/regress/loader/Projected \
$(top_srcdir)/raster/test/regress/loader/BasicCopy \
- $(top_srcdir)/raster/test/regress/loader/BasicFilename \
- $(top_srcdir)/raster/test/regress/loader/BasicOutDB \
- $(top_srcdir)/raster/test/regress/loader/Tiled10x10 \
- $(top_srcdir)/raster/test/regress/loader/Tiled10x10Copy \
- $(top_srcdir)/raster/test/regress/loader/Tiled8x8 \
- $(top_srcdir)/raster/test/regress/loader/TiledAuto \
- $(top_srcdir)/raster/test/regress/loader/TiledAutoSkipNoData \
- $(top_srcdir)/raster/test/regress/loader/TiledAutoCopyn
+ $(top_srcdir)/raster/test/regress/loader/BasicFilename
RASTER_TESTS := $(RASTER_TEST_FIRST) \
$(RASTER_TEST_METADATA) $(RASTER_TEST_IO) $(RASTER_TEST_BASIC_FUNC) \
diff --git a/regress/core/binary.sql b/regress/core/binary.sql
index 7a36b65..ad78fc7 100644
--- a/regress/core/binary.sql
+++ b/regress/core/binary.sql
@@ -1,4 +1,5 @@
SET client_min_messages TO warning;
+
CREATE SCHEMA tm;
CREATE TABLE tm.geoms (id serial, g geometry);
@@ -31,24 +32,39 @@ SELECT st_force4d(g) FROM tm.geoms WHERE id < 15 ORDER BY id;
INSERT INTO tm.geoms(g)
SELECT st_setsrid(g,4326) FROM tm.geoms ORDER BY id;
-COPY tm.geoms TO :tmpfile WITH BINARY;
+-- define temp file path
+\set tmpfile '/tmp/postgis_binary_test.dat'
+
+-- export
+\set command '\\copy tm.geoms TO ':tmpfile' WITH (FORMAT BINARY)'
+:command
+
+-- import
CREATE TABLE tm.geoms_in AS SELECT * FROM tm.geoms LIMIT 0;
-COPY tm.geoms_in FROM :tmpfile WITH BINARY;
-SELECT 'geometry', count(*) FROM tm.geoms_in i, tm.geoms o WHERE i.id = o.id
- AND ST_OrderingEquals(i.g, o.g);
+\set command '\\copy tm.geoms_in FROM ':tmpfile' WITH (FORMAT BINARY)'
+:command
+
+SELECT 'geometry', count(*) FROM tm.geoms_in i, tm.geoms o
+WHERE i.id = o.id AND ST_OrderingEquals(i.g, o.g);
CREATE TABLE tm.geogs AS SELECT id,g::geography FROM tm.geoms
WHERE geometrytype(g) NOT LIKE '%CURVE%'
AND geometrytype(g) NOT LIKE '%CIRCULAR%'
AND geometrytype(g) NOT LIKE '%SURFACE%'
AND geometrytype(g) NOT LIKE 'TRIANGLE%'
- AND geometrytype(g) NOT LIKE 'TIN%'
-;
+ AND geometrytype(g) NOT LIKE 'TIN%';
-COPY tm.geogs TO :tmpfile WITH BINARY;
+-- export
+\set command '\\copy tm.geogs TO ':tmpfile' WITH (FORMAT BINARY)'
+:command
+
+-- import
CREATE TABLE tm.geogs_in AS SELECT * FROM tm.geogs LIMIT 0;
-COPY tm.geogs_in FROM :tmpfile WITH BINARY;
-SELECT 'geometry', count(*) FROM tm.geogs_in i, tm.geogs o WHERE i.id = o.id
- AND ST_OrderingEquals(i.g::geometry, o.g::geometry);
+\set command '\\copy tm.geogs_in FROM ':tmpfile' WITH (FORMAT BINARY)'
+:command
+
+SELECT 'geometry', count(*) FROM tm.geogs_in i, tm.geogs o
+WHERE i.id = o.id AND ST_OrderingEquals(i.g::geometry, o.g::geometry);
DROP SCHEMA tm CASCADE;
+
diff --git a/regress/core/tests.mk b/regress/core/tests.mk
index 3abd7bc..94903c3 100644
--- a/regress/core/tests.mk
+++ b/regress/core/tests.mk
@@ -23,7 +23,6 @@ current_dir := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
RUNTESTFLAGS_INTERNAL += \
--before-upgrade-script $(top_srcdir)/regress/hooks/hook-before-upgrade.sql \
--after-upgrade-script $(top_srcdir)/regress/hooks/hook-after-upgrade.sql \
- --after-create-script $(top_srcdir)/regress/hooks/hook-after-create.sql \
--before-uninstall-script $(top_srcdir)/regress/hooks/hook-before-uninstall.sql
TESTS += \
@@ -40,7 +39,6 @@ TESTS += \
$(top_srcdir)/regress/core/dumppoints \
$(top_srcdir)/regress/core/dumpsegments \
$(top_srcdir)/regress/core/empty \
- $(top_srcdir)/regress/core/estimatedextent \
$(top_srcdir)/regress/core/forcecurve \
$(top_srcdir)/regress/core/flatgeobuf \
$(top_srcdir)/regress/core/geography \
@@ -55,7 +53,6 @@ TESTS += \
$(top_srcdir)/regress/core/out_marc21 \
$(top_srcdir)/regress/core/in_encodedpolyline \
$(top_srcdir)/regress/core/iscollection \
- $(top_srcdir)/regress/core/legacy \
$(top_srcdir)/regress/core/letters \
$(top_srcdir)/regress/core/long_xact \
$(top_srcdir)/regress/core/lwgeom_regress \
@@ -112,7 +109,6 @@ TESTS += \
$(top_srcdir)/regress/core/temporal_knn \
$(top_srcdir)/regress/core/tickets \
$(top_srcdir)/regress/core/twkb \
- $(top_srcdir)/regress/core/typmod \
$(top_srcdir)/regress/core/wkb \
$(top_srcdir)/regress/core/wkt \
$(top_srcdir)/regress/core/wmsservers \
@@ -144,11 +140,6 @@ TESTS_SLOW = \
$(top_srcdir)/regress/core/concave_hull_hard \
$(top_srcdir)/regress/core/knn_recheck
-ifeq ($(shell expr "$(POSTGIS_PGSQL_VERSION)" ">=" 120),1)
- TESTS += \
- $(top_srcdir)/regress/core/computed_columns
-endif
-
ifeq ($(shell expr "$(POSTGIS_GEOS_VERSION)" ">=" 30700),1)
# GEOS-3.7 adds:
# ST_FrechetDistance
diff --git a/regress/loader/tests.mk b/regress/loader/tests.mk
index 1fc77ac..c3cb9de 100644
--- a/regress/loader/tests.mk
+++ b/regress/loader/tests.mk
@@ -38,7 +38,5 @@ TESTS += \
$(top_srcdir)/regress/loader/Latin1 \
$(top_srcdir)/regress/loader/Latin1-implicit \
$(top_srcdir)/regress/loader/mfile \
- $(top_srcdir)/regress/loader/TestSkipANALYZE \
- $(top_srcdir)/regress/loader/TestANALYZE \
$(top_srcdir)/regress/loader/CharNoWidth
diff --git a/regress/run_test.pl b/regress/run_test.pl
index 0ec5b2d..1c331f4 100755
--- a/regress/run_test.pl
+++ b/regress/run_test.pl
@@ -147,7 +147,6 @@ $ENV{"LANG"} = "C";
# Add locale info to the psql options
# Add pg12 precision suppression
my $PGOPTIONS = $ENV{"PGOPTIONS"};
-$PGOPTIONS .= " -c lc_messages=C";
$PGOPTIONS .= " -c client_min_messages=NOTICE";
$PGOPTIONS .= " -c extra_float_digits=0";
$ENV{"PGOPTIONS"} = $PGOPTIONS;

View File

@@ -0,0 +1,218 @@
diff --git a/raster/test/regress/tests.mk b/raster/test/regress/tests.mk
index 00918e1..7e2b6cd 100644
--- a/raster/test/regress/tests.mk
+++ b/raster/test/regress/tests.mk
@@ -17,9 +17,7 @@ override RUNTESTFLAGS_INTERNAL := \
$(RUNTESTFLAGS_INTERNAL) \
--after-upgrade-script $(top_srcdir)/raster/test/regress/hooks/hook-after-upgrade-raster.sql
-RASTER_TEST_FIRST = \
- $(top_srcdir)/raster/test/regress/check_gdal \
- $(top_srcdir)/raster/test/regress/loader/load_outdb
+RASTER_TEST_FIRST =
RASTER_TEST_LAST = \
$(top_srcdir)/raster/test/regress/clean
@@ -33,9 +31,7 @@ RASTER_TEST_IO = \
RASTER_TEST_BASIC_FUNC = \
$(top_srcdir)/raster/test/regress/rt_bytea \
- $(top_srcdir)/raster/test/regress/rt_wkb \
$(top_srcdir)/raster/test/regress/box3d \
- $(top_srcdir)/raster/test/regress/rt_addband \
$(top_srcdir)/raster/test/regress/rt_band \
$(top_srcdir)/raster/test/regress/rt_tile
@@ -73,16 +69,10 @@ RASTER_TEST_BANDPROPS = \
$(top_srcdir)/raster/test/regress/rt_neighborhood \
$(top_srcdir)/raster/test/regress/rt_nearestvalue \
$(top_srcdir)/raster/test/regress/rt_pixelofvalue \
- $(top_srcdir)/raster/test/regress/rt_polygon \
- $(top_srcdir)/raster/test/regress/rt_setbandpath
+ $(top_srcdir)/raster/test/regress/rt_polygon
RASTER_TEST_UTILITY = \
$(top_srcdir)/raster/test/regress/rt_utility \
- $(top_srcdir)/raster/test/regress/rt_fromgdalraster \
- $(top_srcdir)/raster/test/regress/rt_asgdalraster \
- $(top_srcdir)/raster/test/regress/rt_astiff \
- $(top_srcdir)/raster/test/regress/rt_asjpeg \
- $(top_srcdir)/raster/test/regress/rt_aspng \
$(top_srcdir)/raster/test/regress/rt_reclass \
$(top_srcdir)/raster/test/regress/rt_gdalwarp \
$(top_srcdir)/raster/test/regress/rt_gdalcontour \
@@ -120,21 +110,13 @@ RASTER_TEST_SREL = \
RASTER_TEST_BUGS = \
$(top_srcdir)/raster/test/regress/bug_test_car5 \
- $(top_srcdir)/raster/test/regress/permitted_gdal_drivers \
$(top_srcdir)/raster/test/regress/tickets
RASTER_TEST_LOADER = \
$(top_srcdir)/raster/test/regress/loader/Basic \
$(top_srcdir)/raster/test/regress/loader/Projected \
$(top_srcdir)/raster/test/regress/loader/BasicCopy \
- $(top_srcdir)/raster/test/regress/loader/BasicFilename \
- $(top_srcdir)/raster/test/regress/loader/BasicOutDB \
- $(top_srcdir)/raster/test/regress/loader/Tiled10x10 \
- $(top_srcdir)/raster/test/regress/loader/Tiled10x10Copy \
- $(top_srcdir)/raster/test/regress/loader/Tiled8x8 \
- $(top_srcdir)/raster/test/regress/loader/TiledAuto \
- $(top_srcdir)/raster/test/regress/loader/TiledAutoSkipNoData \
- $(top_srcdir)/raster/test/regress/loader/TiledAutoCopyn
+ $(top_srcdir)/raster/test/regress/loader/BasicFilename
RASTER_TESTS := $(RASTER_TEST_FIRST) \
$(RASTER_TEST_METADATA) $(RASTER_TEST_IO) $(RASTER_TEST_BASIC_FUNC) \
diff --git a/regress/core/binary.sql b/regress/core/binary.sql
index 7a36b65..ad78fc7 100644
--- a/regress/core/binary.sql
+++ b/regress/core/binary.sql
@@ -1,4 +1,5 @@
SET client_min_messages TO warning;
+
CREATE SCHEMA tm;
CREATE TABLE tm.geoms (id serial, g geometry);
@@ -31,24 +32,39 @@ SELECT st_force4d(g) FROM tm.geoms WHERE id < 15 ORDER BY id;
INSERT INTO tm.geoms(g)
SELECT st_setsrid(g,4326) FROM tm.geoms ORDER BY id;
-COPY tm.geoms TO :tmpfile WITH BINARY;
+-- define temp file path
+\set tmpfile '/tmp/postgis_binary_test.dat'
+
+-- export
+\set command '\\copy tm.geoms TO ':tmpfile' WITH (FORMAT BINARY)'
+:command
+
+-- import
CREATE TABLE tm.geoms_in AS SELECT * FROM tm.geoms LIMIT 0;
-COPY tm.geoms_in FROM :tmpfile WITH BINARY;
-SELECT 'geometry', count(*) FROM tm.geoms_in i, tm.geoms o WHERE i.id = o.id
- AND ST_OrderingEquals(i.g, o.g);
+\set command '\\copy tm.geoms_in FROM ':tmpfile' WITH (FORMAT BINARY)'
+:command
+
+SELECT 'geometry', count(*) FROM tm.geoms_in i, tm.geoms o
+WHERE i.id = o.id AND ST_OrderingEquals(i.g, o.g);
CREATE TABLE tm.geogs AS SELECT id,g::geography FROM tm.geoms
WHERE geometrytype(g) NOT LIKE '%CURVE%'
AND geometrytype(g) NOT LIKE '%CIRCULAR%'
AND geometrytype(g) NOT LIKE '%SURFACE%'
AND geometrytype(g) NOT LIKE 'TRIANGLE%'
- AND geometrytype(g) NOT LIKE 'TIN%'
-;
+ AND geometrytype(g) NOT LIKE 'TIN%';
-COPY tm.geogs TO :tmpfile WITH BINARY;
+-- export
+\set command '\\copy tm.geogs TO ':tmpfile' WITH (FORMAT BINARY)'
+:command
+
+-- import
CREATE TABLE tm.geogs_in AS SELECT * FROM tm.geogs LIMIT 0;
-COPY tm.geogs_in FROM :tmpfile WITH BINARY;
-SELECT 'geometry', count(*) FROM tm.geogs_in i, tm.geogs o WHERE i.id = o.id
- AND ST_OrderingEquals(i.g::geometry, o.g::geometry);
+\set command '\\copy tm.geogs_in FROM ':tmpfile' WITH (FORMAT BINARY)'
+:command
+
+SELECT 'geometry', count(*) FROM tm.geogs_in i, tm.geogs o
+WHERE i.id = o.id AND ST_OrderingEquals(i.g::geometry, o.g::geometry);
DROP SCHEMA tm CASCADE;
+
diff --git a/regress/core/tests.mk b/regress/core/tests.mk
index 9e05244..a63a3e1 100644
--- a/regress/core/tests.mk
+++ b/regress/core/tests.mk
@@ -16,14 +16,13 @@ POSTGIS_PGSQL_VERSION=170
POSTGIS_GEOS_VERSION=31101
HAVE_JSON=yes
HAVE_SPGIST=yes
-INTERRUPTTESTS=yes
+INTERRUPTTESTS=no
current_dir := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
RUNTESTFLAGS_INTERNAL += \
--before-upgrade-script $(top_srcdir)/regress/hooks/hook-before-upgrade.sql \
--after-upgrade-script $(top_srcdir)/regress/hooks/hook-after-upgrade.sql \
- --after-create-script $(top_srcdir)/regress/hooks/hook-after-create.sql \
--before-uninstall-script $(top_srcdir)/regress/hooks/hook-before-uninstall.sql
TESTS += \
@@ -40,7 +39,6 @@ TESTS += \
$(top_srcdir)/regress/core/dumppoints \
$(top_srcdir)/regress/core/dumpsegments \
$(top_srcdir)/regress/core/empty \
- $(top_srcdir)/regress/core/estimatedextent \
$(top_srcdir)/regress/core/forcecurve \
$(top_srcdir)/regress/core/flatgeobuf \
$(top_srcdir)/regress/core/frechet \
@@ -60,7 +58,6 @@ TESTS += \
$(top_srcdir)/regress/core/out_marc21 \
$(top_srcdir)/regress/core/in_encodedpolyline \
$(top_srcdir)/regress/core/iscollection \
- $(top_srcdir)/regress/core/legacy \
$(top_srcdir)/regress/core/letters \
$(top_srcdir)/regress/core/lwgeom_regress \
$(top_srcdir)/regress/core/measures \
@@ -119,7 +116,6 @@ TESTS += \
$(top_srcdir)/regress/core/temporal_knn \
$(top_srcdir)/regress/core/tickets \
$(top_srcdir)/regress/core/twkb \
- $(top_srcdir)/regress/core/typmod \
$(top_srcdir)/regress/core/wkb \
$(top_srcdir)/regress/core/wkt \
$(top_srcdir)/regress/core/wmsservers \
@@ -143,8 +139,7 @@ TESTS += \
$(top_srcdir)/regress/core/oriented_envelope \
$(top_srcdir)/regress/core/point_coordinates \
$(top_srcdir)/regress/core/out_geojson \
- $(top_srcdir)/regress/core/wrapx \
- $(top_srcdir)/regress/core/computed_columns
+ $(top_srcdir)/regress/core/wrapx
# Slow slow tests
TESTS_SLOW = \
diff --git a/regress/loader/tests.mk b/regress/loader/tests.mk
index ac4f8ad..4bad4fc 100644
--- a/regress/loader/tests.mk
+++ b/regress/loader/tests.mk
@@ -38,7 +38,5 @@ TESTS += \
$(top_srcdir)/regress/loader/Latin1 \
$(top_srcdir)/regress/loader/Latin1-implicit \
$(top_srcdir)/regress/loader/mfile \
- $(top_srcdir)/regress/loader/TestSkipANALYZE \
- $(top_srcdir)/regress/loader/TestANALYZE \
$(top_srcdir)/regress/loader/CharNoWidth \
diff --git a/regress/run_test.pl b/regress/run_test.pl
index cac4b2e..4c7c82b 100755
--- a/regress/run_test.pl
+++ b/regress/run_test.pl
@@ -238,7 +238,6 @@ $ENV{"LANG"} = "C";
# Add locale info to the psql options
# Add pg12 precision suppression
my $PGOPTIONS = $ENV{"PGOPTIONS"};
-$PGOPTIONS .= " -c lc_messages=C";
$PGOPTIONS .= " -c client_min_messages=NOTICE";
$PGOPTIONS .= " -c extra_float_digits=0";
$ENV{"PGOPTIONS"} = $PGOPTIONS;
diff --git a/topology/test/tests.mk b/topology/test/tests.mk
index cbe2633..2c7c18f 100644
--- a/topology/test/tests.mk
+++ b/topology/test/tests.mk
@@ -46,9 +46,7 @@ TESTS += \
$(top_srcdir)/topology/test/regress/legacy_query.sql \
$(top_srcdir)/topology/test/regress/legacy_validate.sql \
$(top_srcdir)/topology/test/regress/polygonize.sql \
- $(top_srcdir)/topology/test/regress/populate_topology_layer.sql \
$(top_srcdir)/topology/test/regress/removeunusedprimitives.sql \
- $(top_srcdir)/topology/test/regress/renametopogeometrycolumn.sql \
$(top_srcdir)/topology/test/regress/renametopology.sql \
$(top_srcdir)/topology/test/regress/share_sequences.sql \
$(top_srcdir)/topology/test/regress/sqlmm.sql \

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,19 @@
#!/bin/bash
set -ex
cd "$(dirname "${0}")"
dropdb --if-exists contrib_regression
createdb contrib_regression
psql -d contrib_regression -c "ALTER DATABASE contrib_regression SET TimeZone='UTC'" \
-c "ALTER DATABASE contrib_regression SET DateStyle='ISO, MDY'" \
-c "CREATE EXTENSION postgis SCHEMA public" \
-c "CREATE EXTENSION postgis_topology" \
-c "CREATE EXTENSION postgis_tiger_geocoder CASCADE" \
-c "CREATE EXTENSION postgis_raster SCHEMA public" \
-c "CREATE EXTENSION postgis_sfcgal SCHEMA public"
patch -p1 <postgis-no-upgrade-test.patch
patch -p1 <"postgis-regular-${PG_VERSION}.patch"
psql -d contrib_regression -f raster_outdb_template.sql
trap 'patch -R -p1 <postgis-no-upgrade-test.patch && patch -R -p1 <"postgis-regular-${PG_VERSION}.patch"' EXIT
#XXX remove once the problem of using the system libpq is fixed
export LD_LIBRARY_PATH=/usr/local/pgsql/lib
POSTGIS_REGRESS_DB=contrib_regression RUNTESTFLAGS=--nocreate make installcheck-base

View File

@@ -63,5 +63,9 @@ done
for d in ${FAILED}; do
cat "$(find $d -name regression.diffs)"
done
for postgis_diff in /tmp/pgis_reg/*_diff; do
echo "${postgis_diff}:"
cat "${postgis_diff}"
done
echo "${FAILED}"
exit 1

View File

@@ -43,21 +43,6 @@ pub struct NodeMetadata {
pub other: HashMap<String, serde_json::Value>,
}
/// PostHog integration config
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PostHogConfig {
/// PostHog project ID
project_id: String,
/// Server-side (private) API key
server_api_key: String,
/// Client-side (public) API key
client_api_key: String,
/// Private API URL
private_api_url: String,
/// Public API URL
public_api_url: String,
}
/// `pageserver.toml`
///
/// We use serde derive with `#[serde(default)]` to generate a deserializer
@@ -197,8 +182,7 @@ pub struct ConfigToml {
pub tracing: Option<Tracing>,
pub enable_tls_page_service_api: bool,
pub dev_mode: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub posthog_config: Option<PostHogConfig>,
pub timeline_import_config: TimelineImportConfig,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -317,6 +301,12 @@ impl From<OtelExporterProtocol> for tracing_utils::Protocol {
}
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct TimelineImportConfig {
pub import_job_concurrency: NonZeroUsize,
pub import_job_soft_size_limit: NonZeroUsize,
}
pub mod statvfs {
pub mod mock {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -676,7 +666,10 @@ impl Default for ConfigToml {
tracing: None,
enable_tls_page_service_api: false,
dev_mode: false,
posthog_config: None,
timeline_import_config: TimelineImportConfig {
import_job_concurrency: NonZeroUsize::new(128).unwrap(),
import_job_soft_size_limit: NonZeroUsize::new(1024 * 1024 * 1024).unwrap(),
},
}
}
}
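The deleted PostHogConfig aside, the surrounding pattern is worth noting: `#[serde(default)]` on the struct plus `skip_serializing_if = "Option::is_none"` on optional fields keeps `pageserver.toml` both backward- and forward-compatible. A minimal hedged sketch (field names are stand-ins, serde_json used for brevity):

use serde::{Deserialize, Serialize};

#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)] // any missing field falls back to Default::default()
struct ConfigSketch {
    dev_mode: bool,
    // omitted from serialized output while None, as with posthog_config above
    #[serde(skip_serializing_if = "Option::is_none")]
    posthog_config: Option<String>,
}

fn main() {
    // an empty document deserializes to all defaults...
    let cfg: ConfigSketch = serde_json::from_str("{}").unwrap();
    assert!(cfg.posthog_config.is_none());
    // ...and a None field is dropped on the way back out
    assert_eq!(serde_json::to_string(&cfg).unwrap(), r#"{"dev_mode":false}"#);
}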

View File

@@ -17,7 +17,7 @@ impl std::fmt::Display for RateLimitStats {
}
impl RateLimit {
pub fn new(interval: Duration) -> Self {
pub const fn new(interval: Duration) -> Self {
Self {
last: None,
interval,
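Making the constructor `const` is what lets a rate limiter live in a const-initialized `static`; the io_engine change later in this diff relies on exactly that. A hedged sketch of the enabled pattern:

use std::sync::Mutex;
use std::time::Duration;
use utils::rate_limit::RateLimit;

// const-initialized static: no OnceCell or lazy_static indirection needed
static RATE_LIMIT: Mutex<RateLimit> = Mutex::new(RateLimit::new(Duration::from_secs(1)));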

View File

@@ -14,7 +14,7 @@ use std::time::Duration;
use anyhow::{Context, bail, ensure};
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes, PostHogConfig};
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes};
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use pem::Pem;
@@ -231,8 +231,7 @@ pub struct PageServerConf {
/// This is insecure and should only be used in development environments.
pub dev_mode: bool,
/// PostHog integration config
pub posthog_config: Option<PostHogConfig>,
pub timeline_import_config: pageserver_api::config::TimelineImportConfig,
}
/// Token for authentication to safekeepers
@@ -407,7 +406,7 @@ impl PageServerConf {
tracing,
enable_tls_page_service_api,
dev_mode,
posthog_config,
timeline_import_config,
} = config_toml;
let mut conf = PageServerConf {
@@ -461,6 +460,7 @@ impl PageServerConf {
tracing,
enable_tls_page_service_api,
dev_mode,
timeline_import_config,
// ------------------------------------------------------------
// fields that require additional validation or custom handling
@@ -517,7 +517,6 @@ impl PageServerConf {
}
None => Vec::new(),
},
posthog_config,
};
// ------------------------------------------------------------

View File

@@ -1038,21 +1038,23 @@ impl PageServerHandler {
tracing::info_span!(
parent: &parent_span,
"handle_get_page_request",
request_id = %req.hdr.reqid,
rel = %req.rel,
blkno = %req.blkno,
req_lsn = %req.hdr.request_lsn,
not_modified_since_lsn = %req.hdr.not_modified_since
not_modified_since_lsn = %req.hdr.not_modified_since,
)
}};
($shard_id:expr) => {{
tracing::info_span!(
parent: &parent_span,
"handle_get_page_request",
request_id = %req.hdr.reqid,
rel = %req.rel,
blkno = %req.blkno,
req_lsn = %req.hdr.request_lsn,
not_modified_since_lsn = %req.hdr.not_modified_since,
shard_id = %$shard_id
shard_id = %$shard_id,
)
}};
}

View File

@@ -40,7 +40,7 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use super::tenant::{PageReconstructError, Timeline};
use crate::aux_file;
use crate::context::{PerfInstrumentFutureExt, RequestContext};
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::metrics::{
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
@@ -275,24 +275,30 @@ impl Timeline {
continue;
}
let nblocks = match self
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_REL_SIZE",
reltag=%tag,
lsn=%lsn,
)
})
.await
{
Ok(nblocks) => nblocks,
Err(err) => {
result_slots[response_slot_idx].write(Err(err));
slots_filled += 1;
continue;
let nblocks = {
let ctx = RequestContextBuilder::from(&ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_REL_SIZE",
reltag=%tag,
lsn=%lsn,
)
})
.attached_child();
match self
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await
{
Ok(nblocks) => nblocks,
Err(err) => {
result_slots[response_slot_idx].write(Err(err));
slots_filled += 1;
continue;
}
}
};
@@ -308,6 +314,17 @@ impl Timeline {
let key = rel_block_to_key(*tag, *blknum);
let ctx = RequestContextBuilder::from(&ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_BATCH",
batch_size = %page_count,
)
})
.attached_child();
let key_slots = keys_slots.entry(key).or_default();
key_slots.push((response_slot_idx, ctx));
@@ -323,14 +340,7 @@ impl Timeline {
let query = VersionedKeySpaceQuery::scattered(query);
let res = self
.get_vectored(query, io_concurrency, ctx)
.maybe_perf_instrument(ctx, |current_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: current_perf_span,
"GET_BATCH",
batch_size = %page_count,
)
})
.maybe_perf_instrument(ctx, |current_perf_span| current_perf_span.clone())
.await;
match res {

View File

@@ -94,10 +94,23 @@ impl Header {
pub enum WriteBlobError {
#[error(transparent)]
Flush(FlushTaskError),
#[error("blob too large ({len} bytes)")]
BlobTooLarge { len: usize },
#[error(transparent)]
WriteBlobRaw(anyhow::Error),
Other(anyhow::Error),
}
impl WriteBlobError {
pub fn is_cancel(&self) -> bool {
match self {
WriteBlobError::Flush(e) => e.is_cancel(),
WriteBlobError::Other(_) => false,
}
}
pub fn into_anyhow(self) -> anyhow::Error {
match self {
WriteBlobError::Flush(e) => e.into_anyhow(),
WriteBlobError::Other(e) => e,
}
}
}
impl BlockCursor<'_> {
@@ -327,7 +340,9 @@ where
return (
(
io_buf.slice_len(),
Err(WriteBlobError::BlobTooLarge { len }),
Err(WriteBlobError::Other(anyhow::anyhow!(
"blob too large ({len} bytes)"
))),
),
srcbuf,
);
@@ -391,7 +406,7 @@ where
// Verify the header, to ensure we don't write invalid/corrupt data.
let header = match Header::decode(&raw_with_header)
.context("decoding blob header")
.map_err(WriteBlobError::WriteBlobRaw)
.map_err(WriteBlobError::Other)
{
Ok(header) => header,
Err(err) => return (raw_with_header, Err(err)),
@@ -401,7 +416,7 @@ where
let raw_len = raw_with_header.len();
return (
raw_with_header,
Err(WriteBlobError::WriteBlobRaw(anyhow::anyhow!(
Err(WriteBlobError::Other(anyhow::anyhow!(
"header length mismatch: {header_total_len} != {raw_len}"
))),
);

View File

@@ -2,6 +2,7 @@
pub mod batch_split_writer;
pub mod delta_layer;
pub mod errors;
pub mod filter_iterator;
pub mod image_layer;
pub mod inmemory_layer;

View File

@@ -10,6 +10,7 @@ use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
use super::errors::PutError;
use super::layer::S3_UPLOAD_LIMIT;
use super::{
DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
@@ -235,7 +236,7 @@ impl<'a> SplitImageLayerWriter<'a> {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), PutError> {
// The current estimation is an upper bound of the space that the key/image could take
// because we did not consider compression in this estimation. The resulting image layer
// could be smaller than the target size.
@@ -253,7 +254,8 @@ impl<'a> SplitImageLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await?;
.await
.map_err(PutError::Other)?;
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
self.batches.add_unfinished_image_writer(
prev_image_writer,
@@ -346,7 +348,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), PutError> {
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
//
@@ -366,7 +368,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await?,
.await
.map_err(PutError::Other)?,
));
}
let (_, inner) = self.inner.as_mut().unwrap();
@@ -386,7 +389,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await?;
.await
.map_err(PutError::Other)?;
let (start_key, prev_delta_writer) =
self.inner.replace((key, next_delta_writer)).unwrap();
self.batches.add_unfinished_delta_writer(
@@ -396,11 +400,11 @@ impl<'a> SplitDeltaLayerWriter<'a> {
);
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
// We have to produce a very large file b/c a key is updated too often.
anyhow::bail!(
return Err(PutError::Other(anyhow::anyhow!(
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
key,
inner.estimated_size()
);
)));
}
}
self.last_key_written = key;

View File

@@ -55,6 +55,7 @@ use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use super::errors::PutError;
use super::{
AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
ValuesReconstructState,
@@ -477,12 +478,15 @@ impl DeltaLayerWriterInner {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), PutError> {
let (_, res) = self
.put_value_bytes(
key,
lsn,
Value::ser(&val)?.slice_len(),
Value::ser(&val)
.map_err(anyhow::Error::new)
.map_err(PutError::Other)?
.slice_len(),
val.will_init(),
ctx,
)
@@ -497,7 +501,7 @@ impl DeltaLayerWriterInner {
val: FullSlice<Buf>,
will_init: bool,
ctx: &RequestContext,
) -> (FullSlice<Buf>, anyhow::Result<()>)
) -> (FullSlice<Buf>, Result<(), PutError>)
where
Buf: IoBuf + Send,
{
@@ -513,19 +517,24 @@ impl DeltaLayerWriterInner {
.blob_writer
.write_blob_maybe_compressed(val, ctx, compression)
.await;
let res = res.map_err(PutError::WriteBlob);
let off = match res {
Ok((off, _)) => off,
Err(e) => return (val, Err(anyhow::anyhow!(e))),
Err(e) => return (val, Err(e)),
};
let blob_ref = BlobRef::new(off, will_init);
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
let res = self.tree.append(&delta_key.0, blob_ref.0);
let res = self
.tree
.append(&delta_key.0, blob_ref.0)
.map_err(anyhow::Error::new)
.map_err(PutError::Other);
self.num_keys += 1;
(val, res.map_err(|e| anyhow::anyhow!(e)))
(val, res)
}
fn size(&self) -> u64 {
@@ -694,7 +703,7 @@ impl DeltaLayerWriter {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), PutError> {
self.inner
.as_mut()
.unwrap()
@@ -709,7 +718,7 @@ impl DeltaLayerWriter {
val: FullSlice<Buf>,
will_init: bool,
ctx: &RequestContext,
) -> (FullSlice<Buf>, anyhow::Result<()>)
) -> (FullSlice<Buf>, Result<(), PutError>)
where
Buf: IoBuf + Send,
{

View File

@@ -0,0 +1,24 @@
use crate::tenant::blob_io::WriteBlobError;
#[derive(Debug, thiserror::Error)]
pub enum PutError {
#[error(transparent)]
WriteBlob(WriteBlobError),
#[error(transparent)]
Other(anyhow::Error),
}
impl PutError {
pub fn is_cancel(&self) -> bool {
match self {
PutError::WriteBlob(e) => e.is_cancel(),
PutError::Other(_) => false,
}
}
pub fn into_anyhow(self) -> anyhow::Error {
match self {
PutError::WriteBlob(e) => e.into_anyhow(),
PutError::Other(e) => e,
}
}
}
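Callers are expected to collapse this into their own error enums; the `From` impls added for `CreateImageLayersError` and `CompactionError` later in this diff do exactly that. A freestanding sketch of the shape (the target type here is a stand-in):

#[derive(Debug, thiserror::Error)]
enum CallerError {
    #[error("shutting down")]
    ShuttingDown,
    #[error(transparent)]
    Other(anyhow::Error),
}

impl From<PutError> for CallerError {
    fn from(e: PutError) -> Self {
        if e.is_cancel() {
            // cancellation stays a first-class outcome instead of an anyhow blob
            CallerError::ShuttingDown
        } else {
            CallerError::Other(e.into_anyhow())
        }
    }
}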

View File

@@ -53,6 +53,7 @@ use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use super::errors::PutError;
use super::layer_name::ImageLayerName;
use super::{
AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
@@ -842,8 +843,14 @@ impl ImageLayerWriterInner {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
) -> Result<(), PutError> {
if !self.key_range.contains(&key) {
return Err(PutError::Other(anyhow::anyhow!(
"key {:?} not in range {:?}",
key,
self.key_range
)));
}
let compression = self.conf.image_compression;
let uncompressed_len = img.len() as u64;
self.uncompressed_bytes += uncompressed_len;
@@ -853,7 +860,7 @@ impl ImageLayerWriterInner {
.write_blob_maybe_compressed(img.slice_len(), ctx, compression)
.await;
// TODO: re-use the buffer for `img` further upstack
let (off, compression_info) = res?;
let (off, compression_info) = res.map_err(PutError::WriteBlob)?;
if compression_info.compressed_size.is_some() {
// The image has been considered for compression at least
self.uncompressed_bytes_eligible += uncompressed_len;
@@ -865,7 +872,10 @@ impl ImageLayerWriterInner {
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, off)?;
self.tree
.append(&keybuf, off)
.map_err(anyhow::Error::new)
.map_err(PutError::Other)?;
#[cfg(feature = "testing")]
{
@@ -1085,7 +1095,7 @@ impl ImageLayerWriter {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), PutError> {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
}

View File

@@ -23,7 +23,7 @@ use super::{
LayerVisibilityHint, PerfInstrumentFutureExt, PersistentLayerDesc, ValuesReconstructState,
};
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
use crate::context::{RequestContext, RequestContextBuilder};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
@@ -1076,24 +1076,17 @@ impl LayerInner {
return Err(DownloadError::DownloadRequired);
}
let ctx = if ctx.has_perf_span() {
let dl_ctx = RequestContextBuilder::from(ctx)
.task_kind(TaskKind::LayerDownload)
.download_behavior(DownloadBehavior::Download)
.root_perf_span(|| {
info_span!(
target: PERF_TRACE_TARGET,
"DOWNLOAD_LAYER",
layer = %self,
reason = %reason
)
})
.detached_child();
ctx.perf_follows_from(&dl_ctx);
dl_ctx
} else {
ctx.attached_child()
};
let ctx = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"DOWNLOAD_LAYER",
layer = %self,
reason = %reason,
)
})
.attached_child();
async move {
tracing::info!(%reason, "downloading on-demand");
@@ -1101,7 +1094,7 @@ impl LayerInner {
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
let res = self
.download_init_and_wait(timeline, permit, ctx.attached_child())
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
.await?;
scopeguard::ScopeGuard::into_inner(init_cancelled);
@@ -1709,7 +1702,7 @@ impl DownloadError {
}
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Copy, Clone)]
pub(crate) enum NeedsDownload {
NotFound,
NotFile(std::fs::FileType),

View File

@@ -340,7 +340,7 @@ pub(crate) fn log_compaction_error(
} else {
match level {
Level::ERROR if degrade_to_warning => warn!("Compaction failed and discarded: {err:#}"),
Level::ERROR => error!("Compaction failed: {err:#}"),
Level::ERROR => error!("Compaction failed: {err:?}"),
Level::INFO => info!("Compaction failed: {err:#}"),
level => unimplemented!("unexpected level {level:?}"),
}
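The switch from `{err:#}` to `{err:?}` at ERROR level controls how much of an `anyhow` chain gets printed; a small illustration (messages are made up):

let err = anyhow::anyhow!("disk full").context("compaction failed");
println!("{err:#}"); // one line: "compaction failed: disk full"
println!("{err:?}"); // multi-line: message, the "Caused by:" chain, and a backtrace if captured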

View File

@@ -987,6 +987,16 @@ impl From<PageReconstructError> for CreateImageLayersError {
}
}
impl From<super::storage_layer::errors::PutError> for CreateImageLayersError {
fn from(e: super::storage_layer::errors::PutError) -> Self {
if e.is_cancel() {
CreateImageLayersError::Cancelled
} else {
CreateImageLayersError::Other(e.into_anyhow())
}
}
}
impl From<GetVectoredError> for CreateImageLayersError {
fn from(e: GetVectoredError) -> Self {
match e {
@@ -2117,22 +2127,14 @@ impl Timeline {
debug_assert_current_span_has_tenant_and_timeline_id();
// Regardless of whether we're going to try_freeze_and_flush
// or not, stop ingesting any more data. Walreceiver only provides
// cancellation but no "wait until gone", because it uses the Timeline::gate.
// So, only after the self.gate.close() below will we know for sure that
// no walreceiver tasks are left.
// For `try_freeze_and_flush=true`, this means that we might still be ingesting
// data during the call to `self.freeze_and_flush()` below.
// That's not ideal, but we don't have the concept of a ChildGuard,
// which is what we'd need to properly model early shutdown of the walreceiver
// task sub-tree before the other Timeline task sub-trees.
// or not, stop ingesting any more data.
let walreceiver = self.walreceiver.lock().unwrap().take();
tracing::debug!(
is_some = walreceiver.is_some(),
"Waiting for WalReceiverManager..."
);
if let Some(walreceiver) = walreceiver {
walreceiver.cancel();
walreceiver.shutdown().await;
}
// ... and inform any waiters for newer LSNs that there won't be any.
self.last_record_lsn.shutdown();
@@ -5923,6 +5925,16 @@ impl From<layer_manager::Shutdown> for CompactionError {
}
}
impl From<super::storage_layer::errors::PutError> for CompactionError {
fn from(e: super::storage_layer::errors::PutError) -> Self {
if e.is_cancel() {
CompactionError::ShuttingDown
} else {
CompactionError::Other(e.into_anyhow())
}
}
}
#[serde_as]
#[derive(serde::Serialize)]
struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);

View File

@@ -2204,8 +2204,7 @@ impl Timeline {
.as_mut()
.unwrap()
.put_value(key, lsn, value, ctx)
.await
.map_err(CompactionError::Other)?;
.await?;
} else {
let owner = self.shard_identity.get_shard_number(&key);

View File

@@ -149,14 +149,7 @@ pub async fn doit(
}
.await?;
flow::run(
timeline.clone(),
base_lsn,
control_file,
storage.clone(),
ctx,
)
.await?;
flow::run(timeline.clone(), control_file, storage.clone(), ctx).await?;
//
// Communicate that shard is done.

View File

@@ -34,7 +34,9 @@ use std::sync::Arc;
use anyhow::{bail, ensure};
use bytes::Bytes;
use futures::stream::FuturesOrdered;
use itertools::Itertools;
use pageserver_api::config::TimelineImportConfig;
use pageserver_api::key::{
CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, Key, TWOPHASEDIR_KEY, rel_block_to_key,
rel_dir_to_key, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
@@ -46,8 +48,9 @@ use pageserver_api::shard::ShardIdentity;
use postgres_ffi::relfile_utils::parse_relfilename;
use postgres_ffi::{BLCKSZ, pg_constants};
use remote_storage::RemotePath;
use tokio::task::JoinSet;
use tracing::{Instrument, debug, info_span, instrument};
use tokio::sync::Semaphore;
use tokio_stream::StreamExt;
use tracing::{debug, instrument};
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
@@ -63,37 +66,39 @@ use crate::tenant::storage_layer::{ImageLayerWriter, Layer};
pub async fn run(
timeline: Arc<Timeline>,
pgdata_lsn: Lsn,
control_file: ControlFile,
storage: RemoteStorageWrapper,
ctx: &RequestContext,
) -> anyhow::Result<()> {
Flow {
timeline,
pgdata_lsn,
let planner = Planner {
control_file,
tasks: Vec::new(),
storage,
}
.run(ctx)
.await
storage: storage.clone(),
shard: timeline.shard_identity,
tasks: Vec::default(),
};
let import_config = &timeline.conf.timeline_import_config;
let plan = planner.plan(import_config).await?;
plan.execute(timeline, import_config, ctx).await
}
struct Flow {
timeline: Arc<Timeline>,
pgdata_lsn: Lsn,
struct Planner {
control_file: ControlFile,
tasks: Vec<AnyImportTask>,
storage: RemoteStorageWrapper,
shard: ShardIdentity,
tasks: Vec<AnyImportTask>,
}
impl Flow {
/// Perform the ingestion into [`Self::timeline`].
/// Assumes the timeline is empty (= no layers).
pub async fn run(mut self, ctx: &RequestContext) -> anyhow::Result<()> {
let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
struct Plan {
jobs: Vec<ChunkProcessingJob>,
}
self.pgdata_lsn = pgdata_lsn;
impl Planner {
/// Creates an import plan
///
/// This function is and must remain pure: given the same input, it will generate the same import plan.
async fn plan(mut self, import_config: &TimelineImportConfig) -> anyhow::Result<Plan> {
let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
let datadir = PgDataDir::new(&self.storage).await?;
@@ -115,7 +120,7 @@ impl Flow {
}
// Import SLRUs
if self.timeline.tenant_shard_id.is_shard_zero() {
if self.shard.is_shard_zero() {
// pg_xact (01:00 keyspace)
self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
.await?;
@@ -166,14 +171,16 @@ impl Flow {
let mut last_end_key = Key::MIN;
let mut current_chunk = Vec::new();
let mut current_chunk_size: usize = 0;
let mut parallel_jobs = Vec::new();
let mut jobs = Vec::new();
for task in std::mem::take(&mut self.tasks).into_iter() {
if current_chunk_size + task.total_size() > 1024 * 1024 * 1024 {
if current_chunk_size + task.total_size()
> import_config.import_job_soft_size_limit.into()
{
let key_range = last_end_key..task.key_range().start;
parallel_jobs.push(ChunkProcessingJob::new(
jobs.push(ChunkProcessingJob::new(
key_range.clone(),
std::mem::take(&mut current_chunk),
&self,
pgdata_lsn,
));
last_end_key = key_range.end;
current_chunk_size = 0;
@@ -181,45 +188,13 @@ impl Flow {
current_chunk_size += task.total_size();
current_chunk.push(task);
}
parallel_jobs.push(ChunkProcessingJob::new(
jobs.push(ChunkProcessingJob::new(
last_end_key..Key::MAX,
current_chunk,
&self,
pgdata_lsn,
));
// Start all jobs simultaneously
let mut work = JoinSet::new();
// TODO: semaphore?
for job in parallel_jobs {
let ctx: RequestContext =
ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
work.spawn(async move { job.run(&ctx).await }.instrument(info_span!("parallel_job")));
}
let mut results = Vec::new();
while let Some(result) = work.join_next().await {
match result {
Ok(res) => {
results.push(res);
}
Err(_joinset_err) => {
results.push(Err(anyhow::anyhow!(
"parallel job panicked or cancelled, check pageserver logs"
)));
}
}
}
if results.iter().all(|r| r.is_ok()) {
Ok(())
} else {
let mut msg = String::new();
for result in results {
if let Err(err) = result {
msg.push_str(&format!("{err:?}\n\n"));
}
}
bail!("Some parallel jobs failed:\n\n{msg}");
}
Ok(Plan { jobs })
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))]
@@ -266,7 +241,7 @@ impl Flow {
let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
self.tasks
.push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(
*self.timeline.get_shard_identity(),
self.shard,
start_key..end_key,
&file.path,
self.storage.clone(),
@@ -289,7 +264,7 @@ impl Flow {
}
async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
assert!(self.timeline.tenant_shard_id.is_shard_zero());
assert!(self.shard.is_shard_zero());
let segments = self.storage.listfilesindir(path).await?;
let segments: Vec<(String, u32, usize)> = segments
@@ -344,6 +319,68 @@ impl Flow {
}
}
impl Plan {
async fn execute(
self,
timeline: Arc<Timeline>,
import_config: &TimelineImportConfig,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut work = FuturesOrdered::new();
let semaphore = Arc::new(Semaphore::new(import_config.import_job_concurrency.into()));
let jobs_in_plan = self.jobs.len();
let mut jobs = self.jobs.into_iter().enumerate().peekable();
let mut results = Vec::new();
// Run import jobs concurrently up to the limit specified by the pageserver configuration.
// Note that we process completed futures in the order of insertion. This will be the
// building block for resuming imports across pageserver restarts or tenant migrations.
while results.len() < jobs_in_plan {
tokio::select! {
permit = semaphore.clone().acquire_owned(), if jobs.peek().is_some() => {
let permit = permit.expect("never closed");
let (job_idx, job) = jobs.next().expect("we peeked");
let job_timeline = timeline.clone();
let ctx = ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
work.push_back(tokio::task::spawn(async move {
let _permit = permit;
let res = job.run(job_timeline, &ctx).await;
(job_idx, res)
}));
},
maybe_complete_job_idx = work.next() => {
match maybe_complete_job_idx {
Some(Ok((_job_idx, res))) => {
results.push(res);
},
Some(Err(_)) => {
results.push(Err(anyhow::anyhow!(
"parallel job panicked or cancelled, check pageserver logs"
)));
}
None => {}
}
}
}
}
if results.iter().all(|r| r.is_ok()) {
Ok(())
} else {
let mut msg = String::new();
for result in results {
if let Err(err) = result {
msg.push_str(&format!("{err:?}\n\n"));
}
}
bail!("Some parallel jobs failed:\n\n{msg}");
}
}
}
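The select loop above is a reusable shape: admission is throttled by owned semaphore permits while completions drain in submission order via `FuturesOrdered`. A freestanding hedged sketch (the job body and names are illustrative, not the import tasks themselves):

use std::sync::Arc;

use futures::stream::{FuturesOrdered, StreamExt};
use tokio::sync::Semaphore;

// stand-in for ChunkProcessingJob::run
async fn run_job(idx: usize) -> anyhow::Result<usize> {
    Ok(idx)
}

async fn execute_sketch(jobs: usize, concurrency: usize) -> anyhow::Result<()> {
    let semaphore = Arc::new(Semaphore::new(concurrency));
    let mut pending = (0..jobs).peekable();
    let mut work = FuturesOrdered::new();
    let mut results = Vec::new();
    while results.len() < jobs {
        tokio::select! {
            // only try to acquire while something is left to spawn
            permit = semaphore.clone().acquire_owned(), if pending.peek().is_some() => {
                let permit = permit.expect("semaphore is never closed");
                let idx = pending.next().expect("peeked above");
                work.push_back(tokio::spawn(async move {
                    let _permit = permit; // held for the job's whole lifetime
                    run_job(idx).await
                }));
            }
            // completions come back in insertion order, not completion order
            Some(joined) = work.next() => {
                results.push(joined.expect("job panicked"));
            }
        }
    }
    results.into_iter().collect::<anyhow::Result<Vec<_>>>().map(drop)
}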
//
// dbdir iteration tools
//
@@ -713,7 +750,6 @@ impl From<ImportSlruBlocksTask> for AnyImportTask {
}
struct ChunkProcessingJob {
timeline: Arc<Timeline>,
range: Range<Key>,
tasks: Vec<AnyImportTask>,
@@ -721,25 +757,24 @@ struct ChunkProcessingJob {
}
impl ChunkProcessingJob {
fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, env: &Flow) -> Self {
assert!(env.pgdata_lsn.is_valid());
fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, pgdata_lsn: Lsn) -> Self {
assert!(pgdata_lsn.is_valid());
Self {
timeline: env.timeline.clone(),
range,
tasks,
pgdata_lsn: env.pgdata_lsn,
pgdata_lsn,
}
}
async fn run(self, ctx: &RequestContext) -> anyhow::Result<()> {
async fn run(self, timeline: Arc<Timeline>, ctx: &RequestContext) -> anyhow::Result<()> {
let mut writer = ImageLayerWriter::new(
self.timeline.conf,
self.timeline.timeline_id,
self.timeline.tenant_shard_id,
timeline.conf,
timeline.timeline_id,
timeline.tenant_shard_id,
&self.range,
self.pgdata_lsn,
&self.timeline.gate,
self.timeline.cancel.clone(),
&timeline.gate,
timeline.cancel.clone(),
ctx,
)
.await?;
@@ -751,24 +786,20 @@ impl ChunkProcessingJob {
let resident_layer = if nimages > 0 {
let (desc, path) = writer.finish(ctx).await?;
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?
Layer::finish_creating(timeline.conf, &timeline, desc, &path)?
} else {
// dropping the writer cleans up
return Ok(());
};
// this is sharing the same code as create_image_layers
let mut guard = self.timeline.layers.write().await;
let mut guard = timeline.layers.write().await;
guard
.open_mut()?
.track_new_image_layers(&[resident_layer.clone()], &self.timeline.metrics);
.track_new_image_layers(&[resident_layer.clone()], &timeline.metrics);
crate::tenant::timeline::drop_wlock(guard);
// Schedule the layer for upload but don't add barriers such as
// wait for completion or index upload, so we don't inhibit upload parallelism.
// TODO: limit upload parallelism somehow (e.g. by limiting concurrency of jobs?)
// TODO: or regulate parallelism by upload queue depth? Prob should happen at a higher level.
self.timeline
timeline
.remote_client
.schedule_layer_file_upload(resident_layer)?;

View File

@@ -63,6 +63,7 @@ pub struct WalReceiver {
/// All tasks spawned by [`WalReceiver::start`] and its children are sensitive to this token.
/// It's a child token of [`Timeline`] so that timeline shutdown can cancel WalReceiver tasks early for `freeze_and_flush=true`.
cancel: CancellationToken,
task: tokio::task::JoinHandle<()>,
}
impl WalReceiver {
@@ -79,7 +80,7 @@ impl WalReceiver {
let loop_status = Arc::new(std::sync::RwLock::new(None));
let manager_status = Arc::clone(&loop_status);
let cancel = timeline.cancel.child_token();
WALRECEIVER_RUNTIME.spawn({
let task = WALRECEIVER_RUNTIME.spawn({
let cancel = cancel.clone();
async move {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -120,14 +121,25 @@ impl WalReceiver {
Self {
manager_status,
cancel,
task,
}
}
#[instrument(skip_all, level = tracing::Level::DEBUG)]
pub fn cancel(&self) {
pub async fn shutdown(self) {
debug_assert_current_span_has_tenant_and_timeline_id();
debug!("cancelling walreceiver tasks");
self.cancel.cancel();
match self.task.await {
Ok(()) => debug!("Shutdown success"),
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => {
// already logged by panic hook
}
Err(je) => {
error!("shutdown walreceiver task join error: {je}")
}
}
}
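// The method above is the classic cancel-then-join idiom: trip the token,
// then await the stored JoinHandle so no task outlives the call. A hedged
// freestanding sketch (assumes tokio and tokio_util; the task body is a
// stand-in for the manager loop):
async fn cancel_then_join_sketch() {
    use tokio_util::sync::CancellationToken;
    let cancel = CancellationToken::new();
    let task = tokio::spawn({
        let cancel = cancel.clone();
        async move { cancel.cancelled().await }
    });
    cancel.cancel();
    task.await.expect("task panicked");
}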
pub(crate) fn status(&self) -> Option<ConnectionManagerStatus> {

View File

@@ -111,13 +111,17 @@ pub(crate) fn get() -> IoEngine {
use std::os::unix::prelude::FileExt;
use std::sync::atomic::{AtomicU8, Ordering};
#[cfg(target_os = "linux")]
use {std::time::Duration, tracing::info};
use super::owned_buffers_io::io_buf_ext::FullSlice;
use super::owned_buffers_io::slice::SliceMutExt;
use super::{FileGuard, Metadata};
#[cfg(target_os = "linux")]
fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
pub(super) fn epoll_uring_error_to_std(
e: tokio_epoll_uring::Error<std::io::Error>,
) -> std::io::Error {
match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
@@ -149,7 +153,11 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.read(file_guard, offset, slice).await;
let (resources, res) =
retry_ecanceled_once((file_guard, slice), |(file_guard, slice)| async {
system.read(file_guard, offset, slice).await
})
.await;
(resources, res.map_err(epoll_uring_error_to_std))
}
}
@@ -164,7 +172,10 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.fsync(file_guard).await;
let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
system.fsync(file_guard).await
})
.await;
(resources, res.map_err(epoll_uring_error_to_std))
}
}
@@ -182,7 +193,10 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.fdatasync(file_guard).await;
let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
system.fdatasync(file_guard).await
})
.await;
(resources, res.map_err(epoll_uring_error_to_std))
}
}
@@ -201,7 +215,10 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.statx(file_guard).await;
let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
system.statx(file_guard).await
})
.await;
(
resources,
res.map_err(epoll_uring_error_to_std).map(Metadata::from),
@@ -224,6 +241,7 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
// TODO: ftruncate op for tokio-epoll-uring
// Don't forget to use retry_ecanceled_once
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
(file_guard, res)
}
@@ -245,8 +263,11 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let ((file_guard, slice), res) =
system.write(file_guard, offset, buf.into_raw_slice()).await;
let ((file_guard, slice), res) = retry_ecanceled_once(
(file_guard, buf.into_raw_slice()),
async |(file_guard, buf)| system.write(file_guard, offset, buf).await,
)
.await;
(
(file_guard, FullSlice::must_new(slice)),
res.map_err(epoll_uring_error_to_std),
@@ -282,6 +303,56 @@ impl IoEngine {
}
}
/// We observe in tests that stop the pageserver with SIGTERM immediately after it was ingesting data
/// that buffered writers occasionally fail (and get retried by BufferedWriter) with ECANCELED.
/// The problem is believed to be a race condition in how io_uring handles punted async work (io-wq) and signals.
/// Investigation ticket: <https://github.com/neondatabase/neon/issues/11446>
///
/// This function retries the operation once if it fails with ECANCELED.
/// ONLY USE FOR IDEMPOTENT [`super::VirtualFile`] operations.
#[cfg(target_os = "linux")]
pub(super) async fn retry_ecanceled_once<F, Fut, T, V>(
resources: T,
f: F,
) -> (T, Result<V, tokio_epoll_uring::Error<std::io::Error>>)
where
F: Fn(T) -> Fut,
Fut: std::future::Future<Output = (T, Result<V, tokio_epoll_uring::Error<std::io::Error>>)>,
T: Send,
V: Send,
{
let (resources, res) = f(resources).await;
let Err(e) = res else {
return (resources, res);
};
let tokio_epoll_uring::Error::Op(err) = e else {
return (resources, Err(e));
};
if err.raw_os_error() != Some(nix::libc::ECANCELED) {
return (resources, Err(tokio_epoll_uring::Error::Op(err)));
}
{
static RATE_LIMIT: std::sync::Mutex<utils::rate_limit::RateLimit> =
std::sync::Mutex::new(utils::rate_limit::RateLimit::new(Duration::from_secs(1)));
let mut guard = RATE_LIMIT.lock().unwrap();
guard.call2(|rate_limit_stats| {
info!(
%rate_limit_stats, "ECANCELED observed, assuming it is due to a signal being received by the submitting thread, retrying after a delay; this message is rate-limited"
);
});
drop(guard);
}
tokio::time::sleep(Duration::from_millis(100)).await; // something big enough to beat even heavily overcommitted CI runners
let (resources, res) = f(resources).await;
(resources, res)
}
pub(super) fn panic_operation_must_be_idempotent() {
panic!(
"unsupported; io_engine may retry operations internally and thus needs them to be idempotent (retry_ecanceled_once)"
)
}
pub enum FeatureTestResult {
PlatformPreferred(IoEngineKind),
Worse {

View File

@@ -110,18 +110,23 @@ impl OpenOptions {
self
}
/// Don't use, `O_APPEND` is not supported.
pub fn append(&mut self, _append: bool) {
super::io_engine::panic_operation_must_be_idempotent();
}
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
match &self.inner {
Inner::StdFs(x) => x.open(path).map(|file| file.into()),
#[cfg(target_os = "linux")]
Inner::TokioEpollUring(x) => {
let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await;
system.open(path, x).await.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
let (_, res) = super::io_engine::retry_ecanceled_once((), |()| async {
let res = system.open(path, x).await;
((), res)
})
.await;
res.map_err(super::io_engine::epoll_uring_error_to_std)
}
}
}
@@ -140,6 +145,9 @@ impl OpenOptions {
}
pub fn custom_flags(mut self, flags: i32) -> Self {
if flags & nix::libc::O_APPEND != 0 {
super::io_engine::panic_operation_must_be_idempotent();
}
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.custom_flags(flags);

View File

@@ -247,6 +247,19 @@ pub enum FlushTaskError {
Cancelled,
}
impl FlushTaskError {
pub fn is_cancel(&self) -> bool {
match self {
FlushTaskError::Cancelled => true,
}
}
pub fn into_anyhow(self) -> anyhow::Error {
match self {
FlushTaskError::Cancelled => anyhow::anyhow!(self),
}
}
}
impl<Buf, W> FlushBackgroundTask<Buf, W>
where
Buf: IoBufAligned + Send + Sync,

View File

@@ -12,9 +12,9 @@ use tracing::{debug, warn};
use crate::auth::password_hack::parse_endpoint_param;
use crate::context::RequestContext;
use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, SniKind};
use crate::metrics::{Metrics, SniGroup, SniKind};
use crate::proxy::NeonOptions;
use crate::serverless::SERVERLESS_DRIVER_SNI;
use crate::serverless::{AUTH_BROKER_SNI, SERVERLESS_DRIVER_SNI};
use crate::types::{EndpointId, RoleName};
#[derive(Debug, Error, PartialEq, Eq, Clone)]
@@ -65,7 +65,7 @@ pub(crate) fn endpoint_sni(sni: &str, common_names: &HashSet<String>) -> Option<
if !common_names.contains(common_name) {
return None;
}
if subdomain == SERVERLESS_DRIVER_SNI {
if subdomain == SERVERLESS_DRIVER_SNI || subdomain == AUTH_BROKER_SNI {
return None;
}
Some(EndpointId::from(subdomain))
@@ -128,22 +128,23 @@ impl ComputeUserInfoMaybeEndpoint {
let metrics = Metrics::get();
debug!(%user, "credentials");
if sni.is_some() {
let protocol = ctx.protocol();
let kind = if sni.is_some() {
debug!("Connection with sni");
metrics.proxy.accepted_connections_by_sni.inc(SniKind::Sni);
SniKind::Sni
} else if endpoint.is_some() {
metrics
.proxy
.accepted_connections_by_sni
.inc(SniKind::NoSni);
debug!("Connection without sni");
SniKind::NoSni
} else {
metrics
.proxy
.accepted_connections_by_sni
.inc(SniKind::PasswordHack);
debug!("Connection with password hack");
}
SniKind::PasswordHack
};
metrics
.proxy
.accepted_connections_by_sni
.inc(SniGroup { protocol, kind });
let options = NeonOptions::parse_params(params);

View File

@@ -115,8 +115,8 @@ pub struct ProxyMetrics {
#[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
pub allowed_vpc_endpoint_ids: Histogram<10>,
/// Number of connections (per sni).
pub accepted_connections_by_sni: CounterVec<StaticLabelSet<SniKind>>,
/// Number of connections, by the method we used to determine the endpoint.
pub accepted_connections_by_sni: CounterVec<SniSet>,
/// Number of connection failures (per kind).
pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
@@ -342,11 +342,20 @@ pub enum LatencyExclusions {
ClientCplaneComputeRetry,
}
#[derive(LabelGroup)]
#[label(set = SniSet)]
pub struct SniGroup {
pub protocol: Protocol,
pub kind: SniKind,
}
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum SniKind {
/// Domain name based routing. SNI for libpq/websockets. Host for HTTP
Sni,
/// Metadata based routing. `options` for libpq/websockets. Header for HTTP
NoSni,
/// Metadata based routing, using the password field.
PasswordHack,
}

View File

@@ -56,6 +56,7 @@ use crate::serverless::backend::PoolingBackend;
use crate::serverless::http_util::{api_error_into_response, json_response};
pub(crate) const SERVERLESS_DRIVER_SNI: &str = "api";
pub(crate) const AUTH_BROKER_SNI: &str = "apiauth";
pub async fn task_main(
config: &'static ProxyConfig,

View File

@@ -38,7 +38,7 @@ use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig};
use crate::context::RequestContext;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::http::{ReadBodyError, read_body_with_limit};
use crate::metrics::{HttpDirection, Metrics};
use crate::metrics::{HttpDirection, Metrics, SniGroup, SniKind};
use crate::proxy::{NeonOptions, run_until_cancelled};
use crate::serverless::backend::HttpConnError;
use crate::types::{DbName, RoleName};
@@ -227,6 +227,32 @@ fn get_conn_info(
}
}
// check the URL that was used, for metrics
{
let host_endpoint = headers
// get the host header
.get("host")
// extract the domain
.and_then(|h| {
let (host, _port) = h.to_str().ok()?.split_once(':')?;
Some(host)
})
// get the endpoint prefix
.map(|h| h.split_once('.').map_or(h, |(prefix, _)| prefix));
let kind = if host_endpoint == Some(&*endpoint) {
SniKind::Sni
} else {
SniKind::NoSni
};
let protocol = ctx.protocol();
Metrics::get()
.proxy
.accepted_connections_by_sni
.inc(SniGroup { protocol, kind });
}
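// To make the extraction above concrete, a hypothetical walk-through with a
// made-up hostname (note that a Host header without an explicit port makes
// the chain yield None, which is then counted as NoSni):
//
//   let header = "ep-cool-darkness-123456.us-east-2.aws.neon.tech:443";
//   let (host, _port) = header.split_once(':').unwrap();
//   let prefix = host.split_once('.').map_or(host, |(p, _)| p);
//   assert_eq!(prefix, "ep-cool-darkness-123456");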
ctx.set_user_agent(
headers
.get(hyper::header::USER_AGENT)

View File

@@ -121,6 +121,20 @@ impl Client {
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn switch_timeline_membership(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &models::TimelineMembershipSwitchRequest,
) -> Result<models::TimelineMembershipSwitchResponse> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/membership",
self.mgmt_api_endpoint, tenant_id, timeline_id
);
let resp = self.put(&uri, req).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TenantDeleteResult> {
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
let resp = self

View File

@@ -243,8 +243,7 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
let resp =
pull_timeline::handle_request(data, conf.sk_auth_token.clone(), ca_certs, global_timelines)
.await
.map_err(ApiError::InternalServerError)?;
.await?;
json_response(StatusCode::OK, resp)
}

View File

@@ -7,6 +7,7 @@ use bytes::Bytes;
use camino::Utf8PathBuf;
use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt, TryStreamExt};
use http_utils::error::ApiError;
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
use reqwest::Certificate;
use safekeeper_api::Term;
@@ -30,7 +31,7 @@ use utils::pausable_failpoint;
use crate::control_file::CONTROL_FILE_NAME;
use crate::state::{EvictionState, TimelinePersistentState};
use crate::timeline::{Timeline, WalResidentTimeline};
use crate::timeline::{Timeline, TimelineError, WalResidentTimeline};
use crate::timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline};
use crate::wal_storage::open_wal_file;
use crate::{GlobalTimelines, debug_dump, wal_backup};
@@ -395,7 +396,7 @@ pub async fn handle_request(
sk_auth_token: Option<SecretString>,
ssl_ca_certs: Vec<Certificate>,
global_timelines: Arc<GlobalTimelines>,
) -> Result<PullTimelineResponse> {
) -> Result<PullTimelineResponse, ApiError> {
let existing_tli = global_timelines.get(TenantTimelineId::new(
request.tenant_id,
request.timeline_id,
@@ -411,7 +412,9 @@ pub async fn handle_request(
for ssl_ca_cert in ssl_ca_certs {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client.build()?;
let http_client = http_client
.build()
.map_err(|e| ApiError::InternalServerError(e.into()))?;
let http_hosts = request.http_hosts.clone();
@@ -443,10 +446,10 @@ pub async fn handle_request(
// offline and C comes online. Then we want a pull on C with A and B as hosts to work.
let min_required_successful = (http_hosts.len() - 1).max(1);
if statuses.len() < min_required_successful {
bail!(
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"only got {} successful status responses. required: {min_required_successful}",
statuses.len()
)
)));
}
// Find the most advanced safekeeper
@@ -465,7 +468,7 @@ pub async fn handle_request(
assert!(status.tenant_id == request.tenant_id);
assert!(status.timeline_id == request.timeline_id);
pull_timeline(
match pull_timeline(
status,
safekeeper_host,
sk_auth_token,
@@ -473,6 +476,21 @@ pub async fn handle_request(
global_timelines,
)
.await
{
Ok(resp) => Ok(resp),
Err(e) => {
match e.downcast_ref::<TimelineError>() {
Some(TimelineError::AlreadyExists(_)) => Ok(PullTimelineResponse {
safekeeper_host: None,
}),
Some(TimelineError::CreationInProgress(_)) => {
// We don't return success here because creation might still fail.
Err(ApiError::Conflict("Creation in progress".to_owned()))
}
_ => Err(ApiError::InternalServerError(e)),
}
}
}
}
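The recovery above leans on `anyhow::Error::downcast_ref` to pick a typed domain error back out of an opaque error chain; a freestanding sketch of the same move (the error type is a stand-in):

#[derive(Debug, thiserror::Error)]
#[error("timeline already exists")]
struct AlreadyExists;

fn classify(e: &anyhow::Error) -> &'static str {
    match e.downcast_ref::<AlreadyExists>() {
        Some(_) => "treat as success: the pull is idempotent",
        None => "map to an internal server error",
    }
}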
async fn pull_timeline(

View File

@@ -98,6 +98,23 @@ impl SafekeeperClient {
)
}
#[allow(unused)]
pub(crate) async fn switch_timeline_membership(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &models::TimelineMembershipSwitchRequest,
) -> Result<models::TimelineMembershipSwitchResponse> {
measured_request!(
"switch_timeline_membership",
crate::metrics::Method::Put,
&self.node_id_label,
self.inner
.switch_timeline_membership(tenant_id, timeline_id, req)
.await
)
}
pub(crate) async fn delete_tenant(
&self,
tenant_id: TenantId,

View File

@@ -8485,7 +8485,7 @@ impl Service {
// By default, live migrations are generous about the wait time for getting
// the secondary location up to speed. When draining, give up earlier in order
// to not stall the operation when a cold secondary is encountered.
- const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
+ const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(30);
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
@@ -8818,7 +8818,7 @@ impl Service {
node_id: NodeId,
cancel: CancellationToken,
) -> Result<(), OperationError> {
- const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
+ const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(30);
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)

View File

@@ -165,16 +165,17 @@ pub(crate) async fn branch_cleanup_and_check_errors(
.head_object(&path, &CancellationToken::new())
.await;
- if response.is_err() {
+ if let Err(e) = response {
// Object is not present.
let is_l0 = LayerMap::is_l0(layer.key_range(), layer.is_delta());
let msg = format!(
"index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {})",
"index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {}) with error: {}",
layer,
metadata.generation.get_suffix(),
metadata.shard,
is_l0,
e,
);
if is_l0 || ignore_error {

View File

@@ -137,11 +137,10 @@ struct TenantRefAccumulator {
impl TenantRefAccumulator {
fn update(&mut self, ttid: TenantShardTimelineId, index_part: &IndexPart) {
let this_shard_idx = ttid.tenant_shard_id.to_index();
- (*self
- .shards_seen
+ self.shards_seen
.entry(ttid.tenant_shard_id.tenant_id)
- .or_default())
- .insert(this_shard_idx);
+ .or_default()
+ .insert(this_shard_idx);
let mut ancestor_refs = Vec::new();
for (layer_name, layer_metadata) in &index_part.layer_metadata {
@@ -767,10 +766,13 @@ pub async fn pageserver_physical_gc(
stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id).await?,
);
Ok(try_stream! {
let mut cnt = 0;
while let Some(ttid_res) = timelines.next().await {
let ttid = ttid_res?;
cnt += 1;
yield (ttid, tenant_manifest_arc.clone());
}
tracing::info!(%tenant_shard_id, "Found {} timelines", cnt);
})
}
});
@@ -790,6 +792,7 @@ pub async fn pageserver_physical_gc(
&accumulator,
tenant_manifest_arc,
)
.instrument(info_span!("gc_timeline", %ttid))
});
let timelines = timelines.try_buffered(CONCURRENCY);
let mut timelines = std::pin::pin!(timelines);
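The `.instrument(info_span!("gc_timeline", %ttid))` added above attaches a per-timeline tracing span to the future, so every event logged while it runs carries the `ttid` field. A minimal sketch of the same pattern (hypothetical function names, assuming the `tracing` crate):

use tracing::{Instrument, info, info_span};

async fn gc_one_timeline(ttid: &str) {
    // Inherits the enclosing span, so this logs as gc_timeline{ttid=...}.
    info!("starting gc for {}", ttid);
}

async fn run_gc() {
    let ttid = "some-tenant/some-timeline"; // hypothetical id
    gc_one_timeline(ttid)
        .instrument(info_span!("gc_timeline", %ttid))
        .await;
}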

View File

@@ -153,7 +153,10 @@ pub async fn scan_pageserver_metadata(
const CONCURRENCY: usize = 32;
// Generate a stream of TenantTimelineId
- let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
+ let timelines = tenants.map_ok(|t| {
+ tracing::info!("Found tenant: {}", t);
+ stream_tenant_timelines(&remote_client, &target, t)
+ });
let timelines = timelines.try_buffered(CONCURRENCY);
let timelines = timelines.try_flatten();

View File

@@ -24,7 +24,6 @@ pub struct SnapshotDownloader {
remote_client: GenericRemoteStorage,
#[allow(dead_code)]
target: RootTarget,
- bucket_config: BucketConfig,
tenant_id: TenantId,
output_path: Utf8PathBuf,
concurrency: usize,
@@ -43,7 +42,6 @@ impl SnapshotDownloader {
Ok(Self {
remote_client,
target,
- bucket_config,
tenant_id,
output_path,
concurrency,
@@ -218,11 +216,9 @@ impl SnapshotDownloader {
}
pub async fn download(&self) -> anyhow::Result<()> {
- let (remote_client, target) =
- init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;
// Generate a stream of TenantShardId
- let shards = stream_tenant_shards(&remote_client, &target, self.tenant_id).await?;
+ let shards =
+ stream_tenant_shards(&self.remote_client, &self.target, self.tenant_id).await?;
let shards: Vec<TenantShardId> = shards.try_collect().await?;
// Only read from shards that have the highest count: avoids redundantly downloading
@@ -240,7 +236,8 @@ impl SnapshotDownloader {
for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
// Generate a stream of TenantTimelineId
- let timelines = stream_tenant_timelines(&remote_client, &target, shard).await?;
+ let timelines =
+ stream_tenant_timelines(&self.remote_client, &self.target, shard).await?;
// Generate a stream of S3TimelineBlobData
async fn load_timeline_index(
@@ -251,8 +248,8 @@ impl SnapshotDownloader {
let data = list_timeline_blobs(remote_client, ttid, target).await?;
Ok((ttid, data))
}
- let timelines =
- timelines.map_ok(|ttid| load_timeline_index(&remote_client, &target, ttid));
+ let timelines = timelines
+ .map_ok(|ttid| load_timeline_index(&self.remote_client, &self.target, ttid));
let mut timelines = std::pin::pin!(timelines.try_buffered(8));
while let Some(i) = timelines.next().await {
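The filter applied earlier in this function ("only read from shards that have the highest count") avoids redundantly downloading both the pre-split and post-split layout of the same tenant. A small sketch of that selection rule, using a hypothetical ShardId type in place of the real TenantShardId:

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ShardId {
    shard_number: u8,
    shard_count: u8,
}

// After a split from 4 to 8 shards, both layouts may exist in the bucket;
// only the highest-count (8-way) shards hold the current data.
fn latest_shard_layout(mut shards: Vec<ShardId>) -> Vec<ShardId> {
    let max_count = shards.iter().map(|s| s.shard_count).max().unwrap_or(0);
    shards.retain(|s| s.shard_count == max_count);
    shards
}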

View File

@@ -557,7 +557,7 @@ class NeonLocalCli(AbstractNeonCli):
endpoint_id: str,
safekeepers_generation: int | None = None,
safekeepers: list[int] | None = None,
- remote_ext_config: str | None = None,
+ remote_ext_base_url: str | None = None,
pageserver_id: int | None = None,
allow_multiple: bool = False,
create_test_user: bool = False,
@@ -572,8 +572,8 @@ class NeonLocalCli(AbstractNeonCli):
extra_env_vars = env or {}
if basebackup_request_tries is not None:
extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries)
- if remote_ext_config is not None:
- args.extend(["--remote-ext-config", remote_ext_config])
+ if remote_ext_base_url is not None:
+ args.extend(["--remote-ext-base-url", remote_ext_base_url])
if safekeepers_generation is not None:
args.extend(["--safekeepers-generation", str(safekeepers_generation)])

View File

@@ -1299,13 +1299,6 @@ class NeonEnv:
for key, value in override.items():
ps_cfg[key] = value
if self.pageserver_virtual_file_io_mode is not None:
- # TODO(christian): https://github.com/neondatabase/neon/issues/11598
- if not config.test_may_use_compatibility_snapshot_binaries:
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
- else:
- log.info("ignoring virtual_file_io_mode parametrization for compatibility test")
if self.pageserver_wal_receiver_protocol is not None:
key, value = PageserverWalReceiverProtocol.to_config_key_value(
self.pageserver_wal_receiver_protocol
@@ -1409,30 +1402,6 @@ class NeonEnv:
for f in futs:
f.result()
- # Last step: register safekeepers at the storage controller
- if (
- self.storage_controller_config is not None
- and self.storage_controller_config.get("timelines_onto_safekeepers") is True
- ):
- for sk_id, sk in enumerate(self.safekeepers):
- # 0 is an invalid safekeeper id
- sk_id = sk_id + 1
- body = {
- "id": sk_id,
- "created_at": "2023-10-25T09:11:25Z",
- "updated_at": "2024-08-28T11:32:43Z",
- "region_id": "aws-us-east-2",
- "host": "127.0.0.1",
- "port": sk.port.pg,
- "http_port": sk.port.http,
- "https_port": None,
- "version": 5957,
- "availability_zone_id": f"us-east-2b-{sk_id}",
- }
- self.storage_controller.on_safekeeper_deploy(sk_id, body)
- self.storage_controller.safekeeper_scheduling_policy(sk_id, "Active")
self.endpoint_storage.start(timeout_in_seconds=timeout_in_seconds)
def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
@@ -3866,7 +3835,7 @@ class NeonAuthBroker:
external_http_port: int,
auth_backend: NeonAuthBroker.ProxyV1,
):
self.domain = "apiauth.local.neon.build" # resolves to 127.0.0.1
self.domain = "local.neon.build" # resolves to 127.0.0.1
self.host = "127.0.0.1"
self.http_port = http_port
self.external_http_port = external_http_port
@@ -3883,7 +3852,7 @@ class NeonAuthBroker:
# generate key if it doesn't exist
crt_path = self.test_output_dir / "proxy.crt"
key_path = self.test_output_dir / "proxy.key"
generate_proxy_tls_certs("apiauth.local.neon.build", key_path, crt_path)
generate_proxy_tls_certs(f"apiauth.{self.domain}", key_path, crt_path)
args = [
str(self.neon_binpath / "proxy"),
@@ -3927,10 +3896,10 @@ class NeonAuthBroker:
log.info(f"Executing http query: {query}")
connstr = f"postgresql://{user}@{self.domain}/postgres"
connstr = f"postgresql://{user}@ep-foo-bar-1234.{self.domain}/postgres"
async with httpx.AsyncClient(verify=str(self.test_output_dir / "proxy.crt")) as client:
response = await client.post(
f"https://{self.domain}:{self.external_http_port}/sql",
f"https://apiauth.{self.domain}:{self.external_http_port}/sql",
json={"query": query, "params": args},
headers={
"Neon-Connection-String": connstr,
@@ -4226,7 +4195,7 @@ class Endpoint(PgProtocol, LogUtils):
def start(
self,
- remote_ext_config: str | None = None,
+ remote_ext_base_url: str | None = None,
pageserver_id: int | None = None,
safekeeper_generation: int | None = None,
safekeepers: list[int] | None = None,
@@ -4252,7 +4221,7 @@ class Endpoint(PgProtocol, LogUtils):
self.endpoint_id,
safekeepers_generation=safekeeper_generation,
safekeepers=self.active_safekeepers,
- remote_ext_config=remote_ext_config,
+ remote_ext_base_url=remote_ext_base_url,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
create_test_user=create_test_user,
@@ -4467,7 +4436,7 @@ class Endpoint(PgProtocol, LogUtils):
hot_standby: bool = False,
lsn: Lsn | None = None,
config_lines: list[str] | None = None,
- remote_ext_config: str | None = None,
+ remote_ext_base_url: str | None = None,
pageserver_id: int | None = None,
allow_multiple: bool = False,
basebackup_request_tries: int | None = None,
@@ -4486,7 +4455,7 @@ class Endpoint(PgProtocol, LogUtils):
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
).start(
- remote_ext_config=remote_ext_config,
+ remote_ext_base_url=remote_ext_base_url,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
basebackup_request_tries=basebackup_request_tries,
@@ -4570,7 +4539,7 @@ class EndpointFactory:
lsn: Lsn | None = None,
hot_standby: bool = False,
config_lines: list[str] | None = None,
- remote_ext_config: str | None = None,
+ remote_ext_base_url: str | None = None,
pageserver_id: int | None = None,
basebackup_request_tries: int | None = None,
) -> Endpoint:
@@ -4590,7 +4559,7 @@ class EndpointFactory:
hot_standby=hot_standby,
config_lines=config_lines,
lsn=lsn,
- remote_ext_config=remote_ext_config,
+ remote_ext_base_url=remote_ext_base_url,
pageserver_id=pageserver_id,
basebackup_request_tries=basebackup_request_tries,
)
@@ -5477,6 +5446,13 @@ def wait_for_last_flush_lsn(
if last_flush_lsn is None:
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
# The last_flush_lsn may not correspond to a record boundary.
# For example, if the compute flushed WAL on a page boundary,
# the remaining part of the record might not be flushed for a long time.
# This would prevent the pageserver from reaching last_flush_lsn promptly.
# To ensure the rest of the record reaches the pageserver quickly,
# we forcibly flush the WAL by using CHECKPOINT.
endpoint.safe_psql("CHECKPOINT")
results = []
for tenant_shard_id, pageserver in shards:
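The comment added in this hunk is subtle: pg_current_wal_flush_lsn() can land mid-record when Postgres flushes on a page boundary, and the record's tail may stay buffered for a long time, so the pageserver cannot reach that LSN. Issuing CHECKPOINT writes and flushes a fresh record past that point. A hedged Rust sketch of the same sequence (assuming the tokio-postgres crate; the fixture itself does this via endpoint.safe_psql in Python):

use tokio_postgres::{Client, Error};

// Read the current flush LSN, then force a CHECKPOINT so the tail of the
// last WAL record is flushed too and the pageserver can catch up to it.
async fn flush_past_record_boundary(client: &Client) -> Result<String, Error> {
    let row = client
        .query_one("SELECT pg_current_wal_flush_lsn()::text", &[])
        .await?;
    let lsn: String = row.get(0);
    client.simple_query("CHECKPOINT").await?;
    Ok(lsn)
}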

View File

@@ -122,6 +122,10 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
".*Call to node.*management API.*failed.*Timeout.*",
".*Failed to update node .+ after heartbeat round.*error sending request for url.*",
".*background_reconcile: failed to fetch top tenants:.*client error \\(Connect\\).*",
# Many tests will take safekeepers offline
".*Call to safekeeper.*management API.*failed.*receive body.*",
".*Call to safekeeper.*management API.*failed.*ReceiveBody.*",
".*Call to safekeeper.*management API.*failed.*Timeout.*",
# Many tests will start up with a node offline
".*startup_reconcile: Could not scan node.*",
# Tests run in dev mode

View File

@@ -544,3 +544,69 @@ def test_drop_role_with_table_privileges_from_non_neon_superuser(neon_simple_env
)
role = cursor.fetchone()
assert role is None
def test_db_with_custom_settings(neon_simple_env: NeonEnv):
"""
Test that compute_ctl can work with databases that have some custom settings.
For example, role=some_other_role, default_transaction_read_only=on,
search_path=non_public_schema, statement_timeout=1 (1ms).
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
TEST_ROLE = "some_other_role"
TEST_DB = "db_with_custom_settings"
TEST_SCHEMA = "non_public_schema"
endpoint.respec_deep(
**{
"spec": {
"skip_pg_catalog_updates": False,
"cluster": {
"databases": [
{
"name": TEST_DB,
"owner": TEST_ROLE,
}
],
"roles": [
{
"name": TEST_ROLE,
}
],
},
}
}
)
endpoint.reconfigure()
with endpoint.cursor(dbname=TEST_DB) as cursor:
cursor.execute(f"CREATE SCHEMA {TEST_SCHEMA}")
cursor.execute(f"ALTER DATABASE {TEST_DB} SET role = {TEST_ROLE}")
cursor.execute(f"ALTER DATABASE {TEST_DB} SET default_transaction_read_only = on")
cursor.execute(f"ALTER DATABASE {TEST_DB} SET search_path = {TEST_SCHEMA}")
cursor.execute(f"ALTER DATABASE {TEST_DB} SET statement_timeout = 1")
with endpoint.cursor(dbname=TEST_DB) as cursor:
cursor.execute("SELECT current_role")
role = cursor.fetchone()
assert role is not None
assert role[0] == TEST_ROLE
cursor.execute("SHOW default_transaction_read_only")
default_transaction_read_only = cursor.fetchone()
assert default_transaction_read_only is not None
assert default_transaction_read_only[0] == "on"
cursor.execute("SHOW search_path")
search_path = cursor.fetchone()
assert search_path is not None
assert search_path[0] == TEST_SCHEMA
# Do not check statement_timeout, because we force it to 2min
# in the `endpoint.cursor()` fixture.
endpoint.reconfigure()

View File

@@ -221,7 +221,7 @@ def test_remote_extensions(
endpoint.create_remote_extension_spec(spec)
- endpoint.start(remote_ext_config=extensions_endpoint)
+ endpoint.start(remote_ext_base_url=extensions_endpoint)
with endpoint.connect() as conn:
with conn.cursor() as cur:
@@ -249,7 +249,7 @@ def test_remote_extensions(
# Remove the extension files to force a redownload of the extension.
extension.remove(test_output_dir, pg_version)
- endpoint.start(remote_ext_config=extensions_endpoint)
+ endpoint.start(remote_ext_base_url=extensions_endpoint)
# Test that ALTER EXTENSION UPDATE statements also fetch remote extensions.
with endpoint.connect() as conn:

View File

@@ -641,6 +641,55 @@ def test_fast_import_binary(
assert res[0][0] == 10
def test_fast_import_event_triggers(
test_output_dir,
vanilla_pg: VanillaPostgres,
port_distributor: PortDistributor,
fast_import: FastImport,
):
vanilla_pg.start()
vanilla_pg.safe_psql("""
CREATE FUNCTION test_event_trigger_for_drops()
RETURNS event_trigger LANGUAGE plpgsql AS $$
DECLARE
obj record;
BEGIN
FOR obj IN SELECT * FROM pg_event_trigger_dropped_objects()
LOOP
RAISE NOTICE '% dropped object: % %.% %',
tg_tag,
obj.object_type,
obj.schema_name,
obj.object_name,
obj.object_identity;
END LOOP;
END
$$;
CREATE EVENT TRIGGER test_event_trigger_for_drops
ON sql_drop
EXECUTE PROCEDURE test_event_trigger_for_drops();
""")
pg_port = port_distributor.get_port()
p = fast_import.run_pgdata(pg_port=pg_port, source_connection_string=vanilla_pg.connstr())
assert p.returncode == 0
vanilla_pg.stop()
pgbin = PgBin(test_output_dir, fast_import.pg_distrib_dir, fast_import.pg_version)
with VanillaPostgres(
fast_import.workdir / "pgdata", pgbin, pg_port, False
) as new_pgdata_vanilla_pg:
new_pgdata_vanilla_pg.start()
# database name and user are hardcoded in the fast_import binary, and they differ from a normal vanilla Postgres
conn = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb")
res = conn.safe_psql("SELECT count(*) FROM pg_event_trigger;")
log.info(f"Result: {res}")
assert res[0][0] == 0, f"Neon does not support importing event triggers, got: {res[0][0]}"
def test_fast_import_restore_to_connstring(
test_output_dir,
vanilla_pg: VanillaPostgres,

View File

@@ -1822,7 +1822,7 @@ def test_timeline_detach_with_aux_files_with_detach_v1(
endpoint2.safe_psql(
"SELECT pg_create_logical_replication_slot('test_slot_restore', 'pgoutput')"
)
- lsn3 = wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, branch_timeline_id)
+ lsn3 = wait_for_last_flush_lsn(env, endpoint2, env.initial_tenant, branch_timeline_id)
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn1).keys()) == set([])
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn3).keys()) == set(
["pg_replslot/test_slot_restore/state"]
@@ -1839,7 +1839,7 @@ def test_timeline_detach_with_aux_files_with_detach_v1(
assert all_reparented == set([])
# We need to ensure all safekeeper data are ingested before checking aux files: the API does not wait for LSN.
- wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, branch_timeline_id)
+ wait_for_last_flush_lsn(env, endpoint2, env.initial_tenant, branch_timeline_id)
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn2).keys()) == set(
["pg_replslot/test_slot_parent_1/state", "pg_replslot/test_slot_parent_2/state"]
), "main branch unaffected"

View File

@@ -1,7 +1,7 @@
{
"v17": [
"17.4",
"eab3a37834cac6ec0719bf817ac918a201712d66"
"b763ab54b98d232a0959371ab1d07f06ed77c49e"
],
"v16": [
"16.8",
@@ -13,6 +13,6 @@
],
"v14": [
"14.17",
"c8dab02bfc003ae7bd59096919042d7840f3c194"
"108856a4ae76be285b04497a0ed08fcbe60ddbe9"
]
}