Compare commits

..

34 Commits

Author SHA1 Message Date
Heikki Linnakangas
c47ba5f893 Support read-only computes in 'compute_ctl' 2023-04-14 13:15:28 +03:00
dependabot[bot]
b6c7c3290f Bump h2 from 0.3.15 to 0.3.17 (#4020) 2023-04-13 20:03:24 +01:00
Sasha Krassovsky
fd31fafeee Make proxy shutdown when all connections are closed (#3764)
## Describe your changes
Makes Proxy start draining connections on SIGTERM.
## Issue ticket number and link
#3333
2023-04-13 19:31:30 +03:00
Alexey Kondratov
db8dd6f380 [compute_ctl] Implement live reconfiguration (#3980)
With this commit one can request compute reconfiguration
from the running `compute_ctl` with compute in `Running` state
by sending a new spec:
```shell
curl -d "{\"spec\": $(cat ./compute-spec-new.json)}" http://localhost:3080/configure
```

Internally, we start a separate configurator thread that is waiting on
`Condvar` for `ConfigurationPending` compute state in a loop. Then it does
reconfiguration, sets compute back to `Running` state and notifies other
waiters.

It will need some follow-ups, e.g. for retry logic for control-plane
requests, but should be useful for testing in the current state. This
shouldn't affect any existing environment, since computes are configured
in a different way there.

Resolves neondatabase/cloud#4433
2023-04-13 18:07:29 +02:00
Alexander Bayandin
36c20946b4 Verify extensions checksums (#4014)
To not be taken by surprise by upstream git re-tag or by malicious activity,
let's verify the checksum for extensions we download

Also, unify the installation of `pg_graphql` and `pg_tiktoken` 
with other extensions.
2023-04-13 15:25:09 +01:00
Heikki Linnakangas
89b5589b1b Tenant size should never be zero. Simplify test.
Looking at the git history of this test, I think "size == 0" used to
have a special meaning earlier, but now it should never happen.
2023-04-13 16:57:31 +03:00
Heikki Linnakangas
53f438a8a8 Rename "Postgres nodes" in control_plane to endpoints.
We use the term "endpoint" in for compute Postgres nodes in the web UI
and user-facing documentation now. Adjust the nomenclature in the code.

This changes the name of the "neon_local pg" command to "neon_local
endpoint". Also adjust names of classes, variables etc. in the python
tests accordingly.

This also changes the directory structure so that endpoints are now
stored in:

    .neon/endpoints/<endpoint id>

instead of:

    .neon/pgdatadirs/tenants/<tenant_id>/<endpoint (node) name>

The tenant ID is no longer part of the path. That means that you
cannot have two endpoints with the same name/ID in two different
tenants anymore. That's consistent with how we treat endpoints in the
real control plane and proxy: the endpoint ID must be globally unique.
2023-04-13 14:34:29 +03:00
Vadim Kharitonov
356439aa33 Add note about manual_release_instructions label (#4015)
## Describe your changes
Do not forget to process required manual stuff after release

## Issue ticket number and link

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Dmitry Rodionov <dmitry@neon.tech>
2023-04-13 13:13:24 +03:00
Vadim Kharitonov
c237a2f5fb Compile pg_hint_plan extension 2023-04-13 12:59:46 +03:00
Dmitry Rodionov
15d1f85552 Add reason to TenantState::Broken (#3954)
Reason and backtrace are added to the Broken state. Backtrace is automatically collected when tenant entered the broken state. The format for API, CLI and metrics is changed and unified to return tenant state name in camel case. Previously snake case was used for metrics and camel case was used for everything else. Now tenant state field in TenantInfo swagger spec is changed to contain state name in "slug" field and other fields (currently only reason and backtrace for Broken variant in "data" field). To allow for this breaking change state was removed from TenantInfo swagger spec because it was not used anywhere.

Please note that the tenant's broken reason is not persisted on disk so the reason is lost when pageserver is restarted.

Requires changes to grafana dashboard that monitors tenant states.

Closes #3001

---------

Co-authored-by: theirix <theirix@gmail.com>
2023-04-13 12:11:43 +03:00
Konstantin Knizhnik
732acc54c1 Add check for duplicates of generated image layers (#3869)
## Describe your changes

## Issue ticket number and link

#3673

## Checklist before requesting a review
- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

---------

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2023-04-13 10:19:34 +03:00
Stas Kelvich
5d0ecadf7c Add support for non-SNI case in multi-cert proxy
When no SNI is provided use the default certificate, otherwise we can't
get to the options parameter which can be used to set endpoint name too.
That means that non-SNI flow will not work for CNAME domains in verify-full
mode.
2023-04-12 18:16:49 +03:00
Kirill Bulatov
f7995b3c70 Revert "Update most of the dependencies to their latest versions (#3991)" (#4013)
This reverts commit a64044a7a9.

See https://neondb.slack.com/archives/C03H1K0PGKH/p1681306682795559
2023-04-12 14:51:59 +00:00
Alexander Bayandin
13e53e5dc8 GitHub Workflows: use '!cancelled' instead of 'success or failure' 2023-04-12 15:22:18 +01:00
Alexander Bayandin
c94b8998be GitHub Workflows: print error messages to stderr 2023-04-12 15:22:18 +01:00
Alexander Bayandin
218062ceba GitHub Workflows: use ref_name instead of ref 2023-04-12 15:22:18 +01:00
Sam Gaw
8d295780cb Add support for ip4r extension 2023-04-12 16:40:02 +03:00
Kirill Bulatov
a64044a7a9 Update most of the dependencies to their latest versions (#3991)
All non-trivial updates extracted into separate commits, also `carho
hakari` data and its manifest format were updated.

3 sets of crates remain unupdated:

* `base64` — touches proxy in a lot of places and changed its api (by
0.21 version) quite strongly since our version (0.13).
* `opentelemetry` and `opentelemetry-*` crates

```
error[E0308]: mismatched types
  --> libs/tracing-utils/src/http.rs:65:21
   |
65 |     span.set_parent(parent_ctx);
   |          ---------- ^^^^^^^^^^ expected struct `opentelemetry_api::context::Context`, found struct `opentelemetry::Context`
   |          |
   |          arguments to this method are incorrect
   |
   = note: struct `opentelemetry::Context` and struct `opentelemetry_api::context::Context` have similar names, but are actually distinct types
note: struct `opentelemetry::Context` is defined in crate `opentelemetry_api`
  --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/opentelemetry_api-0.19.0/src/context.rs:77:1
   |
77 | pub struct Context {
   | ^^^^^^^^^^^^^^^^^^
note: struct `opentelemetry_api::context::Context` is defined in crate `opentelemetry_api`
  --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/opentelemetry_api-0.18.0/src/context.rs:77:1
   |
77 | pub struct Context {
   | ^^^^^^^^^^^^^^^^^^
   = note: perhaps two different versions of crate `opentelemetry_api` are being used?
note: associated function defined here
  --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/tracing-opentelemetry-0.18.0/src/span_ext.rs:43:8
   |
43 |     fn set_parent(&self, cx: Context);
   |        ^^^^^^^^^^

For more information about this error, try `rustc --explain E0308`.
error: could not compile `tracing-utils` due to previous error
warning: build failed, waiting for other jobs to finish...
error: could not compile `tracing-utils` due to previous error
```

`tracing-opentelemetry` of version `0.19` is not yet released, that is
supposed to have the update we need.

* similarly, `rustls`, `tokio-rustls`, `rustls-*` and `tls-listener`
crates have similar issue:

```
error[E0308]: mismatched types
   --> libs/postgres_backend/tests/simple_select.rs:112:78
    |
112 |     let mut make_tls_connect = tokio_postgres_rustls::MakeRustlsConnect::new(client_cfg);
    |                                --------------------------------------------- ^^^^^^^^^^ expected struct `rustls::client::client_conn::ClientConfig`, found struct `ClientConfig`
    |                                |
    |                                arguments to this function are incorrect
    |
    = note: struct `ClientConfig` and struct `rustls::client::client_conn::ClientConfig` have similar names, but are actually distinct types
note: struct `ClientConfig` is defined in crate `rustls`
   --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/rustls-0.21.0/src/client/client_conn.rs:125:1
    |
125 | pub struct ClientConfig {
    | ^^^^^^^^^^^^^^^^^^^^^^^
note: struct `rustls::client::client_conn::ClientConfig` is defined in crate `rustls`
   --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/rustls-0.20.8/src/client/client_conn.rs:91:1
    |
91  | pub struct ClientConfig {
    | ^^^^^^^^^^^^^^^^^^^^^^^
    = note: perhaps two different versions of crate `rustls` are being used?
note: associated function defined here
   --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-postgres-rustls-0.9.0/src/lib.rs:23:12
    |
23  |     pub fn new(config: ClientConfig) -> Self {
    |            ^^^

For more information about this error, try `rustc --explain E0308`.
error: could not compile `postgres_backend` due to previous error
warning: build failed, waiting for other jobs to finish...
```

* aws crates: I could not make new API to work with bucket endpoint
overload, and console e2e tests failed.
Other our tests passed, further investigation is worth to be done in
https://github.com/neondatabase/neon/issues/4008
2023-04-12 15:32:38 +03:00
Kirill Bulatov
d8939d4162 Move walreceiver start and stop behind a struct (#3973)
The PR changes module function-based walreceiver interface with a
`WalReceiver` struct that exposes a few public methods, `new`, `start`
and `stop` now.

Later, the same struct is planned to be used for getting walreceiver
stats (and, maybe, other extra data) to display during missing wal
errors for https://github.com/neondatabase/neon/issues/2106

Now though, the change required extra logic changes:

* due to the `WalReceiver` struct added, it became easier to pass `ctx`
and later do a `detached_child` instead of

bfee412701/pageserver/src/tenant/timeline.rs (L1379-L1381)

* `WalReceiver::start` which is now the public API to start the
walreceiver, could return an `Err` which now may turn a tenant into
`Broken`, same as the timeline that it tries to load during startup.

* `WalReceiverConf` was added to group walreceiver parameters from
pageserver's tenant config
2023-04-12 12:39:02 +03:00
Heikki Linnakangas
06ce83c912 Tolerate missing 'operation_uuid' field in spec file.
'compute_ctl' doesn't use the operation_uuid for anything, it just prints
it to the log.
2023-04-12 12:11:22 +03:00
Heikki Linnakangas
8ace7a7515 Remove unused 'timestamp' field from ComputeSpec struct. 2023-04-12 12:11:22 +03:00
Heikki Linnakangas
ef68321b31 Use Lsn, TenantId, TimelineId types in compute_ctl.
Stronger types are generally nicer.
2023-04-12 12:11:22 +03:00
Heikki Linnakangas
6064a26963 Refactor 'spec' in ComputeState.
Sometimes, it contained real values, sometimes just defaults if the
spec was not received yet. Make the state more clear by making it an
Option instead.

One consequence is that if some of the required settings like
neon.tenant_id are missing from the spec file sent to the /configure
endpoint, it is spotted earlier and you get an immediate HTTP error
response. Not that it matters very much, but it's nicer nevertheless.
2023-04-12 01:55:40 +03:00
Stas Kelvich
3c9f42a2e2 Support aarch64 in walredo seccomp code (#3996)
Aarch64 doesn't implement some old syscalls like open and select. Use
openat instead of open to check if seccomp is supported. Leave both
select and pselect6 in the allowlist since we don't call select syscall
directly and may hope that libc will call pselect6 on aarch64.

To check whether some syscall is supported it is possible to use
`scmp_sys_resolver` from seccopm package:

```
> apt install seccopm
> scmp_sys_resolver -a x86_64 select
23
> scmp_sys_resolver -a aarch64 select
-10101
> scmp_sys_resolver -a aarch64 pselect6
72
```

Negative value means that syscall is not supported.

Another cross-check is to look up for the actuall syscall table in
`unistd.h`. To resolve all the macroses one can use `gcc -E` as it is
done in `dump_sys_aarch64()` function in
libseccomp/src/arch-syscall-validate.

---------

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2023-04-11 19:28:18 +00:00
Alexey Kondratov
40a68e9077 [compute_ctl] Add timeout for tracing_utils::shutdown_tracing() (#3982)
Shutting down OTEL tracing provider may hang for quite some time, see,
for example:
- https://github.com/open-telemetry/opentelemetry-rust/issues/868
- and our problems with staging
https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636

Yet, we want computes to shut down fast enough, as we may need a new one
for the same timeline ASAP. So wait no longer than 2s for the shutdown
to complete, then just error out and exit the main thread.

Related to neondatabase/cloud#3707
2023-04-11 15:05:35 +02:00
Stas Kelvich
de99ee2c0d Add more proxy cnames 2023-04-11 14:54:09 +03:00
Alexander Bayandin
c79d5a947c Nightly Benchmarks: run third-party benchmarks once a week (#3987) 2023-04-11 10:58:04 +01:00
Arseny Sher
7ad5a5e847 Enable timeout on reading from socket in safekeeper WAL service.
TCP_KEEPALIVE is not enabled by default, so this prevents hanged up connections
in case of abrupt client termination. Add 'closed' flag to PostgresBackendReader
and pass it during handles join to prevent attempts to read from socket if we
errored out previously -- now with timeouts this is a common situation.

It looks like
2023-04-10T18:08:37.493448Z  INFO {cid=68}:WAL
receiver{ttid=59f91ad4e821ab374f9ccdf918da3a85/16438f99d61572c72f0c7b0ed772785d}:
terminated: timed out

Presumably fixes https://github.com/neondatabase/neon/issues/3971
2023-04-11 11:45:43 +04:00
Stas Kelvich
22c890b71c Add more cnames to proxies 2023-04-11 01:55:25 +03:00
Stas Kelvich
83549a8d40 Revert "Support aarch64 in walredo seccomp code"
This reverts commit 98df7db094.
2023-04-11 00:08:01 +03:00
Stas Kelvich
98df7db094 Support aarch64 in walredo seccomp code
Aarch64 doesn't implement some old syscalls like open and select. Use
openat instead of open to check if seccomp is supported. Leave both
select and pselect6 in the allowlist since we don't call select syscall
directly and may hope that libc will call pselect6 on aarch64.

To check whether some syscall is supported it is possible to use
`scmp_sys_resolver` from seccopm package:

```
> apt install seccopm
> scmp_sys_resolver -a x86_64 select
23
> scmp_sys_resolver -a aarch64 select
-10101
> scmp_sys_resolver -a aarch64 pselect6
72
```

Negative value means that syscall is not supported.

Another cross-check is to look up for the actuall syscall table
in `unistd.h`. To resolve all the macroses one can use `gcc -E` as
it is done in `dump_sys_aarch64()` function in
libseccomp/src/arch-syscall-validate.
2023-04-10 23:54:16 +03:00
Heikki Linnakangas
f0b2e076d9 Move compute_ctl structs used in HTTP API and spec file to separate crate.
This is in preparation of using compute_ctl to launch postgres nodes
in the neon_local control plane. And seems like a good idea to
separate the public interfaces anyway.

One non-mechanical change here is that the 'metrics' field is moved
under the Mutex, instead of using atomics. We were not using atomics
for performance but for convenience here, and it seems more clear to
not use atomics in the model for the HTTP response type.
2023-04-09 21:52:28 +03:00
Alexander Bayandin
818e341af0 Nightly Benchmarks: replace neon-captest-prefetch with -new/-reuse (#3970)
We have enabled prefetch by default, let's use this in Nightly
Benchmarks:
- effective_io_concurrency=100 by default (instead of 32)
- maintenance_io_concurrency=100 by default (instead of 32)

Rename `neon-captest-prefetch` to `neon-captest-new` (for pgbench with
initialisation) and `neon-captest-reuse` (for OLAP scenarios)
2023-04-09 12:52:49 +01:00
Kirill Bulatov
dec58092e8 Replace Box<dyn> with impl in RemoteStorage upload (#3984)
Replaces `Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>` with
`impl io::AsyncRead + Unpin + Send + Sync + 'static` usages in the
`RemoteStorage` interface, to make it closer to
[`#![feature(async_fn_in_trait)]`](https://blog.rust-lang.org/inside-rust/2022/11/17/async-fn-in-trait-nightly.html)

For `GenericRemoteStorage`, replaces `type Target = dyn RemoteStorage`
with another impl with `RemoteStorage` methods inside it.
We can reuse the trait, that would require importing the trait in every
file where it's used and makes us farther from the unstable feature.
After this PR, I've manged to create a patch with the changes:

https://github.com/neondatabase/neon/compare/kb/less-dyn-storage...kb/nightly-async-trait?expand=1

Current rust implementation does not like recursive async trait calls,
so `UnreliableWrapper` was removed: it contained a
`GenericRemoteStorage` that implemented the `RemoteStorage` trait, and
itself implemented the trait, which nightly rustc did not like and
proposed to box the future.
Similarly, `GenericRemoteStorage` cannot implement `RemoteStorage` for
nightly rustc to work, since calls various remote storages' methods from
inside.

I've compiled current `main` and the nightly branch both with `time env
RUSTC_WRAPPER="" cargo +nightly build --all --timings` command, and got
```
    Finished dev [optimized + debuginfo] target(s) in 2m 04s
env RUSTC_WRAPPER="" cargo +nightly build --all --timings  1283.19s user 50.40s system 1074% cpu 2:04.15 total

for the new feature tried and

    Finished dev [optimized + debuginfo] target(s) in 2m 40s
env RUSTC_WRAPPER="" cargo +nightly build --all --timings  1288.59s user 52.06s system 834% cpu 2:40.71 total

for the old async_trait approach.
```

On my machine, the `remote_storage` lib compilation takes ~10 less time
with the nightly feature (left) than the regular main (right).


![image](https://user-images.githubusercontent.com/2690773/230620797-163d8b89-dac8-4366-bcf6-cd1cdddcd22c.png)

Full cargo reports are available at
[timings.zip](https://github.com/neondatabase/neon/files/11179369/timings.zip)
2023-04-07 21:39:49 +03:00
146 changed files with 2602 additions and 1864 deletions

View File

@@ -10,6 +10,7 @@
<!-- List everything that should be done **before** release, any issues / setting changes / etc -->
### Checklist after release
- [ ] Make sure instructions from PRs included in this release and labeled `manual_release_instructions` are executed (either by you or by people who wrote them).
- [ ] Based on the merged commits write release notes and open a PR into `website` repo ([example](https://github.com/neondatabase/website/pull/219/files))
- [ ] Check [#dev-production-stream](https://neondb.slack.com/archives/C03F5SM1N02) Slack channel
- [ ] Check [stuck projects page](https://console.neon.tech/admin/projects?sort=last_active&order=desc&stuck=true)

View File

@@ -45,12 +45,12 @@ runs:
shell: bash -euxo pipefail {0}
run: |
if [ "${{ inputs.action }}" != "store" ] && [ "${{ inputs.action }}" != "generate" ]; then
echo 2>&1 "Unknown inputs.action type '${{ inputs.action }}'; allowed 'generate' or 'store' only"
echo >&2 "Unknown inputs.action type '${{ inputs.action }}'; allowed 'generate' or 'store' only"
exit 1
fi
if [ -z "${{ inputs.test_selection }}" ] && [ "${{ inputs.action }}" == "store" ]; then
echo 2>&1 "inputs.test_selection must be set for 'store' action"
echo >&2 "inputs.test_selection must be set for 'store' action"
exit 2
fi

View File

@@ -37,7 +37,7 @@ runs:
echo 'SKIPPED=true' >> $GITHUB_OUTPUT
exit 0
else
echo 2>&1 "Neither s3://${BUCKET}/${PREFIX}/${FILENAME} nor its version from previous attempts exist"
echo >&2 "Neither s3://${BUCKET}/${PREFIX}/${FILENAME} nor its version from previous attempts exist"
exit 1
fi
fi

View File

@@ -58,7 +58,7 @@ runs:
done
if [ -z "${branch_id}" ] || [ "${branch_id}" == "null" ]; then
echo 2>&1 "Failed to create branch after 10 attempts, the latest response was: ${branch}"
echo >&2 "Failed to create branch after 10 attempts, the latest response was: ${branch}"
exit 1
fi
@@ -122,7 +122,7 @@ runs:
done
if [ -z "${password}" ] || [ "${password}" == "null" ]; then
echo 2>&1 "Failed to reset password after 10 attempts, the latest response was: ${reset_password}"
echo >&2 "Failed to reset password after 10 attempts, the latest response was: ${reset_password}"
exit 1
fi

View File

@@ -48,7 +48,7 @@ runs:
done
if [ -z "${branch_id}" ] || [ "${branch_id}" == "null" ]; then
echo 2>&1 "Failed to delete branch after 10 attempts, the latest response was: ${deleted_branch}"
echo >&2 "Failed to delete branch after 10 attempts, the latest response was: ${deleted_branch}"
exit 1
fi
env:

View File

@@ -202,7 +202,7 @@ runs:
prefix: latest
- name: Create Allure report
if: success() || failure()
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: store

View File

@@ -23,7 +23,7 @@ runs:
mkdir -p $(dirname $ARCHIVE)
if [ -f ${ARCHIVE} ]; then
echo 2>&1 "File ${ARCHIVE} already exist. Something went wrong before"
echo >&2 "File ${ARCHIVE} already exist. Something went wrong before"
exit 1
fi
@@ -33,10 +33,10 @@ runs:
elif [ -f ${SOURCE} ]; then
time tar -cf ${ARCHIVE} --zstd ${SOURCE}
elif ! ls ${SOURCE} > /dev/null 2>&1; then
echo 2>&1 "${SOURCE} does not exist"
echo >&2 "${SOURCE} does not exist"
exit 2
else
echo 2>&1 "${SOURCE} is neither a directory nor a file, do not know how to handle it"
echo >&2 "${SOURCE} is neither a directory nor a file, do not know how to handle it"
exit 3
fi

View File

@@ -24,6 +24,7 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.ap-southeast-1.aws.neon.tech"
extraDomains: ["*.ap-southeast-1.retooldb.com", "*.ap-southeast-1.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"

View File

@@ -24,6 +24,7 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.eu-central-1.aws.neon.tech"
extraDomains: ["*.eu-central-1.retooldb.com", "*.eu-central-1.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"

View File

@@ -24,6 +24,7 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.us-east-2.aws.neon.tech"
extraDomains: ["*.us-east-2.retooldb.com", "*.us-east-2.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"

View File

@@ -24,6 +24,7 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.us-west-2.aws.neon.tech"
extraDomains: ["*.us-west-2.retooldb.com", "*.us-west-2.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"

View File

@@ -30,7 +30,7 @@ defaults:
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
jobs:
@@ -42,7 +42,7 @@ jobs:
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: "neon-staging"
runs-on: [ self-hosted, us-east-2, x64 ]
@@ -92,7 +92,7 @@ jobs:
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
if: success() || failure()
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: generate
@@ -107,29 +107,65 @@ jobs:
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
generate-matrices:
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)
#
# Available platforms:
# - neon-captest-new: Freshly created project (1 CU)
# - neon-captest-freetier: Use freetier-sized compute (0.25 CU)
# - neon-captest-reuse: Reusing existing project
# - rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
# - rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
runs-on: ubuntu-latest
outputs:
pgbench-compare-matrix: ${{ steps.pgbench-compare-matrix.outputs.matrix }}
olap-compare-matrix: ${{ steps.olap-compare-matrix.outputs.matrix }}
steps:
- name: Generate matrix for pgbench benchmark
id: pgbench-compare-matrix
run: |
matrix='{
"platform": [
"neon-captest-new",
"neon-captest-reuse"
],
"db_size": [ "10gb" ],
"include": [
{ "platform": "neon-captest-freetier", "db_size": "3gb" },
{ "platform": "neon-captest-new", "db_size": "50gb" }
]
}'
if [ "$(date +%A)" = "Saturday" ]; then
matrix=$(echo $matrix | jq '.include += [{ "platform": "rds-postgres", "db_size": "10gb"},
{ "platform": "rds-aurora", "db_size": "50gb"}]')
fi
echo "matrix=$(echo $matrix | jq --compact-output '.')" >> $GITHUB_OUTPUT
- name: Generate matrix for OLAP benchmarks
id: olap-compare-matrix
run: |
matrix='{
"platform": [
"neon-captest-reuse"
]
}'
if [ "$(date +%A)" = "Saturday" ]; then
matrix=$(echo $matrix | jq '.include += [{ "platform": "rds-postgres" },
{ "platform": "rds-aurora" }]')
fi
echo "matrix=$(echo $matrix | jq --compact-output '.')" >> $GITHUB_OUTPUT
pgbench-compare:
needs: [ generate-matrices ]
strategy:
fail-fast: false
matrix:
# neon-captest-freetier: Run pgbench with freetier-limited compute
# neon-captest-new: Run pgbench in a freshly created project
# neon-captest-reuse: Same, but reusing existing project
# neon-captest-prefetch: Same, with prefetching enabled (new project)
# rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
# rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
platform: [ neon-captest-reuse, neon-captest-prefetch, rds-postgres ]
db_size: [ 10gb ]
runner: [ us-east-2 ]
include:
- platform: neon-captest-freetier
db_size: 3gb
runner: us-east-2
- platform: neon-captest-prefetch
db_size: 50gb
runner: us-east-2
- platform: rds-aurora
db_size: 50gb
runner: us-east-2
matrix: ${{fromJson(needs.generate-matrices.outputs.pgbench-compare-matrix)}}
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "60m"
@@ -138,10 +174,10 @@ jobs:
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: ${{ matrix.platform }}
runs-on: [ self-hosted, "${{ matrix.runner }}", x64 ]
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
@@ -164,7 +200,7 @@ jobs:
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
- name: Create Neon Project
if: contains(fromJson('["neon-captest-new", "neon-captest-prefetch", "neon-captest-freetier"]'), matrix.platform)
if: contains(fromJson('["neon-captest-new", "neon-captest-freetier"]'), matrix.platform)
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
@@ -180,7 +216,7 @@ jobs:
neon-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR }}
;;
neon-captest-new | neon-captest-prefetch | neon-captest-freetier)
neon-captest-new | neon-captest-freetier)
CONNSTR=${{ steps.create-neon-project.outputs.dsn }}
;;
rds-aurora)
@@ -190,7 +226,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CONNSTR }}
;;
*)
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'neon-captest-new', 'neon-captest-prefetch', neon-captest-freetier, 'rds-aurora', or 'rds-postgres'"
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'neon-captest-new', 'neon-captest-freetier', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
@@ -199,17 +235,6 @@ jobs:
psql ${CONNSTR} -c "SELECT version();"
- name: Set database options
if: matrix.platform == 'neon-captest-prefetch'
run: |
DB_NAME=$(psql ${BENCHMARK_CONNSTR} --no-align --quiet -t -c "SELECT current_database()")
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET enable_seqscan_prefetch=on"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET effective_io_concurrency=32"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET maintenance_io_concurrency=32"
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Benchmark init
uses: ./.github/actions/run-python-test-set
with:
@@ -257,7 +282,7 @@ jobs:
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
if: success() || failure()
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: generate
@@ -280,23 +305,19 @@ jobs:
#
# *_CLICKBENCH_CONNSTR: Genuine ClickBench DB with ~100M rows
# *_CLICKBENCH_10M_CONNSTR: DB with the first 10M rows of ClickBench DB
if: success() || failure()
needs: [ pgbench-compare ]
if: ${{ !cancelled() }}
needs: [ generate-matrices, pgbench-compare ]
strategy:
fail-fast: false
matrix:
# neon-captest-prefetch: We have pre-created projects with prefetch enabled
# rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
# rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
platform: [ neon-captest-prefetch, rds-postgres, rds-aurora ]
matrix: ${{ fromJson(needs.generate-matrices.outputs.olap-compare-matrix) }}
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: ${{ matrix.platform }}
runs-on: [ self-hosted, us-east-2, x64 ]
@@ -325,7 +346,7 @@ jobs:
id: set-up-connstr
run: |
case "${PLATFORM}" in
neon-captest-prefetch)
neon-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CLICKBENCH_10M_CONNSTR }}
;;
rds-aurora)
@@ -335,7 +356,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CLICKBENCH_10M_CONNSTR }}
;;
*)
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-prefetch', 'rds-aurora', or 'rds-postgres'"
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
@@ -344,17 +365,6 @@ jobs:
psql ${CONNSTR} -c "SELECT version();"
- name: Set database options
if: matrix.platform == 'neon-captest-prefetch'
run: |
DB_NAME=$(psql ${BENCHMARK_CONNSTR} --no-align --quiet -t -c "SELECT current_database()")
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET enable_seqscan_prefetch=on"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET effective_io_concurrency=32"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET maintenance_io_concurrency=32"
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: ClickBench benchmark
uses: ./.github/actions/run-python-test-set
with:
@@ -369,7 +379,7 @@ jobs:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Create Allure report
if: success() || failure()
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: generate
@@ -391,23 +401,19 @@ jobs:
# We might change it after https://github.com/neondatabase/neon/issues/2900.
#
# *_TPCH_S10_CONNSTR: DB generated with scale factor 10 (~10 GB)
if: success() || failure()
needs: [ clickbench-compare ]
if: ${{ !cancelled() }}
needs: [ generate-matrices, clickbench-compare ]
strategy:
fail-fast: false
matrix:
# neon-captest-prefetch: We have pre-created projects with prefetch enabled
# rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
# rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
platform: [ neon-captest-prefetch, rds-postgres, rds-aurora ]
matrix: ${{ fromJson(needs.generate-matrices.outputs.olap-compare-matrix) }}
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: ${{ matrix.platform }}
runs-on: [ self-hosted, us-east-2, x64 ]
@@ -436,7 +442,7 @@ jobs:
id: set-up-connstr
run: |
case "${PLATFORM}" in
neon-captest-prefetch)
neon-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_TPCH_S10_CONNSTR }}
;;
rds-aurora)
@@ -446,7 +452,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_TPCH_S10_CONNSTR }}
;;
*)
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-prefetch', 'rds-aurora', or 'rds-postgres'"
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
@@ -455,17 +461,6 @@ jobs:
psql ${CONNSTR} -c "SELECT version();"
- name: Set database options
if: matrix.platform == 'neon-captest-prefetch'
run: |
DB_NAME=$(psql ${BENCHMARK_CONNSTR} --no-align --quiet -t -c "SELECT current_database()")
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET enable_seqscan_prefetch=on"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET effective_io_concurrency=32"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET maintenance_io_concurrency=32"
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Run TPC-H benchmark
uses: ./.github/actions/run-python-test-set
with:
@@ -480,7 +475,7 @@ jobs:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Create Allure report
if: success() || failure()
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: generate
@@ -496,23 +491,19 @@ jobs:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
user-examples-compare:
if: success() || failure()
needs: [ tpch-compare ]
if: ${{ !cancelled() }}
needs: [ generate-matrices, tpch-compare ]
strategy:
fail-fast: false
matrix:
# neon-captest-prefetch: We have pre-created projects with prefetch enabled
# rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
# rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
platform: [ neon-captest-prefetch, rds-postgres, rds-aurora ]
matrix: ${{ fromJson(needs.generate-matrices.outputs.olap-compare-matrix) }}
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: ${{ matrix.platform }}
runs-on: [ self-hosted, us-east-2, x64 ]
@@ -541,7 +532,7 @@ jobs:
id: set-up-connstr
run: |
case "${PLATFORM}" in
neon-captest-prefetch)
neon-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_CAPTEST_CONNSTR }}
;;
rds-aurora)
@@ -551,7 +542,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_RDS_POSTGRES_CONNSTR }}
;;
*)
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-prefetch', 'rds-aurora', or 'rds-postgres'"
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
@@ -560,17 +551,6 @@ jobs:
psql ${CONNSTR} -c "SELECT version();"
- name: Set database options
if: matrix.platform == 'neon-captest-prefetch'
run: |
DB_NAME=$(psql ${BENCHMARK_CONNSTR} --no-align --quiet -t -c "SELECT current_database()")
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET enable_seqscan_prefetch=on"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET effective_io_concurrency=32"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET maintenance_io_concurrency=32"
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Run user examples
uses: ./.github/actions/run-python-test-set
with:
@@ -585,7 +565,7 @@ jobs:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Create Allure report
if: success() || failure()
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: generate

View File

@@ -13,7 +13,7 @@ defaults:
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
env:
@@ -368,7 +368,7 @@ jobs:
build_type: ${{ matrix.build_type }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ github.ref == 'refs/heads/main' }}
save_perf_report: ${{ github.ref_name == 'main' }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -1007,7 +1007,7 @@ jobs:
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${OLD_PREFIX} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
if [ -z "${S3_KEY}" ]; then
echo 2>&1 "Neither s3://${BUCKET}/${OLD_PREFIX}/${FILENAME} nor its version from previous attempts exist"
echo >&2 "Neither s3://${BUCKET}/${OLD_PREFIX}/${FILENAME} nor its version from previous attempts exist"
exit 1
fi

View File

@@ -12,7 +12,7 @@ defaults:
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
env:

View File

@@ -14,7 +14,7 @@ on:
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
jobs:

23
Cargo.lock generated
View File

@@ -841,6 +841,19 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "compute_api"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"serde",
"serde_json",
"serde_with",
"utils",
"workspace_hack",
]
[[package]]
name = "compute_tools"
version = "0.1.0"
@@ -848,6 +861,7 @@ dependencies = [
"anyhow",
"chrono",
"clap 4.1.4",
"compute_api",
"futures",
"hyper",
"notify",
@@ -866,6 +880,7 @@ dependencies = [
"tracing-subscriber",
"tracing-utils",
"url",
"utils",
"workspace_hack",
]
@@ -1558,9 +1573,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "h2"
version = "0.3.15"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f9f29bc9dda355256b2916cf526ab02ce0aeaaaf2bad60d65ef3f12f11dd0f4"
checksum = "66b91535aa35fea1523ad1b86cb6b53c28e0ae566ba4a460f4457e936cad7c6f"
dependencies = [
"bytes",
"fnv",
@@ -2503,6 +2518,8 @@ dependencies = [
"serde",
"serde_json",
"serde_with",
"strum",
"strum_macros",
"utils",
"workspace_hack",
]
@@ -2949,6 +2966,7 @@ dependencies = [
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-rustls",
"tokio-util",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
@@ -3354,6 +3372,7 @@ dependencies = [
"tempfile",
"thiserror",
"tokio",
"tokio-io-timeout",
"tokio-postgres",
"toml_edit",
"tracing",

View File

@@ -101,6 +101,7 @@ test-context = "0.1"
thiserror = "1.0"
tls-listener = { version = "0.6", features = ["rustls", "hyper-h1"] }
tokio = { version = "1.17", features = ["macros"] }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.9.0"
tokio-rustls = "0.23"
tokio-stream = "0.1"
@@ -132,6 +133,7 @@ tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df6
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }

View File

@@ -60,6 +60,7 @@ RUN apt update && \
# SFCGAL > 1.3 requires CGAL > 5.2, Bullseye's libcgal-dev is 5.2
RUN wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar.gz -O SFCGAL.tar.gz && \
echo "4e39b3b2adada6254a7bdba6d297bb28e1a9835a9f879b74f37e2dab70203232 SFCGAL.tar.gz" | sha256sum --check && \
mkdir sfcgal-src && cd sfcgal-src && tar xvzf ../SFCGAL.tar.gz --strip-components=1 -C . && \
cmake . && make -j $(getconf _NPROCESSORS_ONLN) && \
DESTDIR=/sfcgal make install -j $(getconf _NPROCESSORS_ONLN) && \
@@ -68,6 +69,7 @@ RUN wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar
ENV PATH "/usr/local/pgsql/bin:$PATH"
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.2.tar.gz -O postgis.tar.gz && \
echo "9a2a219da005a1730a39d1959a1c7cec619b1efb009b65be80ffc25bad299068 postgis.tar.gz" | sha256sum --check && \
mkdir postgis-src && cd postgis-src && tar xvzf ../postgis.tar.gz --strip-components=1 -C . && \
./autogen.sh && \
./configure --with-sfcgal=/usr/local/bin/sfcgal-config && \
@@ -84,6 +86,7 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.2.tar.gz -O postg
echo 'trusted = true' >> /usr/local/pgsql/share/extension/address_standardizer_data_us.control
RUN wget https://github.com/pgRouting/pgrouting/archive/v3.4.2.tar.gz -O pgrouting.tar.gz && \
echo "cac297c07d34460887c4f3b522b35c470138760fe358e351ad1db4edb6ee306e pgrouting.tar.gz" | sha256sum --check && \
mkdir pgrouting-src && cd pgrouting-src && tar xvzf ../pgrouting.tar.gz --strip-components=1 -C . && \
mkdir build && \
cd build && \
@@ -104,6 +107,7 @@ RUN apt update && \
apt install -y ninja-build python3-dev libncurses5 binutils clang
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.5.tar.gz -O plv8.tar.gz && \
echo "1e108d5df639e4c189e1c5bdfa2432a521c126ca89e7e5a969d46899ca7bf106 plv8.tar.gz" | sha256sum --check && \
mkdir plv8-src && cd plv8-src && tar xvzf ../plv8.tar.gz --strip-components=1 -C . && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
make DOCKER=1 -j $(getconf _NPROCESSORS_ONLN) install && \
@@ -125,11 +129,13 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# packaged cmake is too old
RUN wget https://github.com/Kitware/CMake/releases/download/v3.24.2/cmake-3.24.2-linux-x86_64.sh \
-q -O /tmp/cmake-install.sh \
&& echo "739d372726cb23129d57a539ce1432453448816e345e1545f6127296926b6754 /tmp/cmake-install.sh" | sha256sum --check \
&& chmod u+x /tmp/cmake-install.sh \
&& /tmp/cmake-install.sh --skip-license --prefix=/usr/local/ \
&& rm /tmp/cmake-install.sh
RUN wget https://github.com/uber/h3/archive/refs/tags/v4.1.0.tar.gz -O h3.tar.gz && \
echo "ec99f1f5974846bde64f4513cf8d2ea1b8d172d2218ab41803bf6a63532272bc h3.tar.gz" | sha256sum --check && \
mkdir h3-src && cd h3-src && tar xvzf ../h3.tar.gz --strip-components=1 -C . && \
mkdir build && cd build && \
cmake .. -DCMAKE_BUILD_TYPE=Release && \
@@ -139,6 +145,7 @@ RUN wget https://github.com/uber/h3/archive/refs/tags/v4.1.0.tar.gz -O h3.tar.gz
rm -rf build
RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.1.2.tar.gz -O h3-pg.tar.gz && \
echo "c135aa45999b2ad1326d2537c1cadef96d52660838e4ca371706c08fdea1a956 h3-pg.tar.gz" | sha256sum --check && \
mkdir h3-pg-src && cd h3-pg-src && tar xvzf ../h3-pg.tar.gz --strip-components=1 -C . && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
make -j $(getconf _NPROCESSORS_ONLN) && \
@@ -156,6 +163,7 @@ FROM build-deps AS unit-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -O postgresql-unit.tar.gz && \
echo "411d05beeb97e5a4abf17572bfcfbb5a68d98d1018918feff995f6ee3bb03e79 postgresql-unit.tar.gz" | sha256sum --check && \
mkdir postgresql-unit-src && cd postgresql-unit-src && tar xvzf ../postgresql-unit.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -176,6 +184,7 @@ FROM build-deps AS vector-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.4.0.tar.gz -O pgvector.tar.gz && \
echo "b76cf84ddad452cc880a6c8c661d137ddd8679c000a16332f4f03ecf6e10bcc8 pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -192,6 +201,7 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# 9742dab1b2f297ad3811120db7b21451bca2d3c9 made on 13/11/2021
RUN wget https://github.com/michelp/pgjwt/archive/9742dab1b2f297ad3811120db7b21451bca2d3c9.tar.gz -O pgjwt.tar.gz && \
echo "cfdefb15007286f67d3d45510f04a6a7a495004be5b3aecb12cda667e774203f pgjwt.tar.gz" | sha256sum --check && \
mkdir pgjwt-src && cd pgjwt-src && tar xvzf ../pgjwt.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgjwt.control
@@ -206,6 +216,7 @@ FROM build-deps AS hypopg-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/HypoPG/hypopg/archive/refs/tags/1.3.1.tar.gz -O hypopg.tar.gz && \
echo "e7f01ee0259dc1713f318a108f987663d60f3041948c2ada57a94b469565ca8e hypopg.tar.gz" | sha256sum --check && \
mkdir hypopg-src && cd hypopg-src && tar xvzf ../hypopg.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -221,6 +232,7 @@ FROM build-deps AS pg-hashids-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz -O pg_hashids.tar.gz && \
echo "74576b992d9277c92196dd8d816baa2cc2d8046fe102f3dcd7f3c3febed6822a pg_hashids.tar.gz" | sha256sum --check && \
mkdir pg_hashids-src && cd pg_hashids-src && tar xvzf ../pg_hashids.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
@@ -236,6 +248,7 @@ FROM build-deps AS rum-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/postgrespro/rum/archive/refs/tags/1.3.13.tar.gz -O rum.tar.gz && \
echo "6ab370532c965568df6210bd844ac6ba649f53055e48243525b0b7e5c4d69a7d rum.tar.gz" | sha256sum --check && \
mkdir rum-src && cd rum-src && tar xvzf ../rum.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
@@ -251,11 +264,28 @@ FROM build-deps AS pgtap-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/theory/pgtap/archive/refs/tags/v1.2.0.tar.gz -O pgtap.tar.gz && \
echo "9c7c3de67ea41638e14f06da5da57bac6f5bd03fea05c165a0ec862205a5c052 pgtap.tar.gz" | sha256sum --check && \
mkdir pgtap-src && cd pgtap-src && tar xvzf ../pgtap.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgtap.control
#########################################################################################
#
# Layer "ip4r-pg-build"
# compile ip4r extension
#
#########################################################################################
FROM build-deps AS ip4r-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.1.tar.gz -O ip4r.tar.gz && \
echo "78b9f0c1ae45c22182768fe892a32d533c82281035e10914111400bf6301c726 ip4r.tar.gz" | sha256sum --check && \
mkdir ip4r-src && cd ip4r-src && tar xvzf ../ip4r.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/ip4r.control
#########################################################################################
#
# Layer "prefix-pg-build"
@@ -266,6 +296,7 @@ FROM build-deps AS prefix-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/dimitri/prefix/archive/refs/tags/v1.2.9.tar.gz -O prefix.tar.gz && \
echo "38d30a08d0241a8bbb8e1eb8f0152b385051665a8e621c8899e7c5068f8b511e prefix.tar.gz" | sha256sum --check && \
mkdir prefix-src && cd prefix-src && tar xvzf ../prefix.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -281,6 +312,7 @@ FROM build-deps AS hll-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.17.tar.gz -O hll.tar.gz && \
echo "9a18288e884f197196b0d29b9f178ba595b0dfc21fbf7a8699380e77fa04c1e9 hll.tar.gz" | sha256sum --check && \
mkdir hll-src && cd hll-src && tar xvzf ../hll.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -296,6 +328,7 @@ FROM build-deps AS plpgsql-check-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.3.2.tar.gz -O plpgsql_check.tar.gz && \
echo "9d81167c4bbeb74eebf7d60147b21961506161addc2aee537f95ad8efeae427b plpgsql_check.tar.gz" | sha256sum --check && \
mkdir plpgsql_check-src && cd plpgsql_check-src && tar xvzf ../plpgsql_check.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
@@ -315,6 +348,7 @@ ENV PATH "/usr/local/pgsql/bin:$PATH"
RUN apt-get update && \
apt-get install -y cmake && \
wget https://github.com/timescale/timescaledb/archive/refs/tags/2.10.1.tar.gz -O timescaledb.tar.gz && \
echo "6fca72a6ed0f6d32d2b3523951ede73dc5f9b0077b38450a029a5f411fdb8c73 timescaledb.tar.gz" | sha256sum --check && \
mkdir timescaledb-src && cd timescaledb-src && tar xvzf ../timescaledb.tar.gz --strip-components=1 -C . && \
./bootstrap -DSEND_TELEMETRY_DEFAULT:BOOL=OFF -DUSE_TELEMETRY:BOOL=OFF -DAPACHE_ONLY:BOOL=ON && \
cd build && \
@@ -323,7 +357,39 @@ RUN apt-get update && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/timescaledb.control
#########################################################################################
#
#
# Layer "pg-hint-plan-pg-build"
# compile pg_hint_plan extension
#
#########################################################################################
FROM build-deps AS pg-hint-plan-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ARG PG_VERSION
ENV PATH "/usr/local/pgsql/bin:$PATH"
RUN case "${PG_VERSION}" in \
"v14") \
export PG_HINT_PLAN_VERSION=14_1_4_1 \
export PG_HINT_PLAN_CHECKSUM=c3501becf70ead27f70626bce80ea401ceac6a77e2083ee5f3ff1f1444ec1ad1 \
;; \
"v15") \
export PG_HINT_PLAN_VERSION=15_1_5_0 \
export PG_HINT_PLAN_CHECKSUM=564cbbf4820973ffece63fbf76e3c0af62c4ab23543142c7caaa682bc48918be \
;; \
*) \
echo "Export the valid PG_HINT_PLAN_VERSION variable" && exit 1 \
;; \
esac && \
wget https://github.com/ossc-db/pg_hint_plan/archive/refs/tags/REL${PG_HINT_PLAN_VERSION}.tar.gz -O pg_hint_plan.tar.gz && \
echo "${PG_HINT_PLAN_CHECKSUM} pg_hint_plan.tar.gz" | sha256sum --check && \
mkdir pg_hint_plan-src && cd pg_hint_plan-src && tar xvzf ../pg_hint_plan.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make install -j $(getconf _NPROCESSORS_ONLN) && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_hint_plan.control
#########################################################################################
#
# Layer "rust extensions"
# This layer is used to build `pgx` deps
#
@@ -351,7 +417,7 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
USER root
#########################################################################################
#
#
# Layer "pg-jsonschema-pg-build"
# Compile "pg_jsonschema" extension
#
@@ -359,15 +425,17 @@ USER root
FROM rust-extensions-build AS pg-jsonschema-pg-build
# there is no release tag yet, but we need it due to the superuser fix in the control file
# caeab60d70b2fd3ae421ec66466a3abbb37b7ee6 made on 06/03/2023
# there is no release tag yet, but we need it due to the superuser fix in the control file, switch to git tag after release >= 0.1.5
RUN wget https://github.com/supabase/pg_jsonschema/archive/caeab60d70b2fd3ae421ec66466a3abbb37b7ee6.tar.gz -O pg_jsonschema.tar.gz && \
echo "54129ce2e7ee7a585648dbb4cef6d73f795d94fe72f248ac01119992518469a4 pg_jsonschema.tar.gz" | sha256sum --check && \
mkdir pg_jsonschema-src && cd pg_jsonschema-src && tar xvzf ../pg_jsonschema.tar.gz --strip-components=1 -C . && \
sed -i 's/pgx = "0.7.1"/pgx = { version = "0.7.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_jsonschema.control
#########################################################################################
#
#
# Layer "pg-graphql-pg-build"
# Compile "pg_graphql" extension
#
@@ -375,11 +443,13 @@ RUN wget https://github.com/supabase/pg_jsonschema/archive/caeab60d70b2fd3ae421e
FROM rust-extensions-build AS pg-graphql-pg-build
# b4988843647450a153439be367168ed09971af85 made on 22/02/2023 (from remove-pgx-contrib-spiext branch)
# Currently pgx version bump to >= 0.7.2 causes "call to unsafe function" compliation errors in
# pgx-contrib-spiext. There is a branch that removes that dependency, so use it. It is on the
# same 1.1 version we've used before.
RUN git clone -b remove-pgx-contrib-spiext --single-branch https://github.com/yrashk/pg_graphql && \
cd pg_graphql && \
RUN wget https://github.com/yrashk/pg_graphql/archive/b4988843647450a153439be367168ed09971af85.tar.gz -O pg_graphql.tar.gz && \
echo "0c7b0e746441b2ec24187d0e03555faf935c2159e2839bddd14df6dafbc8c9bd pg_graphql.tar.gz" | sha256sum --check && \
mkdir pg_graphql-src && cd pg_graphql-src && tar xvzf ../pg_graphql.tar.gz --strip-components=1 -C . && \
sed -i 's/pgx = "~0.7.1"/pgx = { version = "0.7.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
sed -i 's/pgx-tests = "~0.7.1"/pgx-tests = "0.7.3"/g' Cargo.toml && \
cargo pgx install --release && \
@@ -396,8 +466,10 @@ RUN git clone -b remove-pgx-contrib-spiext --single-branch https://github.com/yr
FROM rust-extensions-build AS pg-tiktoken-pg-build
RUN git clone --depth=1 --single-branch https://github.com/kelvich/pg_tiktoken && \
cd pg_tiktoken && \
# 801f84f08c6881c8aa30f405fafbf00eec386a72 made on 10/03/2023
RUN wget https://github.com/kelvich/pg_tiktoken/archive/801f84f08c6881c8aa30f405fafbf00eec386a72.tar.gz -O pg_tiktoken.tar.gz && \
echo "52f60ac800993a49aa8c609961842b611b6b1949717b69ce2ec9117117e16e4a pg_tiktoken.tar.gz" | sha256sum --check && \
mkdir pg_tiktoken-src && cd pg_tiktoken-src && tar xvzf ../pg_tiktoken.tar.gz --strip-components=1 -C . && \
cargo pgx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_tiktoken.control
@@ -423,10 +495,12 @@ COPY --from=hypopg-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-hashids-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=rum-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pgtap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=ip4r-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=prefix-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=hll-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=plpgsql-check-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=timescaledb-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-hint-plan-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \

View File

@@ -147,15 +147,15 @@ Created an initial timeline 'de200bd42b49cc1814412c7e592dd6e9' at Lsn 0/16B5A50
Setting tenant 9ef87a5bf0d92544f6fafeeb3239695c as a default one
# start postgres compute node
> ./target/debug/neon_local pg start main
Starting new postgres (v14) main on timeline de200bd42b49cc1814412c7e592dd6e9 ...
> ./target/debug/neon_local endpoint start main
Starting new endpoint main (PostgreSQL v14) on timeline de200bd42b49cc1814412c7e592dd6e9 ...
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/main port=55432
Starting postgres node at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=postgres'
Starting postgres at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=postgres'
# check list of running postgres instances
> ./target/debug/neon_local pg list
NODE ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16B5BA8 running
> ./target/debug/neon_local endpoint list
ENDPOINT ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16B5BA8 running
```
2. Now, it is possible to connect to postgres and run some queries:
@@ -184,14 +184,14 @@ Created timeline 'b3b863fa45fa9e57e615f9f2d944e601' at Lsn 0/16F9A00 for tenant:
(L) ┗━ @0/16F9A00: migration_check [b3b863fa45fa9e57e615f9f2d944e601]
# start postgres on that branch
> ./target/debug/neon_local pg start migration_check --branch-name migration_check
Starting new postgres migration_check on timeline b3b863fa45fa9e57e615f9f2d944e601 ...
> ./target/debug/neon_local endpoint start migration_check --branch-name migration_check
Starting new endpoint migration_check (PostgreSQL v14) on timeline b3b863fa45fa9e57e615f9f2d944e601 ...
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/migration_check port=55433
Starting postgres node at 'host=127.0.0.1 port=55433 user=cloud_admin dbname=postgres'
Starting postgres at 'host=127.0.0.1 port=55433 user=cloud_admin dbname=postgres'
# check the new list of running postgres instances
> ./target/debug/neon_local pg list
NODE ADDRESS TIMELINE BRANCH NAME LSN STATUS
> ./target/debug/neon_local endpoint list
ENDPOINT ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16F9A38 running
migration_check 127.0.0.1:55433 b3b863fa45fa9e57e615f9f2d944e601 migration_check 0/16F9A70 running

View File

@@ -27,4 +27,6 @@ tracing-subscriber.workspace = true
tracing-utils.workspace = true
url.workspace = true
compute_api.workspace = true
utils.workspace = true
workspace_hack.workspace = true

View File

@@ -34,7 +34,7 @@ use std::fs::File;
use std::panic;
use std::path::Path;
use std::process::exit;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::{mpsc, Arc, Condvar, Mutex};
use std::{thread, time::Duration};
use anyhow::{Context, Result};
@@ -43,12 +43,14 @@ use clap::Arg;
use tracing::{error, info};
use url::Url;
use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus};
use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::pg_helpers::*;
use compute_tools::spec::*;
fn main() -> Result<()> {
@@ -71,28 +73,24 @@ fn main() -> Result<()> {
// Try to use just 'postgres' if no path is provided
let pgbin = matches.get_one::<String>("pgbin").unwrap();
let mut spec = Default::default();
let mut spec_set = false;
let mut spec = None;
let mut live_config_allowed = false;
match spec_json {
// First, try to get cluster spec from the cli argument
Some(json) => {
spec = serde_json::from_str(json)?;
spec_set = true;
spec = Some(serde_json::from_str(json)?);
}
None => {
// Second, try to read it from the file if path is provided
if let Some(sp) = spec_path {
let path = Path::new(sp);
let file = File::open(path)?;
spec = serde_json::from_reader(file)?;
spec_set = true;
spec = Some(serde_json::from_reader(file)?);
} else if let Some(id) = compute_id {
if let Some(cp_base) = control_plane_uri {
live_config_allowed = true;
if let Ok(s) = get_spec_from_control_plane(cp_base, id) {
spec = s;
spec_set = true;
spec = Some(s);
}
} else {
panic!("must specify both --control-plane-uri and --compute-id or none");
@@ -107,8 +105,13 @@ fn main() -> Result<()> {
};
let mut new_state = ComputeState::new();
if spec_set {
new_state.spec = spec;
let spec_set;
if let Some(spec) = spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
new_state.pspec = Some(pspec);
spec_set = true;
} else {
spec_set = false;
}
let compute_node = ComputeNode {
start_time: Utc::now(),
@@ -116,7 +119,6 @@ fn main() -> Result<()> {
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
live_config_allowed,
metrics: ComputeMetrics::default(),
state: Mutex::new(new_state),
state_changed: Condvar::new(),
};
@@ -141,33 +143,10 @@ fn main() -> Result<()> {
}
}
// We got all we need, fill in the state.
// We got all we need, update the state.
let mut state = compute.state.lock().unwrap();
let pageserver_connstr = state
.spec
.cluster
.settings
.find("neon.pageserver_connstring")
.expect("pageserver connstr should be provided");
let storage_auth_token = state.spec.storage_auth_token.clone();
let tenant = state
.spec
.cluster
.settings
.find("neon.tenant_id")
.expect("tenant id should be provided");
let timeline = state
.spec
.cluster
.settings
.find("neon.timeline_id")
.expect("tenant id should be provided");
let startup_tracing_context = state.spec.startup_tracing_context.clone();
state.pageserver_connstr = pageserver_connstr;
state.storage_auth_token = storage_auth_token;
state.tenant = tenant;
state.timeline = timeline;
let pspec = state.pspec.as_ref().expect("spec must be set");
let startup_tracing_context = pspec.spec.startup_tracing_context.clone();
state.status = ComputeStatus::Init;
compute.state_changed.notify_all();
drop(state);
@@ -197,6 +176,8 @@ fn main() -> Result<()> {
// Launch remaining service threads
let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread");
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");
// Start Postgres
let mut delay_exit = false;
@@ -238,10 +219,25 @@ fn main() -> Result<()> {
thread::sleep(Duration::from_secs(30));
}
info!("shutting down tracing");
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit.
tracing_utils::shutdown_tracing();
// pending traces before we exit. Shutting down OTEL tracing provider may
// hang for quite some time, see, for example:
// - https://github.com/open-telemetry/opentelemetry-rust/issues/868
// - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
//
// Yet, we want computes to shut down fast enough, as we may need a new one
// for the same timeline ASAP. So wait no longer than 2s for the shutdown to
// complete, then just error out and exit the main thread.
info!("shutting down tracing");
let (sender, receiver) = mpsc::channel();
let _ = thread::spawn(move || {
tracing_utils::shutdown_tracing();
sender.send(()).ok()
});
let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
if shutdown_res.is_err() {
error!("timed out while shutting down tracing, exiting anyway");
}
info!("shutting down");
exit(exit_code.unwrap_or(1))

View File

@@ -19,15 +19,18 @@ use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use postgres::{Client, NoTls};
use serde::Serialize;
use tokio_postgres;
use tracing::{info, instrument, warn};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::ComputeSpec;
use crate::checker::create_writability_check_data;
use crate::config;
@@ -41,7 +44,6 @@ pub struct ComputeNode {
pub connstr: url::Url,
pub pgdata: String,
pub pgbin: String,
pub metrics: ComputeMetrics,
/// We should only allow live re- / configuration of the compute node if
/// it uses 'pull model', i.e. it can go to control-plane and fetch
/// the latest configuration. Otherwise, there could be a case:
@@ -69,11 +71,8 @@ pub struct ComputeState {
/// Timestamp of the last Postgres activity
pub last_active: DateTime<Utc>,
pub error: Option<String>,
pub spec: ComputeSpec,
pub tenant: String,
pub timeline: String,
pub pageserver_connstr: String,
pub storage_auth_token: Option<String>,
pub pspec: Option<ParsedSpec>,
pub metrics: ComputeMetrics,
}
impl ComputeState {
@@ -82,11 +81,8 @@ impl ComputeState {
status: ComputeStatus::Empty,
last_active: Utc::now(),
error: None,
spec: ComputeSpec::default(),
tenant: String::new(),
timeline: String::new(),
pageserver_connstr: String::new(),
storage_auth_token: None,
pspec: None,
metrics: ComputeMetrics::default(),
}
}
}
@@ -97,31 +93,47 @@ impl Default for ComputeState {
}
}
#[derive(Serialize, Clone, Copy, PartialEq, Eq, Debug)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {
// Spec wasn't provided at start, waiting for it to be
// provided by control-plane.
Empty,
// Compute configuration was requested.
ConfigurationPending,
// Compute node has spec and initial startup and
// configuration is in progress.
Init,
// Compute is configured and running.
Running,
// Either startup or configuration failed,
// compute will exit soon or is waiting for
// control-plane to terminate it.
Failed,
#[derive(Clone, Debug)]
pub struct ParsedSpec {
pub spec: ComputeSpec,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub pageserver_connstr: String,
pub storage_auth_token: Option<String>,
}
#[derive(Default, Serialize)]
pub struct ComputeMetrics {
pub sync_safekeepers_ms: AtomicU64,
pub basebackup_ms: AtomicU64,
pub config_ms: AtomicU64,
pub total_startup_ms: AtomicU64,
impl TryFrom<ComputeSpec> for ParsedSpec {
type Error = String;
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
let pageserver_connstr = spec
.cluster
.settings
.find("neon.pageserver_connstring")
.ok_or("pageserver connstr should be provided")?;
let storage_auth_token = spec.storage_auth_token.clone();
let tenant_id: TenantId = spec
.cluster
.settings
.find("neon.tenant_id")
.ok_or("tenant id should be provided")
.map(|s| TenantId::from_str(&s))?
.or(Err("invalid tenant id"))?;
let timeline_id: TimelineId = spec
.cluster
.settings
.find("neon.timeline_id")
.ok_or("timeline id should be provided")
.map(|s| TimelineId::from_str(&s))?
.or(Err("invalid timeline id"))?;
Ok(ParsedSpec {
spec,
pageserver_connstr,
storage_auth_token,
tenant_id,
timeline_id,
})
}
}
impl ComputeNode {
@@ -149,14 +161,15 @@ impl ComputeNode {
// Get basebackup from the libpq connection to pageserver using `connstr` and
// unarchive it to `pgdata` directory overriding all its previous content.
#[instrument(skip(self, compute_state))]
fn get_basebackup(&self, compute_state: &ComputeState, lsn: &str) -> Result<()> {
fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let start_time = Utc::now();
let mut config = postgres::Config::from_str(&compute_state.pageserver_connstr)?;
let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
// Use the storage auth token from the config file, if given.
// Note: this overrides any password set in the connection string.
if let Some(storage_auth_token) = &compute_state.storage_auth_token {
if let Some(storage_auth_token) = &spec.storage_auth_token {
info!("Got storage auth token from spec file");
config.password(storage_auth_token);
} else {
@@ -165,14 +178,8 @@ impl ComputeNode {
let mut client = config.connect(NoTls)?;
let basebackup_cmd = match lsn {
"0/0" => format!(
"basebackup {} {}",
&compute_state.tenant, &compute_state.timeline
), // First start of the compute
_ => format!(
"basebackup {} {} {}",
&compute_state.tenant, &compute_state.timeline, lsn
),
Lsn(0) => format!("basebackup {} {}", spec.tenant_id, spec.timeline_id), // First start of the compute
_ => format!("basebackup {} {} {}", spec.tenant_id, spec.timeline_id, lsn),
};
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
@@ -185,22 +192,18 @@ impl ComputeNode {
ar.set_ignore_zeros(true);
ar.unpack(&self.pgdata)?;
self.metrics.basebackup_ms.store(
Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);
self.state.lock().unwrap().metrics.basebackup_ms = Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64;
Ok(())
}
// Run `postgres` in a special mode with `--sync-safekeepers` argument
// and return the reported LSN back to the caller.
#[instrument(skip(self, storage_auth_token))]
fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<String> {
fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
let start_time = Utc::now();
let sync_handle = Command::new(&self.pgbin)
@@ -231,16 +234,13 @@ impl ComputeNode {
);
}
self.metrics.sync_safekeepers_ms.store(
Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);
self.state.lock().unwrap().metrics.sync_safekeepers_ms = Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64;
let lsn = String::from(String::from_utf8(sync_output.stdout)?.trim());
let lsn = Lsn::from_str(String::from_utf8(sync_output.stdout)?.trim())?;
Ok(lsn)
}
@@ -249,33 +249,43 @@ impl ComputeNode {
/// safekeepers sync, basebackup, etc.
#[instrument(skip(self, compute_state))]
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
let spec = &compute_state.spec;
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let pgdata_path = Path::new(&self.pgdata);
// Remove/create an empty pgdata directory and put configuration there.
self.create_pgdata()?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
info!("starting safekeepers syncing");
let lsn = self
.sync_safekeepers(compute_state.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", lsn);
let start_lsn = if let Some(lsn) = pspec.spec.start_lsn {
lsn
} else {
info!("starting safekeepers syncing");
let last_lsn = self
.sync_safekeepers(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", last_lsn);
last_lsn
};
info!(
"getting basebackup@{} from pageserver {}",
lsn, &compute_state.pageserver_connstr
start_lsn, &pspec.pageserver_connstr
);
self.get_basebackup(compute_state, &lsn).with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &compute_state.pageserver_connstr
)
})?;
self.get_basebackup(compute_state, start_lsn)
.with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
start_lsn, &pspec.pageserver_connstr
)
})?;
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path)?;
if pspec.spec.standby {
std::fs::File::create(pgdata_path.join("standby.signal"))?;
}
Ok(())
}
@@ -337,19 +347,62 @@ impl ComputeNode {
};
// Proceed with post-startup configuration. Note, that order of operations is important.
handle_roles(&compute_state.spec, &mut client)?;
handle_databases(&compute_state.spec, &mut client)?;
handle_role_deletions(&compute_state.spec, self.connstr.as_str(), &mut client)?;
handle_grants(&compute_state.spec, self.connstr.as_str(), &mut client)?;
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
handle_roles(spec, &mut client)?;
handle_databases(spec, &mut client)?;
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
handle_grants(spec, self.connstr.as_str(), &mut client)?;
create_writability_check_data(&mut client)?;
handle_extensions(&compute_state.spec, &mut client)?;
handle_extensions(spec, &mut client)?;
// 'Close' connection
drop(client);
info!(
"finished configuration of compute for project {}",
compute_state.spec.cluster.cluster_id
spec.cluster.cluster_id
);
Ok(())
}
// We could've wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
#[instrument(skip(self, client))]
fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
client.simple_query("SELECT pg_reload_conf()")?;
Ok(())
}
/// Similar to `apply_config()`, but does a bit different sequence of operations,
/// as it's used to reconfigure a previously started and configured Postgres node.
#[instrument(skip(self))]
pub fn reconfigure(&self) -> Result<()> {
let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
self.pg_reload_conf(&mut client)?;
// Proceed with post-startup configuration. Note, that order of operations is important.
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec, self.connstr.as_str(), &mut client)?;
handle_extensions(&spec, &mut client)?;
// 'Close' connection
drop(client);
let unknown_op = "unknown".to_string();
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
info!(
"finished reconfiguration of compute node for operation {}",
op_id
);
Ok(())
@@ -358,40 +411,39 @@ impl ComputeNode {
#[instrument(skip(self))]
pub fn start_compute(&self) -> Result<std::process::Child> {
let compute_state = self.state.lock().unwrap().clone();
let spec = compute_state.pspec.as_ref().expect("spec must be set");
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
compute_state.spec.cluster.cluster_id,
compute_state.spec.operation_uuid.as_ref().unwrap(),
compute_state.tenant,
compute_state.timeline,
spec.spec.cluster.cluster_id,
spec.spec.operation_uuid.as_deref().unwrap_or("None"),
spec.tenant_id,
spec.timeline_id,
);
self.prepare_pgdata(&compute_state)?;
let start_time = Utc::now();
let pg = self.start_postgres(compute_state.storage_auth_token.clone())?;
let pg = self.start_postgres(spec.storage_auth_token.clone())?;
self.apply_config(&compute_state)?;
if !spec.spec.standby {
self.apply_config(&compute_state)?;
}
let startup_end_time = Utc::now();
self.metrics.config_ms.store(
startup_end_time
{
let mut state = self.state.lock().unwrap();
state.metrics.config_ms = startup_end_time
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);
self.metrics.total_startup_ms.store(
startup_end_time
.as_millis() as u64;
state.metrics.total_startup_ms = startup_end_time
.signed_duration_since(self.start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);
.as_millis() as u64;
}
self.set_status(ComputeStatus::Running);
Ok(pg)

View File

@@ -6,7 +6,7 @@ use std::path::Path;
use anyhow::Result;
use crate::pg_helpers::PgOptionsSerialize;
use crate::spec::ComputeSpec;
use compute_api::spec::ComputeSpec;
/// Check that `line` is inside a text file and put it there if it is not.
/// Create file if it doesn't exist.

View File

@@ -0,0 +1,54 @@
use std::sync::Arc;
use std::thread;
use anyhow::Result;
use tracing::{error, info, instrument};
use compute_api::responses::ComputeStatus;
use crate::compute::ComputeNode;
#[instrument(skip(compute))]
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
info!("waiting for reconfiguration requests");
loop {
let state = compute.state.lock().unwrap();
let mut state = compute.state_changed.wait(state).unwrap();
if state.status == ComputeStatus::ConfigurationPending {
info!("got configuration request");
state.status = ComputeStatus::Configuration;
compute.state_changed.notify_all();
drop(state);
let mut new_status = ComputeStatus::Failed;
if let Err(e) = compute.reconfigure() {
error!("could not configure compute node: {}", e);
} else {
new_status = ComputeStatus::Running;
info!("compute node configured");
}
// XXX: used to test that API is blocking
// std::thread::sleep(std::time::Duration::from_millis(10000));
compute.set_status(new_status);
} else if state.status == ComputeStatus::Failed {
info!("compute node is now in Failed state, exiting");
break;
} else {
info!("woken up for compute status: {:?}, sleeping", state.status);
}
}
}
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
let compute = Arc::clone(compute);
Ok(thread::Builder::new()
.name("compute-configurator".into())
.spawn(move || {
configurator_main_loop(&compute);
info!("configurator thread is exited");
})?)
}

View File

@@ -3,9 +3,9 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use crate::compute::{ComputeNode, ComputeStatus};
use crate::http::requests::ConfigurationRequest;
use crate::http::responses::{ComputeStatusResponse, GenericAPIError};
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
use anyhow::Result;
use hyper::service::{make_service_fn, service_fn};
@@ -16,6 +16,22 @@ use tokio::task;
use tracing::{error, info};
use tracing_utils::http::OtelName;
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
ComputeStatusResponse {
tenant: state
.pspec
.as_ref()
.map(|pspec| pspec.tenant_id.to_string()),
timeline: state
.pspec
.as_ref()
.map(|pspec| pspec.timeline_id.to_string()),
status: state.status,
last_active: state.last_active,
error: state.error.clone(),
}
}
// Service function to handle all available routes.
async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
//
@@ -28,8 +44,7 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
(&Method::GET, "/status") => {
info!("serving /status GET request");
let state = compute.state.lock().unwrap();
let status_response = ComputeStatusResponse::from(state.clone());
let status_response = status_response_from_state(&state);
Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
}
@@ -37,7 +52,8 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
// future use for Prometheus metrics format.
(&Method::GET, "/metrics.json") => {
info!("serving /metrics.json GET request");
Response::new(Body::from(serde_json::to_string(&compute.metrics).unwrap()))
let metrics = compute.state.lock().unwrap().metrics.clone();
Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
}
// Collect Postgres current usage insights
@@ -125,6 +141,12 @@ async fn handle_configure_request(
let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
let spec = request.spec;
let parsed_spec = match ParsedSpec::try_from(spec) {
Ok(ps) => ps,
Err(msg) => return Err((msg, StatusCode::PRECONDITION_FAILED)),
};
// XXX: wrap state update under lock in code blocks. Otherwise,
// we will try to `Send` `mut state` into the spawned thread
// bellow, which will cause error:
@@ -133,14 +155,14 @@ async fn handle_configure_request(
// ```
{
let mut state = compute.state.lock().unwrap();
if state.status != ComputeStatus::Empty {
if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
let msg = format!(
"invalid compute status for configuration request: {:?}",
state.status.clone()
);
return Err((msg, StatusCode::PRECONDITION_FAILED));
}
state.spec = spec;
state.pspec = Some(parsed_spec);
state.status = ComputeStatus::ConfigurationPending;
compute.state_changed.notify_all();
drop(state);
@@ -162,7 +184,7 @@ async fn handle_configure_request(
);
if state.status == ComputeStatus::Failed {
let err = state.error.clone().unwrap_or("unknown error".to_string());
let err = state.error.as_ref().map_or("unknown error", |x| x);
let msg = format!("compute configuration failed: {:?}", err);
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
}
@@ -175,7 +197,7 @@ async fn handle_configure_request(
// Return current compute state if everything went well.
let state = compute.state.lock().unwrap().clone();
let status_response = ComputeStatusResponse::from(state);
let status_response = status_response_from_state(&state);
Ok(serde_json::to_string(&status_response).unwrap())
} else {
Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST))

View File

@@ -1,3 +1 @@
pub mod api;
pub mod requests;
pub mod responses;

View File

@@ -1,40 +0,0 @@
use serde::{Serialize, Serializer};
use chrono::{DateTime, Utc};
use crate::compute::{ComputeState, ComputeStatus};
#[derive(Serialize, Debug)]
pub struct GenericAPIError {
pub error: String,
}
#[derive(Serialize, Debug)]
#[serde(rename_all = "snake_case")]
pub struct ComputeStatusResponse {
pub tenant: String,
pub timeline: String,
pub status: ComputeStatus,
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub error: Option<String>,
}
impl From<ComputeState> for ComputeStatusResponse {
fn from(state: ComputeState) -> Self {
ComputeStatusResponse {
tenant: state.tenant,
timeline: state.timeline,
status: state.status,
last_active: state.last_active,
error: state.error,
}
}
}
fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
x.to_rfc3339().serialize(s)
}

View File

@@ -4,6 +4,7 @@
//!
pub mod checker;
pub mod config;
pub mod configurator;
pub mod http;
#[macro_use]
pub mod logger;

View File

@@ -10,43 +10,12 @@ use std::time::{Duration, Instant};
use anyhow::{bail, Result};
use notify::{RecursiveMode, Watcher};
use postgres::{Client, Transaction};
use serde::Deserialize;
use tracing::{debug, instrument};
use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
#[derive(Clone, Deserialize, Debug)]
pub struct Role {
pub name: PgIdent,
pub encrypted_password: Option<String>,
pub options: GenericOptions,
}
/// Rust representation of Postgres database info with only those fields
/// that matter for us.
#[derive(Clone, Deserialize, Debug)]
pub struct Database {
pub name: PgIdent,
pub owner: PgIdent,
pub options: GenericOptions,
}
/// Common type representing both SQL statement params with or without value,
/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
/// options like `wal_level = logical`.
#[derive(Clone, Deserialize, Debug)]
pub struct GenericOption {
pub name: String,
pub value: Option<String>,
pub vartype: String,
}
/// Optional collection of `GenericOption`'s. Type alias allows us to
/// declare a `trait` on it.
pub type GenericOptions = Option<Vec<GenericOption>>;
/// Escape a string for including it in a SQL literal
fn escape_literal(s: &str) -> String {
s.replace('\'', "''").replace('\\', "\\\\")
@@ -58,9 +27,14 @@ fn escape_conf_value(s: &str) -> String {
s.replace('\'', "''").replace('\\', "\\\\")
}
impl GenericOption {
trait GenericOptionExt {
fn to_pg_option(&self) -> String;
fn to_pg_setting(&self) -> String;
}
impl GenericOptionExt for GenericOption {
/// Represent `GenericOption` as SQL statement parameter.
pub fn to_pg_option(&self) -> String {
fn to_pg_option(&self) -> String {
if let Some(val) = &self.value {
match self.vartype.as_ref() {
"string" => format!("{} '{}'", self.name, escape_literal(val)),
@@ -72,7 +46,7 @@ impl GenericOption {
}
/// Represent `GenericOption` as configuration option.
pub fn to_pg_setting(&self) -> String {
fn to_pg_setting(&self) -> String {
if let Some(val) = &self.value {
match self.vartype.as_ref() {
"string" => format!("{} = '{}'", self.name, escape_conf_value(val)),
@@ -131,10 +105,14 @@ impl GenericOptionsSearch for GenericOptions {
}
}
impl Role {
pub trait RoleExt {
fn to_pg_options(&self) -> String;
}
impl RoleExt for Role {
/// Serialize a list of role parameters into a Postgres-acceptable
/// string of arguments.
pub fn to_pg_options(&self) -> String {
fn to_pg_options(&self) -> String {
// XXX: consider putting LOGIN as a default option somewhere higher, e.g. in control-plane.
// For now, we do not use generic `options` for roles. Once used, add
// `self.options.as_pg_options()` somewhere here.
@@ -159,21 +137,17 @@ impl Role {
}
}
impl Database {
pub fn new(name: PgIdent, owner: PgIdent) -> Self {
Self {
name,
owner,
options: None,
}
}
pub trait DatabaseExt {
fn to_pg_options(&self) -> String;
}
impl DatabaseExt for Database {
/// Serialize a list of database parameters into a Postgres-acceptable
/// string of arguments.
/// NB: `TEMPLATE` is actually also an identifier, but so far we only need
/// to use `template0` and `template1`, so it is not a problem. Yet in the future
/// it may require a proper quoting too.
pub fn to_pg_options(&self) -> String {
fn to_pg_options(&self) -> String {
let mut params: String = self.options.as_pg_options();
write!(params, " OWNER {}", &self.owner.pg_quote())
.expect("String is documented to not to error during write operations");
@@ -182,10 +156,6 @@ impl Database {
}
}
/// String type alias representing Postgres identifier and
/// intended to be used for DB / role names.
pub type PgIdent = String;
/// Generic trait used to provide quoting / encoding for strings used in the
/// Postgres SQL queries and DATABASE_URL.
pub trait Escaping {
@@ -226,7 +196,11 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
&[],
)?
.iter()
.map(|row| Database::new(row.get("datname"), row.get("owner")))
.map(|row| Database {
name: row.get("datname"),
owner: row.get("owner"),
options: None,
})
.collect();
Ok(postgres_dbs)

View File

@@ -1,57 +1,17 @@
use std::collections::HashMap;
use std::path::Path;
use std::str::FromStr;
use anyhow::Result;
use anyhow::{anyhow, bail, Result};
use postgres::config::Config;
use postgres::{Client, NoTls};
use serde::Deserialize;
use tracing::{info, info_span, instrument, span_enabled, warn, Level};
use crate::config;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[derive(Clone, Deserialize, Debug, Default)]
pub struct ComputeSpec {
pub format_version: f32,
pub timestamp: String,
pub operation_uuid: Option<String>,
/// Expected cluster state at the end of transition process.
pub cluster: Cluster,
pub delta_operations: Option<Vec<DeltaOp>>,
pub storage_auth_token: Option<String>,
pub startup_tracing_context: Option<HashMap<String, String>>,
}
/// Cluster state seen from the perspective of the external tools
/// like Rails web console.
#[derive(Clone, Deserialize, Debug, Default)]
pub struct Cluster {
pub cluster_id: String,
pub name: String,
pub state: Option<String>,
pub roles: Vec<Role>,
pub databases: Vec<Database>,
pub settings: GenericOptions,
}
/// Single cluster state changing operation that could not be represented as
/// a static `Cluster` structure. For example:
/// - DROP DATABASE
/// - DROP ROLE
/// - ALTER ROLE name RENAME TO new_name
/// - ALTER DATABASE name RENAME TO new_name
#[derive(Clone, Deserialize, Debug)]
pub struct DeltaOp {
pub action: String,
pub name: PgIdent,
pub new_name: Option<PgIdent>,
}
use compute_api::responses::ControlPlaneSpecResponse;
use compute_api::spec::{ComputeSpec, Database, PgIdent, Role};
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
/// env variable is set, it will be used for authorization.
@@ -67,13 +27,19 @@ pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<C
// - network error, then retry
// - no spec for compute yet, then wait
// - compute id is unknown or any other error, then bail out
let spec = reqwest::blocking::Client::new()
let resp: ControlPlaneSpecResponse = reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()?
.json()?;
.send()
.map_err(|e| anyhow!("could not send spec request to control plane: {}", e))?
.json()
.map_err(|e| anyhow!("could not get compute spec from control plane: {}", e))?;
Ok(spec)
if let Some(spec) = resp.spec {
Ok(spec)
} else {
bail!("could not get compute spec from control plane")
}
}
/// It takes cluster specification and does the following:

View File

@@ -1,14 +1,13 @@
#[cfg(test)]
mod pg_helpers_tests {
use std::fs::File;
use compute_api::spec::{ComputeSpec, GenericOption, GenericOptions, PgIdent};
use compute_tools::pg_helpers::*;
use compute_tools::spec::ComputeSpec;
#[test]
fn params_serialize() {
let file = File::open("tests/cluster_spec.json").unwrap();
let file = File::open("../libs/compute_api/tests/cluster_spec.json").unwrap();
let spec: ComputeSpec = serde_json::from_reader(file).unwrap();
assert_eq!(
@@ -23,7 +22,7 @@ mod pg_helpers_tests {
#[test]
fn settings_serialize() {
let file = File::open("tests/cluster_spec.json").unwrap();
let file = File::open("../libs/compute_api/tests/cluster_spec.json").unwrap();
let spec: ComputeSpec = serde_json::from_reader(file).unwrap();
assert_eq!(

View File

@@ -7,7 +7,7 @@
//!
use anyhow::{anyhow, bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use control_plane::compute::ComputeControlPlane;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::local_env::LocalEnv;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
@@ -106,8 +106,9 @@ fn main() -> Result<()> {
"start" => handle_start_all(sub_args, &env),
"stop" => handle_stop_all(sub_args, &env),
"pageserver" => handle_pageserver(sub_args, &env),
"pg" => handle_pg(sub_args, &env),
"safekeeper" => handle_safekeeper(sub_args, &env),
"endpoint" => handle_endpoint(sub_args, &env),
"pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
_ => bail!("unexpected subcommand {sub_name}"),
};
@@ -470,10 +471,10 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let mut cplane = ComputeControlPlane::load(env.clone())?;
println!("Importing timeline into pageserver ...");
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?;
println!("Creating node for imported timeline ...");
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
cplane.new_node(tenant_id, name, timeline_id, None, None, pg_version)?;
println!("Creating endpoint for imported timeline ...");
cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?;
println!("Done");
}
Some(("branch", branch_match)) => {
@@ -521,10 +522,10 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
Ok(())
}
fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let (sub_name, sub_args) = match pg_match.subcommand() {
Some(pg_subcommand_data) => pg_subcommand_data,
None => bail!("no pg subcommand provided"),
fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let (sub_name, sub_args) = match ep_match.subcommand() {
Some(ep_subcommand_data) => ep_subcommand_data,
None => bail!("no endpoint subcommand provided"),
};
let mut cplane = ComputeControlPlane::load(env.clone())?;
@@ -546,7 +547,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
table.load_preset(comfy_table::presets::NOTHING);
table.set_header([
"NODE",
"ENDPOINT",
"ADDRESS",
"TIMELINE",
"BRANCH NAME",
@@ -554,39 +555,39 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
"STATUS",
]);
for ((_, node_name), node) in cplane
.nodes
for (endpoint_id, endpoint) in cplane
.endpoints
.iter()
.filter(|((node_tenant_id, _), _)| node_tenant_id == &tenant_id)
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
{
let lsn_str = match node.lsn {
let lsn_str = match endpoint.lsn {
None => {
// -> primary node
// -> primary endpoint
// Use the LSN at the end of the timeline.
timeline_infos
.get(&node.timeline_id)
.get(&endpoint.timeline_id)
.map(|bi| bi.last_record_lsn.to_string())
.unwrap_or_else(|| "?".to_string())
}
Some(lsn) => {
// -> read-only node
// Use the node's LSN.
// -> read-only endpoint
// Use the endpoint's LSN.
lsn.to_string()
}
};
let branch_name = timeline_name_mappings
.get(&TenantTimelineId::new(tenant_id, node.timeline_id))
.get(&TenantTimelineId::new(tenant_id, endpoint.timeline_id))
.map(|name| name.as_str())
.unwrap_or("?");
table.add_row([
node_name.as_str(),
&node.address.to_string(),
&node.timeline_id.to_string(),
endpoint_id.as_str(),
&endpoint.address.to_string(),
&endpoint.timeline_id.to_string(),
branch_name,
lsn_str.as_str(),
node.status(),
endpoint.status(),
]);
}
@@ -597,10 +598,10 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.get_one::<String>("branch-name")
.map(|s| s.as_str())
.unwrap_or(DEFAULT_BRANCH_NAME);
let node_name = sub_args
.get_one::<String>("node")
.map(|node_name| node_name.to_string())
.unwrap_or_else(|| format!("{branch_name}_node"));
let endpoint_id = sub_args
.get_one::<String>("endpoint_id")
.map(String::to_string)
.unwrap_or_else(|| format!("ep-{branch_name}"));
let lsn = sub_args
.get_one::<String>("lsn")
@@ -618,15 +619,15 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.copied()
.context("Failed to parse postgres version from the argument string")?;
cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port, pg_version)?;
cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, lsn, port, pg_version)?;
}
"start" => {
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
let node_name = sub_args
.get_one::<String>("node")
.ok_or_else(|| anyhow!("No node name was provided to start"))?;
let endpoint_id = sub_args
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
let node = cplane.nodes.get(&(tenant_id, node_name.to_string()));
let endpoint = cplane.endpoints.get(endpoint_id.as_str());
let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
let claims = Claims::new(Some(tenant_id), Scope::Tenant);
@@ -636,9 +637,9 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
None
};
if let Some(node) = node {
println!("Starting existing postgres {node_name}...");
node.start(&auth_token)?;
if let Some(endpoint) = endpoint {
println!("Starting existing endpoint {endpoint_id}...");
endpoint.start(&auth_token)?;
} else {
let branch_name = sub_args
.get_one::<String>("branch-name")
@@ -663,27 +664,33 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
// start --port X
// stop
// start <-- will also use port X even without explicit port argument
println!("Starting new postgres (v{pg_version}) {node_name} on timeline {timeline_id} ...");
println!("Starting new endpoint {endpoint_id} (PostgreSQL v{pg_version}) on timeline {timeline_id} ...");
let node =
cplane.new_node(tenant_id, node_name, timeline_id, lsn, port, pg_version)?;
node.start(&auth_token)?;
let ep = cplane.new_endpoint(
tenant_id,
endpoint_id,
timeline_id,
lsn,
port,
pg_version,
)?;
ep.start(&auth_token)?;
}
}
"stop" => {
let node_name = sub_args
.get_one::<String>("node")
.ok_or_else(|| anyhow!("No node name was provided to stop"))?;
let endpoint_id = sub_args
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?;
let destroy = sub_args.get_flag("destroy");
let node = cplane
.nodes
.get(&(tenant_id, node_name.to_string()))
.with_context(|| format!("postgres {node_name} is not found"))?;
node.stop(destroy)?;
let endpoint = cplane
.endpoints
.get(endpoint_id.as_str())
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
endpoint.stop(destroy)?;
}
_ => bail!("Unexpected pg subcommand '{sub_name}'"),
_ => bail!("Unexpected endpoint subcommand '{sub_name}'"),
}
Ok(())
@@ -802,7 +809,7 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> {
// Postgres nodes are not started automatically
// Endpoints are not started automatically
broker::start_broker_process(env)?;
@@ -836,10 +843,10 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<
fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
let pageserver = PageServerNode::from_env(env);
// Stop all compute nodes
// Stop all endpoints
match ComputeControlPlane::load(env.clone()) {
Ok(cplane) => {
for (_k, node) in cplane.nodes {
for (_k, node) in cplane.endpoints {
if let Err(e) = node.stop(false) {
eprintln!("postgres stop failed: {e:#}");
}
@@ -872,7 +879,9 @@ fn cli() -> Command {
.help("Name of the branch to be created or used as an alias for other services")
.required(false);
let pg_node_arg = Arg::new("node").help("Postgres node name").required(false);
let endpoint_id_arg = Arg::new("endpoint_id")
.help("Postgres endpoint id")
.required(false);
let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false);
@@ -1026,27 +1035,27 @@ fn cli() -> Command {
)
)
.subcommand(
Command::new("pg")
Command::new("endpoint")
.arg_required_else_help(true)
.about("Manage postgres instances")
.subcommand(Command::new("list").arg(tenant_id_arg.clone()))
.subcommand(Command::new("create")
.about("Create a postgres compute node")
.arg(pg_node_arg.clone())
.about("Create a compute endpoint")
.arg(endpoint_id_arg.clone())
.arg(branch_name_arg.clone())
.arg(tenant_id_arg.clone())
.arg(lsn_arg.clone())
.arg(port_arg.clone())
.arg(
Arg::new("config-only")
.help("Don't do basebackup, create compute node with only config files")
.help("Don't do basebackup, create endpoint directory with only config files")
.long("config-only")
.required(false))
.arg(pg_version_arg.clone())
)
.subcommand(Command::new("start")
.about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
.arg(pg_node_arg.clone())
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
.arg(endpoint_id_arg.clone())
.arg(tenant_id_arg.clone())
.arg(branch_name_arg)
.arg(timeline_id_arg)
@@ -1056,7 +1065,7 @@ fn cli() -> Command {
)
.subcommand(
Command::new("stop")
.arg(pg_node_arg)
.arg(endpoint_id_arg)
.arg(tenant_id_arg)
.arg(
Arg::new("destroy")
@@ -1068,6 +1077,13 @@ fn cli() -> Command {
)
)
// Obsolete old name for 'endpoint'. We now just print an error if it's used.
.subcommand(
Command::new("pg")
.hide(true)
.arg(Arg::new("ignore-rest").allow_hyphen_values(true).num_args(0..).required(false))
.trailing_var_arg(true)
)
.subcommand(
Command::new("start")
.about("Start page server and safekeepers")

View File

@@ -25,54 +25,45 @@ use crate::postgresql_conf::PostgresConf;
//
pub struct ComputeControlPlane {
base_port: u16,
pageserver: Arc<PageServerNode>,
pub nodes: BTreeMap<(TenantId, String), Arc<PostgresNode>>,
// endpoint ID is the key
pub endpoints: BTreeMap<String, Arc<Endpoint>>,
env: LocalEnv,
pageserver: Arc<PageServerNode>,
}
impl ComputeControlPlane {
// Load current nodes with ports from data directories on disk
// Directory structure has the following layout:
// pgdatadirs
// |- tenants
// | |- <tenant_id>
// | | |- <node name>
// Load current endpoints from the endpoints/ subdirectories
pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
let pageserver = Arc::new(PageServerNode::from_env(&env));
let mut nodes = BTreeMap::default();
let pgdatadirspath = &env.pg_data_dirs_path();
for tenant_dir in fs::read_dir(pgdatadirspath)
.with_context(|| format!("failed to list {}", pgdatadirspath.display()))?
let mut endpoints = BTreeMap::default();
for endpoint_dir in fs::read_dir(env.endpoints_path())
.with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
{
let tenant_dir = tenant_dir?;
for timeline_dir in fs::read_dir(tenant_dir.path())
.with_context(|| format!("failed to list {}", tenant_dir.path().display()))?
{
let node = PostgresNode::from_dir_entry(timeline_dir?, &env, &pageserver)?;
nodes.insert((node.tenant_id, node.name.clone()), Arc::new(node));
}
let ep = Endpoint::from_dir_entry(endpoint_dir?, &env, &pageserver)?;
endpoints.insert(ep.name.clone(), Arc::new(ep));
}
Ok(ComputeControlPlane {
base_port: 55431,
pageserver,
nodes,
endpoints,
env,
pageserver,
})
}
fn get_port(&mut self) -> u16 {
1 + self
.nodes
.endpoints
.values()
.map(|node| node.address.port())
.map(|ep| ep.address.port())
.max()
.unwrap_or(self.base_port)
}
pub fn new_node(
pub fn new_endpoint(
&mut self,
tenant_id: TenantId,
name: &str,
@@ -80,9 +71,9 @@ impl ComputeControlPlane {
lsn: Option<Lsn>,
port: Option<u16>,
pg_version: u32,
) -> Result<Arc<PostgresNode>> {
) -> Result<Arc<Endpoint>> {
let port = port.unwrap_or_else(|| self.get_port());
let node = Arc::new(PostgresNode {
let ep = Arc::new(Endpoint {
name: name.to_owned(),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
env: self.env.clone(),
@@ -93,39 +84,45 @@ impl ComputeControlPlane {
pg_version,
});
node.create_pgdata()?;
node.setup_pg_conf()?;
ep.create_pgdata()?;
ep.setup_pg_conf()?;
self.nodes
.insert((tenant_id, node.name.clone()), Arc::clone(&node));
self.endpoints.insert(ep.name.clone(), Arc::clone(&ep));
Ok(node)
Ok(ep)
}
}
///////////////////////////////////////////////////////////////////////////////
#[derive(Debug)]
pub struct PostgresNode {
pub address: SocketAddr,
pub struct Endpoint {
/// used as the directory name
name: String,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
// Some(lsn) if this is a read-only endpoint anchored at 'lsn'. None for the primary.
pub lsn: Option<Lsn>,
// port and address of the Postgres server
pub address: SocketAddr,
pg_version: u32,
// These are not part of the endpoint as such, but the environment
// the endpoint runs in.
pub env: LocalEnv,
pageserver: Arc<PageServerNode>,
pub timeline_id: TimelineId,
pub lsn: Option<Lsn>, // if it's a read-only node. None for primary
pub tenant_id: TenantId,
pg_version: u32,
}
impl PostgresNode {
impl Endpoint {
fn from_dir_entry(
entry: std::fs::DirEntry,
env: &LocalEnv,
pageserver: &Arc<PageServerNode>,
) -> Result<PostgresNode> {
) -> Result<Endpoint> {
if !entry.file_type()?.is_dir() {
anyhow::bail!(
"PostgresNode::from_dir_entry failed: '{}' is not a directory",
"Endpoint::from_dir_entry failed: '{}' is not a directory",
entry.path().display()
);
}
@@ -135,7 +132,7 @@ impl PostgresNode {
let name = fname.to_str().unwrap().to_string();
// Read config file into memory
let cfg_path = entry.path().join("postgresql.conf");
let cfg_path = entry.path().join("pgdata").join("postgresql.conf");
let cfg_path_str = cfg_path.to_string_lossy();
let mut conf_file = File::open(&cfg_path)
.with_context(|| format!("failed to open config file in {}", cfg_path_str))?;
@@ -161,7 +158,7 @@ impl PostgresNode {
conf.parse_field_optional("recovery_target_lsn", &context)?;
// ok now
Ok(PostgresNode {
Ok(Endpoint {
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
name,
env: env.clone(),
@@ -269,7 +266,7 @@ impl PostgresNode {
}
// Write postgresql.conf with default configuration
// and PG_VERSION file to the data directory of a new node.
// and PG_VERSION file to the data directory of a new endpoint.
fn setup_pg_conf(&self) -> Result<()> {
let mut conf = PostgresConf::new();
conf.append("max_wal_senders", "10");
@@ -289,7 +286,7 @@ impl PostgresNode {
// walproposer panics when basebackup is invalid, it is pointless to restart in this case.
conf.append("restart_after_crash", "off");
// Configure the node to fetch pages from pageserver
// Configure the Neon Postgres extension to fetch pages from pageserver
let pageserver_connstr = {
let config = &self.pageserver.pg_connection_config;
let (host, port) = (config.host(), config.port());
@@ -325,7 +322,7 @@ impl PostgresNode {
conf.append("max_replication_flush_lag", "10GB");
if !self.env.safekeepers.is_empty() {
// Configure the node to connect to the safekeepers
// Configure Postgres to connect to the safekeepers
conf.append("synchronous_standby_names", "walproposer");
let safekeepers = self
@@ -380,8 +377,12 @@ impl PostgresNode {
Ok(())
}
pub fn endpoint_path(&self) -> PathBuf {
self.env.endpoints_path().join(&self.name)
}
pub fn pgdata(&self) -> PathBuf {
self.env.pg_data_dir(&self.tenant_id, &self.name)
self.endpoint_path().join("pgdata")
}
pub fn status(&self) -> &str {
@@ -443,12 +444,11 @@ impl PostgresNode {
}
pub fn start(&self, auth_token: &Option<String>) -> Result<()> {
// Bail if the node already running.
if self.status() == "running" {
anyhow::bail!("The node is already running");
anyhow::bail!("The endpoint is already running");
}
// 1. We always start compute node from scratch, so
// 1. We always start Postgres from scratch, so
// if old dir exists, preserve 'postgresql.conf' and drop the directory
let postgresql_conf_path = self.pgdata().join("postgresql.conf");
let postgresql_conf = fs::read(&postgresql_conf_path).with_context(|| {
@@ -470,8 +470,8 @@ impl PostgresNode {
File::create(self.pgdata().join("standby.signal"))?;
}
// 4. Finally start the compute node postgres
println!("Starting postgres node at '{}'", self.connstr());
// 4. Finally start postgres
println!("Starting postgres at '{}'", self.connstr());
self.pg_ctl(&["start"], auth_token)
}
@@ -480,7 +480,7 @@ impl PostgresNode {
// use immediate shutdown mode, otherwise,
// shutdown gracefully to leave the data directory sane.
//
// Compute node always starts from scratch, so stop
// Postgres is always started from scratch, so stop
// without destroy only used for testing and debugging.
//
if destroy {
@@ -489,7 +489,7 @@ impl PostgresNode {
"Destroying postgres data directory '{}'",
self.pgdata().to_str().unwrap()
);
fs::remove_dir_all(self.pgdata())?;
fs::remove_dir_all(self.endpoint_path())?;
} else {
self.pg_ctl(&["stop"], &None)?;
}

View File

@@ -9,7 +9,7 @@
mod background_process;
pub mod broker;
pub mod compute;
pub mod endpoint;
pub mod local_env;
pub mod pageserver;
pub mod postgresql_conf;

View File

@@ -200,14 +200,8 @@ impl LocalEnv {
self.neon_distrib_dir.join("storage_broker")
}
pub fn pg_data_dirs_path(&self) -> PathBuf {
self.base_data_dir.join("pgdatadirs").join("tenants")
}
pub fn pg_data_dir(&self, tenant_id: &TenantId, branch_name: &str) -> PathBuf {
self.pg_data_dirs_path()
.join(tenant_id.to_string())
.join(branch_name)
pub fn endpoints_path(&self) -> PathBuf {
self.base_data_dir.join("endpoints")
}
// TODO: move pageserver files into ./pageserver
@@ -427,7 +421,7 @@ impl LocalEnv {
}
}
fs::create_dir_all(self.pg_data_dirs_path())?;
fs::create_dir_all(self.endpoints_path())?;
for safekeeper in &self.safekeepers {
fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?;

View File

@@ -0,0 +1,15 @@
[package]
name = "compute_api"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
anyhow.workspace = true
chrono.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
utils = { path = "../utils" }
workspace_hack.workspace = true

View File

@@ -0,0 +1,3 @@
pub mod requests;
pub mod responses;
pub mod spec;

View File

@@ -1,7 +1,10 @@
use serde::Deserialize;
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use crate::spec::ComputeSpec;
use serde::Deserialize;
/// Request of the /configure API
///
/// We now pass only `spec` in the configuration request, but later we can
/// extend it and something like `restart: bool` or something else. So put
/// `spec` into a struct initially to be more flexible in the future.

View File

@@ -0,0 +1,78 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize, Serializer};
use crate::spec::ComputeSpec;
#[derive(Serialize, Debug)]
pub struct GenericAPIError {
pub error: String,
}
/// Response of the /status API
#[derive(Serialize, Debug)]
#[serde(rename_all = "snake_case")]
pub struct ComputeStatusResponse {
pub tenant: Option<String>,
pub timeline: Option<String>,
pub status: ComputeStatus,
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub error: Option<String>,
}
#[derive(Serialize)]
#[serde(rename_all = "snake_case")]
pub struct ComputeState {
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub error: Option<String>,
}
#[derive(Serialize, Clone, Copy, Debug, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {
// Spec wasn't provided at start, waiting for it to be
// provided by control-plane.
Empty,
// Compute configuration was requested.
ConfigurationPending,
// Compute node has spec and initial startup and
// configuration is in progress.
Init,
// Compute is configured and running.
Running,
// New spec is being applied.
Configuration,
// Either startup or configuration failed,
// compute will exit soon or is waiting for
// control-plane to terminate it.
Failed,
}
fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
x.to_rfc3339().serialize(s)
}
/// Response of the /metrics.json API
#[derive(Clone, Debug, Default, Serialize)]
pub struct ComputeMetrics {
pub sync_safekeepers_ms: u64,
pub basebackup_ms: u64,
pub config_ms: u64,
pub total_startup_ms: u64,
}
/// Response of the `/computes/{compute_id}/spec` control-plane API.
/// This is not actually a compute API response, so consider moving
/// to a different place.
#[derive(Deserialize, Debug)]
pub struct ControlPlaneSpecResponse {
pub spec: Option<ComputeSpec>,
}

View File

@@ -0,0 +1,111 @@
//! `ComputeSpec` represents the contents of the spec.json file.
//!
//! The spec.json file is used to pass information to 'compute_ctl'. It contains
//! all the information needed to start up the right version of PostgreSQL,
//! and connect it to the storage nodes.
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
use utils::lsn::Lsn;
/// String type alias representing Postgres identifier and
/// intended to be used for DB / role names.
pub type PgIdent = String;
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[serde_as]
#[derive(Clone, Debug, Default, Deserialize)]
pub struct ComputeSpec {
pub format_version: f32,
// The control plane also includes a 'timestamp' field in the JSON document,
// but we don't use it for anything. Serde will ignore missing fields when
// deserializing it.
pub operation_uuid: Option<String>,
/// Expected cluster state at the end of transition process.
pub cluster: Cluster,
pub delta_operations: Option<Vec<DeltaOp>>,
// If 'lsn' is set, start the compute at the given LSN.
// If 'lsn' is not set, compute_ctl performs "sync safekeepers" step first,
// and uses the last LSN on the timeline as the starting point.
#[serde_as(as = "Option<DisplayFromStr>")]
pub start_lsn: Option<Lsn>,
// If standy == true, a standby.signal file is created, putting Postgres
// into standby mode.
#[serde(default)]
pub standby: bool,
pub storage_auth_token: Option<String>,
pub startup_tracing_context: Option<HashMap<String, String>>,
}
#[derive(Clone, Debug, Default, Deserialize)]
pub struct Cluster {
pub cluster_id: String,
pub name: String,
pub state: Option<String>,
pub roles: Vec<Role>,
pub databases: Vec<Database>,
pub settings: GenericOptions,
}
/// Single cluster state changing operation that could not be represented as
/// a static `Cluster` structure. For example:
/// - DROP DATABASE
/// - DROP ROLE
/// - ALTER ROLE name RENAME TO new_name
/// - ALTER DATABASE name RENAME TO new_name
#[derive(Clone, Debug, Deserialize)]
pub struct DeltaOp {
pub action: String,
pub name: PgIdent,
pub new_name: Option<PgIdent>,
}
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
#[derive(Clone, Debug, Deserialize)]
pub struct Role {
pub name: PgIdent,
pub encrypted_password: Option<String>,
pub options: GenericOptions,
}
/// Rust representation of Postgres database info with only those fields
/// that matter for us.
#[derive(Clone, Debug, Deserialize)]
pub struct Database {
pub name: PgIdent,
pub owner: PgIdent,
pub options: GenericOptions,
}
/// Common type representing both SQL statement params with or without value,
/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
/// options like `wal_level = logical`.
#[derive(Clone, Debug, Deserialize)]
pub struct GenericOption {
pub name: String,
pub value: Option<String>,
pub vartype: String,
}
/// Optional collection of `GenericOption`'s. Type alias allows us to
/// declare a `trait` on it.
pub type GenericOptions = Option<Vec<GenericOption>>;
#[cfg(test)]
mod tests {
use super::*;
use std::fs::File;
#[test]
fn parse_spec_file() {
let file = File::open("tests/cluster_spec.json").unwrap();
let _spec: ComputeSpec = serde_json::from_reader(file).unwrap();
}
}

View File

@@ -7,6 +7,7 @@ license.workspace = true
[dependencies]
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
const_format.workspace = true
anyhow.workspace = true
bytes.workspace = true
@@ -14,6 +15,7 @@ byteorder.workspace = true
utils.workspace = true
postgres_ffi.workspace = true
enum-map.workspace = true
serde_json.workspace = true
strum.workspace = true
strum_macros.workspace = true
workspace_hack.workspace = true

View File

@@ -7,6 +7,7 @@ use std::{
use byteorder::{BigEndian, ReadBytesExt};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use strum_macros;
use utils::{
history_buffer::HistoryBufferWithDropCounter,
id::{NodeId, TenantId, TimelineId},
@@ -18,11 +19,23 @@ use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[derive(
Clone,
PartialEq,
Eq,
serde::Serialize,
serde::Deserialize,
strum_macros::Display,
strum_macros::EnumString,
strum_macros::EnumVariantNames,
strum_macros::AsRefStr,
strum_macros::IntoStaticStr,
)]
#[serde(tag = "slug", content = "data")]
pub enum TenantState {
// This tenant is being loaded from local disk
/// This tenant is being loaded from local disk
Loading,
// This tenant is being downloaded from cloud storage.
/// This tenant is being downloaded from cloud storage.
Attaching,
/// Tenant is fully operational
Active,
@@ -31,15 +44,7 @@ pub enum TenantState {
Stopping,
/// A tenant is recognized by the pageserver, but can no longer be used for
/// any operations, because it failed to be activated.
Broken,
}
pub mod state {
pub const LOADING: &str = "loading";
pub const ATTACHING: &str = "attaching";
pub const ACTIVE: &str = "active";
pub const STOPPING: &str = "stopping";
pub const BROKEN: &str = "broken";
Broken { reason: String, backtrace: String },
}
impl TenantState {
@@ -49,17 +54,26 @@ impl TenantState {
Self::Attaching => true,
Self::Active => false,
Self::Stopping => false,
Self::Broken => false,
Self::Broken { .. } => false,
}
}
pub fn as_str(&self) -> &'static str {
pub fn broken_from_reason(reason: String) -> Self {
let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
Self::Broken {
reason,
backtrace: backtrace_str,
}
}
}
impl std::fmt::Debug for TenantState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TenantState::Loading => state::LOADING,
TenantState::Attaching => state::ATTACHING,
TenantState::Active => state::ACTIVE,
TenantState::Stopping => state::STOPPING,
TenantState::Broken => state::BROKEN,
Self::Broken { reason, backtrace } if !reason.is_empty() => {
write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
}
_ => write!(f, "{self}"),
}
}
}
@@ -615,6 +629,7 @@ impl PagestreamBeMessage {
#[cfg(test)]
mod tests {
use bytes::Buf;
use serde_json::json;
use super::*;
@@ -665,4 +680,57 @@ mod tests {
assert!(msg == reconstructed);
}
}
#[test]
fn test_tenantinfo_serde() {
// Test serialization/deserialization of TenantInfo
let original_active = TenantInfo {
id: TenantId::generate(),
state: TenantState::Active,
current_physical_size: Some(42),
has_in_progress_downloads: Some(false),
};
let expected_active = json!({
"id": original_active.id.to_string(),
"state": {
"slug": "Active",
},
"current_physical_size": 42,
"has_in_progress_downloads": false,
});
let original_broken = TenantInfo {
id: TenantId::generate(),
state: TenantState::Broken {
reason: "reason".into(),
backtrace: "backtrace info".into(),
},
current_physical_size: Some(42),
has_in_progress_downloads: Some(false),
};
let expected_broken = json!({
"id": original_broken.id.to_string(),
"state": {
"slug": "Broken",
"data": {
"backtrace": "backtrace info",
"reason": "reason",
}
},
"current_physical_size": 42,
"has_in_progress_downloads": false,
});
assert_eq!(
serde_json::to_value(&original_active).unwrap(),
expected_active
);
assert_eq!(
serde_json::to_value(&original_broken).unwrap(),
expected_broken
);
assert!(format!("{:?}", &original_broken.state).contains("reason"));
assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
}
}

View File

@@ -54,7 +54,7 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
use io::ErrorKind::*;
matches!(
e.kind(),
ConnectionRefused | ConnectionAborted | ConnectionReset
ConnectionRefused | ConnectionAborted | ConnectionReset | TimedOut
)
}
@@ -320,9 +320,17 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
if let ProtoState::Closed = self.state {
Ok(None)
} else {
let m = self.framed.read_message().await?;
trace!("read msg {:?}", m);
Ok(m)
match self.framed.read_message().await {
Ok(m) => {
trace!("read msg {:?}", m);
Ok(m)
}
Err(e) => {
// remember not to try to read anymore
self.state = ProtoState::Closed;
Err(e)
}
}
}
}
@@ -493,7 +501,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
MaybeWriteOnly::Full(framed) => {
let (reader, writer) = framed.split();
self.framed = MaybeWriteOnly::WriteOnly(writer);
Ok(PostgresBackendReader(reader))
Ok(PostgresBackendReader {
reader,
closed: false,
})
}
MaybeWriteOnly::WriteOnly(_) => {
anyhow::bail!("PostgresBackend is already split")
@@ -510,8 +521,12 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
anyhow::bail!("PostgresBackend is not split")
}
MaybeWriteOnly::WriteOnly(writer) => {
let joined = Framed::unsplit(reader.0, writer);
let joined = Framed::unsplit(reader.reader, writer);
self.framed = MaybeWriteOnly::Full(joined);
// if reader encountered connection error, do not attempt reading anymore
if reader.closed {
self.state = ProtoState::Closed;
}
Ok(())
}
MaybeWriteOnly::Broken => panic!("unsplit on framed in invalid state"),
@@ -797,15 +812,25 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
}
}
pub struct PostgresBackendReader<IO>(FramedReader<MaybeTlsStream<IO>>);
pub struct PostgresBackendReader<IO> {
reader: FramedReader<MaybeTlsStream<IO>>,
closed: bool, // true if received error closing the connection
}
impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackendReader<IO> {
/// Read full message or return None if connection is cleanly closed with no
/// unprocessed data.
pub async fn read_message(&mut self) -> Result<Option<FeMessage>, ConnectionError> {
let m = self.0.read_message().await?;
trace!("read msg {:?}", m);
Ok(m)
match self.reader.read_message().await {
Ok(m) => {
trace!("read msg {:?}", m);
Ok(m)
}
Err(e) => {
self.closed = true;
Err(e)
}
}
}
/// Get CopyData contents of the next message in COPY stream or error
@@ -923,7 +948,7 @@ pub enum CopyStreamHandlerEnd {
#[error("EOF on COPY stream")]
EOF,
/// The connection was lost
#[error(transparent)]
#[error("connection error: {0}")]
Disconnected(#[from] ConnectionError),
/// Some other error
#[error(transparent)]

View File

@@ -13,7 +13,6 @@ use std::{
collections::HashMap,
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
ops::Deref,
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
@@ -90,7 +89,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
/// Streams the local file contents into remote into the remote storage entry.
async fn upload(
&self,
data: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
// S3 PUT request requires the content length to be specified,
// otherwise it starts to fail with the concurrent connection count increasing.
data_size_bytes: usize,
@@ -161,14 +160,67 @@ pub enum GenericRemoteStorage {
Unreliable(Arc<UnreliableWrapper>),
}
impl Deref for GenericRemoteStorage {
type Target = dyn RemoteStorage;
fn deref(&self) -> &Self::Target {
impl GenericRemoteStorage {
pub async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
match self {
GenericRemoteStorage::LocalFs(local_fs) => local_fs,
GenericRemoteStorage::AwsS3(s3_bucket) => s3_bucket.as_ref(),
GenericRemoteStorage::Unreliable(s) => s.as_ref(),
Self::LocalFs(s) => s.list_prefixes(prefix).await,
Self::AwsS3(s) => s.list_prefixes(prefix).await,
Self::Unreliable(s) => s.list_prefixes(prefix).await,
}
}
pub async fn upload(
&self,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata).await,
Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await,
Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await,
}
}
pub async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
match self {
Self::LocalFs(s) => s.download(from).await,
Self::AwsS3(s) => s.download(from).await,
Self::Unreliable(s) => s.download(from).await,
}
}
pub async fn download_byte_range(
&self,
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
) -> Result<Download, DownloadError> {
match self {
Self::LocalFs(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive)
.await
}
Self::AwsS3(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive)
.await
}
Self::Unreliable(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive)
.await
}
}
}
pub async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => s.delete(path).await,
Self::AwsS3(s) => s.delete(path).await,
Self::Unreliable(s) => s.delete(path).await,
}
}
}
@@ -199,7 +251,7 @@ impl GenericRemoteStorage {
/// this path is used for the remote object id conversion only.
pub async fn upload_storage_object(
&self,
from: Box<dyn tokio::io::AsyncRead + Unpin + Send + Sync + 'static>,
from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
from_size_bytes: usize,
to: &RemotePath,
) -> anyhow::Result<()> {

View File

@@ -118,7 +118,7 @@ impl RemoteStorage for LocalFs {
async fn upload(
&self,
data: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
data: impl io::AsyncRead + Unpin + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,

View File

@@ -343,7 +343,7 @@ impl RemoteStorage for S3Bucket {
async fn upload(
&self,
from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
from_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,

View File

@@ -84,7 +84,7 @@ impl RemoteStorage for UnreliableWrapper {
async fn upload(
&self,
data: Box<(dyn tokio::io::AsyncRead + Unpin + Send + Sync + 'static)>,
data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
// S3 PUT request requires the content length to be specified,
// otherwise it starts to fail with the concurrent connection count increasing.
data_size_bytes: usize,

View File

@@ -33,7 +33,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
min_lsn = min(min_lsn, lsn_range.start);
max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
updates.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer)).unwrap();
}
println!("min: {min_lsn}, max: {max_lsn}");
@@ -215,7 +215,7 @@ fn bench_sequential(c: &mut Criterion) {
is_incremental: false,
short_id: format!("Layer {}", i),
};
updates.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer)).unwrap();
}
updates.flush();
println!("Finished layer map init in {:?}", now.elapsed());

View File

@@ -829,12 +829,9 @@ components:
type: object
required:
- id
- state
properties:
id:
type: string
state:
type: string
current_physical_size:
type: integer
has_in_progress_downloads:

View File

@@ -465,7 +465,7 @@ async fn tenant_list_handler(request: Request<Body>) -> Result<Response<Body>, A
.iter()
.map(|(id, state)| TenantInfo {
id: *id,
state: *state,
state: state.clone(),
current_physical_size: None,
has_in_progress_downloads: Some(state.has_in_progress_downloads()),
})
@@ -490,7 +490,7 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
let state = tenant.current_state();
Ok(TenantInfo {
id: tenant_id,
state,
state: state.clone(),
current_physical_size: Some(current_physical_size),
has_in_progress_downloads: Some(state.has_in_progress_downloads()),
})
@@ -931,7 +931,7 @@ async fn handle_tenant_break(r: Request<Body>) -> Result<Response<Body>, ApiErro
.await
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
tenant.set_broken("broken from test");
tenant.set_broken("broken from test".to_owned());
json_response(StatusCode::OK, ())
}

View File

@@ -6,7 +6,8 @@ use metrics::{
UIntGauge, UIntGaugeVec,
};
use once_cell::sync::Lazy;
use pageserver_api::models::state;
use pageserver_api::models::TenantState;
use strum::VariantNames;
use utils::id::{TenantId, TimelineId};
/// Prometheus histogram buckets (in seconds) for operations in the critical
@@ -147,15 +148,6 @@ static CURRENT_LOGICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
.expect("failed to define current logical size metric")
});
// Metrics collected on tenant states.
const TENANT_STATE_OPTIONS: &[&str] = &[
state::LOADING,
state::ATTACHING,
state::ACTIVE,
state::STOPPING,
state::BROKEN,
];
pub static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_tenant_states_count",
@@ -707,7 +699,7 @@ impl Drop for TimelineMetrics {
pub fn remove_tenant_metrics(tenant_id: &TenantId) {
let tid = tenant_id.to_string();
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);
for state in TENANT_STATE_OPTIONS {
for state in TenantState::VARIANTS {
let _ = TENANT_STATE_METRIC.remove_label_values(&[&tid, state]);
}
}

View File

@@ -177,9 +177,9 @@ impl UninitializedTimeline<'_> {
///
/// The new timeline is initialized in Active state, and its background jobs are
/// started
pub fn initialize(self, _ctx: &RequestContext) -> anyhow::Result<Arc<Timeline>> {
pub fn initialize(self, ctx: &RequestContext) -> anyhow::Result<Arc<Timeline>> {
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
self.initialize_with_lock(&mut timelines, true, true)
self.initialize_with_lock(ctx, &mut timelines, true, true)
}
/// Like `initialize`, but the caller is already holding lock on Tenant::timelines.
@@ -189,6 +189,7 @@ impl UninitializedTimeline<'_> {
/// been initialized.
fn initialize_with_lock(
mut self,
ctx: &RequestContext,
timelines: &mut HashMap<TimelineId, Arc<Timeline>>,
load_layer_map: bool,
activate: bool,
@@ -229,7 +230,9 @@ impl UninitializedTimeline<'_> {
new_timeline.maybe_spawn_flush_loop();
if activate {
new_timeline.activate();
new_timeline
.activate(ctx)
.context("initializing timeline activation")?;
}
}
}
@@ -264,7 +267,10 @@ impl UninitializedTimeline<'_> {
.await
.context("Failed to flush after basebackup import")?;
self.initialize(ctx)
// Initialize without loading the layer map. We started with an empty layer map, and already
// updated it for the layers that we created during the import.
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
self.initialize_with_lock(ctx, &mut timelines, false, true)
}
fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
@@ -469,7 +475,7 @@ impl Tenant {
local_metadata: Option<TimelineMetadata>,
ancestor: Option<Arc<Timeline>>,
first_save: bool,
_ctx: &RequestContext,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_id;
@@ -504,7 +510,7 @@ impl Tenant {
// Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote
// But we shouldnt start walreceiver before we have all the data locally, because working walreceiver
// will ingest data which may require looking at the layers which are not yet available locally
match timeline.initialize_with_lock(&mut timelines_accessor, true, false) {
match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true, false) {
Ok(new_timeline) => new_timeline,
Err(e) => {
error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}");
@@ -616,7 +622,7 @@ impl Tenant {
match tenant_clone.attach(ctx).await {
Ok(_) => {}
Err(e) => {
tenant_clone.set_broken(&e.to_string());
tenant_clone.set_broken(e.to_string());
error!("error attaching tenant: {:?}", e);
}
}
@@ -629,7 +635,7 @@ impl Tenant {
///
/// Background task that downloads all data for a tenant and brings it to Active state.
///
#[instrument(skip(self, ctx), fields(tenant_id=%self.tenant_id))]
#[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
async fn attach(self: &Arc<Tenant>, ctx: RequestContext) -> anyhow::Result<()> {
// Create directory with marker file to indicate attaching state.
// The load_local_tenants() function in tenant::mgr relies on the marker file
@@ -750,7 +756,7 @@ impl Tenant {
// Start background operations and open the tenant for business.
// The loops will shut themselves down when they notice that the tenant is inactive.
self.activate()?;
self.activate(&ctx)?;
info!("Done");
@@ -824,7 +830,10 @@ impl Tenant {
pub fn create_broken_tenant(conf: &'static PageServerConf, tenant_id: TenantId) -> Arc<Tenant> {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
Arc::new(Tenant::new(
TenantState::Broken,
TenantState::Broken {
reason: "create_broken_tenant".into(),
backtrace: String::new(),
},
conf,
TenantConfOpt::default(),
wal_redo_manager,
@@ -885,7 +894,7 @@ impl Tenant {
match tenant_clone.load(&ctx).await {
Ok(()) => {}
Err(err) => {
tenant_clone.set_broken(&err.to_string());
tenant_clone.set_broken(err.to_string());
error!("could not load tenant {tenant_id}: {err:?}");
}
}
@@ -1022,7 +1031,7 @@ impl Tenant {
// Start background operations and open the tenant for business.
// The loops will shut themselves down when they notice that the tenant is inactive.
self.activate()?;
self.activate(ctx)?;
info!("Done");
@@ -1358,12 +1367,7 @@ impl Tenant {
// Stop the walreceiver first.
debug!("waiting for wal receiver to shutdown");
task_mgr::shutdown_tasks(
Some(TaskKind::WalReceiverManager),
Some(self.tenant_id),
Some(timeline_id),
)
.await;
timeline.walreceiver.stop().await;
debug!("wal receiver shutdown confirmed");
info!("waiting for timeline tasks to shutdown");
@@ -1442,7 +1446,7 @@ impl Tenant {
}
pub fn current_state(&self) -> TenantState {
*self.state.borrow()
self.state.borrow().clone()
}
pub fn is_active(&self) -> bool {
@@ -1450,18 +1454,18 @@ impl Tenant {
}
/// Changes tenant status to active, unless shutdown was already requested.
fn activate(&self) -> anyhow::Result<()> {
fn activate(&self, ctx: &RequestContext) -> anyhow::Result<()> {
let mut result = Ok(());
self.state.send_modify(|current_state| {
match *current_state {
match &*current_state {
TenantState::Active => {
// activate() was called on an already Active tenant. Shouldn't happen.
result = Err(anyhow::anyhow!("Tenant is already active"));
}
TenantState::Broken => {
TenantState::Broken { reason, .. } => {
// This shouldn't happen either
result = Err(anyhow::anyhow!(
"Could not activate tenant because it is in broken state"
"Could not activate tenant because it is in broken state due to: {reason}",
));
}
TenantState::Stopping => {
@@ -1484,7 +1488,23 @@ impl Tenant {
tasks::start_background_loops(self.tenant_id);
for timeline in not_broken_timelines {
timeline.activate();
match timeline
.activate(ctx)
.context("timeline activation for activating tenant")
{
Ok(()) => {}
Err(e) => {
error!(
"Failed to activate timeline {}: {:#}",
timeline.timeline_id, e
);
timeline.set_state(TimelineState::Broken);
*current_state = TenantState::broken_from_reason(format!(
"failed to activate timeline {}: {}",
timeline.timeline_id, e
));
}
}
}
}
}
@@ -1495,7 +1515,7 @@ impl Tenant {
/// Change tenant status to Stopping, to mark that it is being shut down
pub fn set_stopping(&self) {
self.state.send_modify(|current_state| {
match *current_state {
match current_state {
TenantState::Active | TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Stopping;
@@ -1511,8 +1531,8 @@ impl Tenant {
timeline.set_state(TimelineState::Stopping);
}
}
TenantState::Broken => {
info!("Cannot set tenant to Stopping state, it is already in Broken state");
TenantState::Broken { reason, .. } => {
info!("Cannot set tenant to Stopping state, it is in Broken state due to: {reason}");
}
TenantState::Stopping => {
// The tenant was detached, or system shutdown was requested, while we were
@@ -1523,7 +1543,7 @@ impl Tenant {
});
}
pub fn set_broken(&self, reason: &str) {
pub fn set_broken(&self, reason: String) {
self.state.send_modify(|current_state| {
match *current_state {
TenantState::Active => {
@@ -1531,24 +1551,24 @@ impl Tenant {
// while loading or attaching a tenant. A tenant that has already been
// activated should never be marked as broken. We cope with it the best
// we can, but it shouldn't happen.
*current_state = TenantState::Broken;
warn!("Changing Active tenant to Broken state, reason: {}", reason);
*current_state = TenantState::broken_from_reason(reason);
}
TenantState::Broken => {
TenantState::Broken { .. } => {
// This shouldn't happen either
warn!("Tenant is already in Broken state");
}
TenantState::Stopping => {
// This shouldn't happen either
*current_state = TenantState::Broken;
warn!(
"Marking Stopping tenant as Broken state, reason: {}",
reason
);
*current_state = TenantState::broken_from_reason(reason);
}
TenantState::Loading | TenantState::Attaching => {
info!("Setting tenant as Broken state, reason: {}", reason);
*current_state = TenantState::Broken;
*current_state = TenantState::broken_from_reason(reason);
}
}
});
@@ -1561,7 +1581,7 @@ impl Tenant {
pub async fn wait_to_become_active(&self) -> anyhow::Result<()> {
let mut receiver = self.state.subscribe();
loop {
let current_state = *receiver.borrow_and_update();
let current_state = receiver.borrow_and_update().clone();
match current_state {
TenantState::Loading | TenantState::Attaching => {
// in these states, there's a chance that we can reach ::Active
@@ -1570,12 +1590,12 @@ impl Tenant {
TenantState::Active { .. } => {
return Ok(());
}
TenantState::Broken | TenantState::Stopping => {
TenantState::Broken { .. } | TenantState::Stopping => {
// There's no chance the tenant can transition back into ::Active
anyhow::bail!(
"Tenant {} will not become active. Current state: {:?}",
self.tenant_id,
current_state,
&current_state,
);
}
}
@@ -1756,21 +1776,23 @@ impl Tenant {
let (state, mut rx) = watch::channel(state);
tokio::spawn(async move {
let current_state = *rx.borrow_and_update();
let mut current_state: &'static str = From::from(&*rx.borrow_and_update());
let tid = tenant_id.to_string();
TENANT_STATE_METRIC
.with_label_values(&[&tid, current_state.as_str()])
.with_label_values(&[&tid, current_state])
.inc();
loop {
match rx.changed().await {
Ok(()) => {
let new_state = *rx.borrow();
let new_state: &'static str = From::from(&*rx.borrow_and_update());
TENANT_STATE_METRIC
.with_label_values(&[&tid, current_state.as_str()])
.with_label_values(&[&tid, current_state])
.dec();
TENANT_STATE_METRIC
.with_label_values(&[&tid, new_state.as_str()])
.with_label_values(&[&tid, new_state])
.inc();
current_state = new_state;
}
Err(_sender_dropped_error) => {
info!("Tenant dropped the state updates sender, quitting waiting for tenant state change");
@@ -2093,7 +2115,7 @@ impl Tenant {
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
start_lsn: Option<Lsn>,
_ctx: &RequestContext,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let src_id = src_timeline.timeline_id;
@@ -2186,7 +2208,7 @@ impl Tenant {
false,
Some(Arc::clone(src_timeline)),
)?
.initialize_with_lock(&mut timelines, true, true)?;
.initialize_with_lock(ctx, &mut timelines, true, true)?;
drop(timelines);
// Root timeline gets its layers during creation and uploads them along with the metadata.
@@ -2297,9 +2319,11 @@ impl Tenant {
)
})?;
// Initialize the timeline without loading the layer map, because we already updated the layer
// map above, when we imported the datadir.
let timeline = {
let mut timelines = self.timelines.lock().unwrap();
raw_timeline.initialize_with_lock(&mut timelines, false, true)?
raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)?
};
info!(

View File

@@ -52,7 +52,7 @@ use crate::metrics::NUM_ONDISK_LAYERS;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use anyhow::Result;
use anyhow::{bail, Result};
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
@@ -126,7 +126,7 @@ where
///
/// Insert an on-disk layer.
///
pub fn insert_historic(&mut self, layer: Arc<L>) {
pub fn insert_historic(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
self.layer_map.insert_historic_noflush(layer)
}
@@ -274,17 +274,22 @@ where
///
/// Helper function for BatchedUpdates::insert_historic
///
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) {
self.historic.insert(
historic_layer_coverage::LayerKey::from(&*layer),
Arc::clone(&layer),
);
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
let key = historic_layer_coverage::LayerKey::from(&*layer);
if self.historic.contains(&key) {
bail!(
"Attempt to insert duplicate layer {} in layer map",
layer.short_id()
);
}
self.historic.insert(key, Arc::clone(&layer));
if Self::is_l0(&layer) {
self.l0_delta_layers.push(layer);
}
NUM_ONDISK_LAYERS.inc();
Ok(())
}
///
@@ -838,7 +843,7 @@ mod tests {
let expected_in_counts = (1, usize::from(expected_l0));
map.batch_update().insert_historic(remote.clone());
map.batch_update().insert_historic(remote.clone()).unwrap();
assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
let replaced = map

View File

@@ -417,6 +417,14 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
}
}
pub fn contains(&self, layer_key: &LayerKey) -> bool {
match self.buffer.get(layer_key) {
Some(None) => false, // layer remove was buffered
Some(_) => true, // layer insert was buffered
None => self.layers.contains_key(layer_key), // no buffered ops for this layer
}
}
pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
self.buffer.insert(layer_key, Some(value));
}

View File

@@ -537,7 +537,7 @@ where
Some(tenant) => match tenant.current_state() {
TenantState::Attaching
| TenantState::Loading
| TenantState::Broken
| TenantState::Broken { .. }
| TenantState::Active => tenant.set_stopping(),
TenantState::Stopping => return Err(TenantStateError::IsStopping(tenant_id)),
},
@@ -565,7 +565,7 @@ where
let tenants_accessor = TENANTS.read().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => {
tenant.set_broken(&e.to_string());
tenant.set_broken(e.to_string());
}
None => {
warn!("Tenant {tenant_id} got removed from memory");

View File

@@ -209,7 +209,7 @@ async fn wait_for_active_tenant(
loop {
match tenant_state_updates.changed().await {
Ok(()) => {
let new_state = *tenant_state_updates.borrow();
let new_state = &*tenant_state_updates.borrow();
match new_state {
TenantState::Active => {
debug!("Tenant state changed to active, continuing the task loop");

View File

@@ -14,6 +14,7 @@ use pageserver_api::models::{
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceStatus, TimelineState,
};
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -30,7 +31,7 @@ use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::broker_client::is_broker_client_initialized;
use crate::broker_client::{get_broker_client, is_broker_client_initialized};
use crate::context::{DownloadBehavior, RequestContext};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
@@ -71,10 +72,10 @@ use crate::walredo::WalRedoManager;
use crate::METADATA_FILE_NAME;
use crate::ZERO_PAGE;
use crate::{is_temporary, task_mgr};
use walreceiver::spawn_connection_manager_task;
pub(super) use self::eviction_task::EvictionTaskTenantState;
use self::eviction_task::EvictionTaskTimelineState;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
@@ -214,6 +215,7 @@ pub struct Timeline {
/// or None if WAL receiver has not received anything for this timeline
/// yet.
pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
pub walreceiver: WalReceiver,
/// Relation size cache
pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
@@ -866,10 +868,18 @@ impl Timeline {
Ok(())
}
pub fn activate(self: &Arc<Self>) {
pub fn activate(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
if is_broker_client_initialized() {
self.launch_wal_receiver(ctx, get_broker_client().clone())?;
} else if cfg!(test) {
info!("not launching WAL receiver because broker client hasn't been initialized");
} else {
anyhow::bail!("broker client not initialized");
}
self.set_state(TimelineState::Active);
self.launch_wal_receiver();
self.launch_eviction_task();
Ok(())
}
pub fn set_state(&self, new_state: TimelineState) {
@@ -1220,7 +1230,31 @@ impl Timeline {
let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
let tenant_conf_guard = tenant_conf.read().unwrap();
let wal_connect_timeout = tenant_conf_guard
.walreceiver_connect_timeout
.unwrap_or(conf.default_tenant_conf.walreceiver_connect_timeout);
let lagging_wal_timeout = tenant_conf_guard
.lagging_wal_timeout
.unwrap_or(conf.default_tenant_conf.lagging_wal_timeout);
let max_lsn_wal_lag = tenant_conf_guard
.max_lsn_wal_lag
.unwrap_or(conf.default_tenant_conf.max_lsn_wal_lag);
drop(tenant_conf_guard);
Arc::new_cyclic(|myself| {
let walreceiver = WalReceiver::new(
TenantTimelineId::new(tenant_id, timeline_id),
Weak::clone(myself),
WalReceiverConf {
wal_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
availability_zone: conf.availability_zone.clone(),
},
);
let mut result = Timeline {
conf,
tenant_conf,
@@ -1231,6 +1265,7 @@ impl Timeline {
layers: RwLock::new(LayerMap::default()),
walredo_mgr,
walreceiver,
remote_client: remote_client.map(Arc::new),
@@ -1350,44 +1385,17 @@ impl Timeline {
*flush_loop_state = FlushLoopState::Running;
}
pub(super) fn launch_wal_receiver(self: &Arc<Self>) {
if !is_broker_client_initialized() {
if cfg!(test) {
info!("not launching WAL receiver because broker client hasn't been initialized");
return;
} else {
panic!("broker client not initialized");
}
}
pub(super) fn launch_wal_receiver(
&self,
ctx: &RequestContext,
broker_client: BrokerClientChannel,
) -> anyhow::Result<()> {
info!(
"launching WAL receiver for timeline {} of tenant {}",
self.timeline_id, self.tenant_id
);
let tenant_conf_guard = self.tenant_conf.read().unwrap();
let lagging_wal_timeout = tenant_conf_guard
.lagging_wal_timeout
.unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
let walreceiver_connect_timeout = tenant_conf_guard
.walreceiver_connect_timeout
.unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
let max_lsn_wal_lag = tenant_conf_guard
.max_lsn_wal_lag
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
drop(tenant_conf_guard);
let self_clone = Arc::clone(self);
let background_ctx =
// XXX: this is a detached_child. Plumb through the ctx from call sites.
RequestContext::todo_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
spawn_connection_manager_task(
self_clone,
walreceiver_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
self.conf.availability_zone.clone(),
background_ctx,
);
self.walreceiver.start(ctx, broker_client)?;
Ok(())
}
///
@@ -1438,7 +1446,7 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
updates.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer))?;
num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file.
@@ -1470,7 +1478,7 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
updates.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer))?;
num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
@@ -1544,7 +1552,7 @@ impl Timeline {
// remote index file?
// If so, rename_to_backup those files & replace their local layer with
// a RemoteLayer in the layer map so that we re-download them on-demand.
if let Some(local_layer) = local_layer {
if let Some(local_layer) = &local_layer {
let local_layer_path = local_layer
.local_path()
.expect("caller must ensure that local_layers only contains local layers");
@@ -1569,7 +1577,6 @@ impl Timeline {
anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
} else {
self.metrics.resident_physical_size_gauge.sub(local_size);
updates.remove_historic(local_layer);
// fall-through to adding the remote layer
}
} else {
@@ -1605,7 +1612,11 @@ impl Timeline {
);
let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer);
if let Some(local_layer) = &local_layer {
updates.replace_historic(local_layer, remote_layer)?;
} else {
updates.insert_historic(remote_layer)?;
}
}
LayerFileName::Delta(deltafilename) => {
// Create a RemoteLayer for the delta file.
@@ -1629,7 +1640,11 @@ impl Timeline {
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
);
let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer);
if let Some(local_layer) = &local_layer {
updates.replace_historic(local_layer, remote_layer)?;
} else {
updates.insert_historic(remote_layer)?;
}
}
}
}
@@ -2676,7 +2691,7 @@ impl Timeline {
.write()
.unwrap()
.batch_update()
.insert_historic(Arc::new(new_delta));
.insert_historic(Arc::new(new_delta))?;
// update the timeline's physical size
let sz = new_delta_path.metadata()?.len();
@@ -2881,7 +2896,7 @@ impl Timeline {
self.metrics
.resident_physical_size_gauge
.add(metadata.len());
updates.insert_historic(Arc::new(l));
updates.insert_historic(Arc::new(l))?;
}
updates.flush();
drop(layers);
@@ -3314,7 +3329,7 @@ impl Timeline {
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
updates.insert_historic(x);
updates.insert_historic(x)?;
}
// Now that we have reshuffled the data to set of new delta layers, we can

View File

@@ -23,14 +23,133 @@
mod connection_manager;
mod walreceiver_connection;
use crate::task_mgr::WALRECEIVER_RUNTIME;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, WALRECEIVER_RUNTIME};
use crate::tenant::timeline::walreceiver::connection_manager::{
connection_manager_loop_step, ConnectionManagerState,
};
use anyhow::Context;
use std::future::Future;
use std::num::NonZeroU64;
use std::ops::ControlFlow;
use std::sync::atomic::{self, AtomicBool};
use std::sync::{Arc, Weak};
use std::time::Duration;
use storage_broker::BrokerClientChannel;
use tokio::select;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::*;
pub use connection_manager::spawn_connection_manager_task;
use utils::id::TenantTimelineId;
use super::Timeline;
#[derive(Clone)]
pub struct WalReceiverConf {
/// The timeout on the connection to safekeeper for WAL streaming.
pub wal_connect_timeout: Duration,
/// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
pub lagging_wal_timeout: Duration,
/// The Lsn lag to use to determine when the current connection is lagging to much behind and reconnect to the other one.
pub max_lsn_wal_lag: NonZeroU64,
pub auth_token: Option<Arc<String>>,
pub availability_zone: Option<String>,
}
pub struct WalReceiver {
timeline: TenantTimelineId,
timeline_ref: Weak<Timeline>,
conf: WalReceiverConf,
started: AtomicBool,
}
impl WalReceiver {
pub fn new(
timeline: TenantTimelineId,
timeline_ref: Weak<Timeline>,
conf: WalReceiverConf,
) -> Self {
Self {
timeline,
timeline_ref,
conf,
started: AtomicBool::new(false),
}
}
pub fn start(
&self,
ctx: &RequestContext,
mut broker_client: BrokerClientChannel,
) -> anyhow::Result<()> {
if self.started.load(atomic::Ordering::Acquire) {
anyhow::bail!("Wal receiver is already started");
}
let timeline = self.timeline_ref.upgrade().with_context(|| {
format!("walreceiver start on a dropped timeline {}", self.timeline)
})?;
let tenant_id = timeline.tenant_id;
let timeline_id = timeline.timeline_id;
let walreceiver_ctx =
ctx.detached_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
let wal_receiver_conf = self.conf.clone();
task_mgr::spawn(
WALRECEIVER_RUNTIME.handle(),
TaskKind::WalReceiverManager,
Some(tenant_id),
Some(timeline_id),
&format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
false,
async move {
info!("WAL receiver manager started, connecting to broker");
let mut connection_manager_state = ConnectionManagerState::new(
timeline,
wal_receiver_conf,
);
loop {
select! {
_ = task_mgr::shutdown_watcher() => {
info!("WAL receiver shutdown requested, shutting down");
connection_manager_state.shutdown().await;
return Ok(());
},
loop_step_result = connection_manager_loop_step(
&mut broker_client,
&mut connection_manager_state,
&walreceiver_ctx,
) => match loop_step_result {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(()) => {
info!("Connection manager loop ended, shutting down");
connection_manager_state.shutdown().await;
return Ok(());
}
},
}
}
}.instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id))
);
self.started.store(true, atomic::Ordering::Release);
Ok(())
}
pub async fn stop(&self) {
task_mgr::shutdown_tasks(
Some(TaskKind::WalReceiverManager),
Some(self.timeline.tenant_id),
Some(self.timeline.timeline_id),
)
.await;
self.started.store(false, atomic::Ordering::Release);
}
}
/// A handle of an asynchronous task.
/// The task has a channel that it can use to communicate its lifecycle events in a certain form, see [`TaskEvent`]
@@ -39,26 +158,26 @@ pub use connection_manager::spawn_connection_manager_task;
/// Note that the communication happens via the `watch` channel, that does not accumulate the events, replacing the old one with the never one on submission.
/// That may lead to certain events not being observed by the listener.
#[derive(Debug)]
pub struct TaskHandle<E> {
struct TaskHandle<E> {
join_handle: Option<tokio::task::JoinHandle<anyhow::Result<()>>>,
events_receiver: watch::Receiver<TaskStateUpdate<E>>,
cancellation: CancellationToken,
}
pub enum TaskEvent<E> {
enum TaskEvent<E> {
Update(TaskStateUpdate<E>),
End(anyhow::Result<()>),
}
#[derive(Debug, Clone)]
pub enum TaskStateUpdate<E> {
enum TaskStateUpdate<E> {
Started,
Progress(E),
}
impl<E: Clone> TaskHandle<E> {
/// Initializes the task, starting it immediately after the creation.
pub fn spawn<Fut>(
fn spawn<Fut>(
task: impl FnOnce(watch::Sender<TaskStateUpdate<E>>, CancellationToken) -> Fut + Send + 'static,
) -> Self
where
@@ -131,7 +250,7 @@ impl<E: Clone> TaskHandle<E> {
}
/// Aborts current task, waiting for it to finish.
pub async fn shutdown(self) {
async fn shutdown(self) {
if let Some(jh) = self.join_handle {
self.cancellation.cancel();
match jh.await {

View File

@@ -11,11 +11,9 @@
use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
use super::TaskStateUpdate;
use crate::broker_client::get_broker_client;
use super::{TaskStateUpdate, WalReceiverConf};
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::WALRECEIVER_RUNTIME;
use crate::task_mgr::{self, TaskKind};
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
@@ -38,75 +36,17 @@ use utils::{
use super::{walreceiver_connection::WalConnectionStatus, TaskEvent, TaskHandle};
/// Spawns the loop to take care of the timeline's WAL streaming connection.
pub fn spawn_connection_manager_task(
timeline: Arc<Timeline>,
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
auth_token: Option<Arc<String>>,
availability_zone: Option<String>,
ctx: RequestContext,
) {
let mut broker_client = get_broker_client().clone();
let tenant_id = timeline.tenant_id;
let timeline_id = timeline.timeline_id;
task_mgr::spawn(
WALRECEIVER_RUNTIME.handle(),
TaskKind::WalReceiverManager,
Some(tenant_id),
Some(timeline_id),
&format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
false,
async move {
info!("WAL receiver manager started, connecting to broker");
let mut walreceiver_state = WalreceiverState::new(
timeline,
wal_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
auth_token,
availability_zone,
);
loop {
select! {
_ = task_mgr::shutdown_watcher() => {
info!("WAL receiver shutdown requested, shutting down");
walreceiver_state.shutdown().await;
return Ok(());
},
loop_step_result = connection_manager_loop_step(
&mut broker_client,
&mut walreceiver_state,
&ctx,
) => match loop_step_result {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(()) => {
info!("Connection manager loop ended, shutting down");
walreceiver_state.shutdown().await;
return Ok(());
}
},
}
}
}
.instrument(
info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id),
),
);
}
/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
/// Based on the updates, desides whether to start, keep or stop a WAL receiver task.
/// If storage broker subscription is cancelled, exits.
async fn connection_manager_loop_step(
pub(super) async fn connection_manager_loop_step(
broker_client: &mut BrokerClientChannel,
walreceiver_state: &mut WalreceiverState,
connection_manager_state: &mut ConnectionManagerState,
ctx: &RequestContext,
) -> ControlFlow<(), ()> {
let mut timeline_state_updates = walreceiver_state.timeline.subscribe_for_state_updates();
let mut timeline_state_updates = connection_manager_state
.timeline
.subscribe_for_state_updates();
match wait_for_active_timeline(&mut timeline_state_updates).await {
ControlFlow::Continue(()) => {}
@@ -117,8 +57,8 @@ async fn connection_manager_loop_step(
}
let id = TenantTimelineId {
tenant_id: walreceiver_state.timeline.tenant_id,
timeline_id: walreceiver_state.timeline.timeline_id,
tenant_id: connection_manager_state.timeline.tenant_id,
timeline_id: connection_manager_state.timeline.timeline_id,
};
// Subscribe to the broker updates. Stream shares underlying TCP connection
@@ -128,7 +68,7 @@ async fn connection_manager_loop_step(
info!("Subscribed for broker timeline updates");
loop {
let time_until_next_retry = walreceiver_state.time_until_next_retry();
let time_until_next_retry = connection_manager_state.time_until_next_retry();
// These things are happening concurrently:
//
@@ -141,12 +81,12 @@ async fn connection_manager_loop_step(
// - timeline state changes to something that does not allow walreceiver to run concurrently
select! {
Some(wal_connection_update) = async {
match walreceiver_state.wal_connection.as_mut() {
match connection_manager_state.wal_connection.as_mut() {
Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
None => None,
}
} => {
let wal_connection = walreceiver_state.wal_connection.as_mut()
let wal_connection = connection_manager_state.wal_connection.as_mut()
.expect("Should have a connection, as checked by the corresponding select! guard");
match wal_connection_update {
TaskEvent::Update(TaskStateUpdate::Started) => {},
@@ -156,7 +96,7 @@ async fn connection_manager_loop_step(
// from this safekeeper. This is good enough to clean unsuccessful
// retries history and allow reconnecting to this safekeeper without
// sleeping for a long time.
walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
}
wal_connection.status = new_status;
}
@@ -165,7 +105,7 @@ async fn connection_manager_loop_step(
Ok(()) => debug!("WAL receiving task finished"),
Err(e) => error!("wal receiver task finished with an error: {e:?}"),
}
walreceiver_state.drop_old_connection(false).await;
connection_manager_state.drop_old_connection(false).await;
},
}
},
@@ -173,7 +113,7 @@ async fn connection_manager_loop_step(
// Got a new update from the broker
broker_update = broker_subscription.message() => {
match broker_update {
Ok(Some(broker_update)) => walreceiver_state.register_timeline_update(broker_update),
Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
Err(e) => {
error!("broker subscription failed: {e}");
return ControlFlow::Continue(());
@@ -187,12 +127,12 @@ async fn connection_manager_loop_step(
new_event = async {
loop {
if walreceiver_state.timeline.current_state() == TimelineState::Loading {
if connection_manager_state.timeline.current_state() == TimelineState::Loading {
warn!("wal connection manager should only be launched after timeline has become active");
}
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = walreceiver_state.timeline.current_state();
let new_state = connection_manager_state.timeline.current_state();
match new_state {
// we're already active as walreceiver, no need to reactivate
TimelineState::Active => continue,
@@ -234,9 +174,9 @@ async fn connection_manager_loop_step(
} => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
}
if let Some(new_candidate) = walreceiver_state.next_connection_candidate() {
if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
info!("Switching to new connection candidate: {new_candidate:?}");
walreceiver_state
connection_manager_state
.change_connection(new_candidate, ctx)
.await
}
@@ -314,25 +254,17 @@ const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
/// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
struct WalreceiverState {
pub(super) struct ConnectionManagerState {
id: TenantTimelineId,
/// Use pageserver data about the timeline to filter out some of the safekeepers.
timeline: Arc<Timeline>,
/// The timeout on the connection to safekeeper for WAL streaming.
wal_connect_timeout: Duration,
/// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
lagging_wal_timeout: Duration,
/// The Lsn lag to use to determine when the current connection is lagging to much behind and reconnect to the other one.
max_lsn_wal_lag: NonZeroU64,
conf: WalReceiverConf,
/// Current connection to safekeeper for WAL streaming.
wal_connection: Option<WalConnection>,
/// Info about retries and unsuccessful attempts to connect to safekeepers.
wal_connection_retries: HashMap<NodeId, RetryInfo>,
/// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
auth_token: Option<Arc<String>>,
availability_zone: Option<String>,
}
/// Current connection data.
@@ -375,15 +307,8 @@ struct BrokerSkTimeline {
latest_update: NaiveDateTime,
}
impl WalreceiverState {
fn new(
timeline: Arc<Timeline>,
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
auth_token: Option<Arc<String>>,
availability_zone: Option<String>,
) -> Self {
impl ConnectionManagerState {
pub(super) fn new(timeline: Arc<Timeline>, conf: WalReceiverConf) -> Self {
let id = TenantTimelineId {
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
@@ -391,14 +316,10 @@ impl WalreceiverState {
Self {
id,
timeline,
wal_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
conf,
wal_connection: None,
wal_stream_candidates: HashMap::new(),
wal_connection_retries: HashMap::new(),
auth_token,
availability_zone,
}
}
@@ -407,7 +328,7 @@ impl WalreceiverState {
self.drop_old_connection(true).await;
let id = self.id;
let connect_timeout = self.wal_connect_timeout;
let connect_timeout = self.conf.wal_connect_timeout;
let timeline = Arc::clone(&self.timeline);
let ctx = ctx.detached_child(
TaskKind::WalReceiverConnectionHandler,
@@ -563,7 +484,7 @@ impl WalreceiverState {
(now - existing_wal_connection.status.latest_connection_update).to_std()
{
// Drop connection if we haven't received keepalive message for a while.
if latest_interaciton > self.wal_connect_timeout {
if latest_interaciton > self.conf.wal_connect_timeout {
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connconf: new_wal_source_connconf,
@@ -573,7 +494,7 @@ impl WalreceiverState {
existing_wal_connection.status.latest_connection_update,
),
check_time: now,
threshold: self.wal_connect_timeout,
threshold: self.conf.wal_connect_timeout,
},
});
}
@@ -589,7 +510,7 @@ impl WalreceiverState {
// Check if the new candidate has much more WAL than the current one.
match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
Some(new_sk_lsn_advantage) => {
if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() {
if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connconf: new_wal_source_connconf,
@@ -597,16 +518,16 @@ impl WalreceiverState {
reason: ReconnectReason::LaggingWal {
current_commit_lsn,
new_commit_lsn,
threshold: self.max_lsn_wal_lag,
threshold: self.conf.max_lsn_wal_lag,
},
});
}
// If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver,
// and the current one is not, switch to the new one.
if self.availability_zone.is_some()
if self.conf.availability_zone.is_some()
&& existing_wal_connection.availability_zone
!= self.availability_zone
&& self.availability_zone == new_availability_zone
!= self.conf.availability_zone
&& self.conf.availability_zone == new_availability_zone
{
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
@@ -677,7 +598,7 @@ impl WalreceiverState {
if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
if candidate_commit_lsn > current_commit_lsn
&& waiting_for_new_wal > self.lagging_wal_timeout
&& waiting_for_new_wal > self.conf.lagging_wal_timeout
{
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
@@ -691,7 +612,7 @@ impl WalreceiverState {
existing_wal_connection.status.latest_wal_update,
),
check_time: now,
threshold: self.lagging_wal_timeout,
threshold: self.conf.lagging_wal_timeout,
},
});
}
@@ -757,11 +678,11 @@ impl WalreceiverState {
match wal_stream_connection_config(
self.id,
info.safekeeper_connstr.as_ref(),
match &self.auth_token {
match &self.conf.auth_token {
None => None,
Some(x) => Some(x),
},
self.availability_zone.as_deref(),
self.conf.availability_zone.as_deref(),
) {
Ok(connstr) => Some((*sk_id, info, connstr)),
Err(e) => {
@@ -775,7 +696,7 @@ impl WalreceiverState {
/// Remove candidates which haven't sent broker updates for a while.
fn cleanup_old_candidates(&mut self) {
let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
let lagging_wal_timeout = self.lagging_wal_timeout;
let lagging_wal_timeout = self.conf.lagging_wal_timeout;
self.wal_stream_candidates.retain(|node_id, broker_info| {
if let Ok(time_since_latest_broker_update) =
@@ -799,7 +720,7 @@ impl WalreceiverState {
}
}
async fn shutdown(mut self) {
pub(super) async fn shutdown(mut self) {
if let Some(wal_connection) = self.wal_connection.take() {
wal_connection.connection_task.shutdown().await;
}
@@ -903,7 +824,7 @@ mod tests {
let mut state = dummy_state(&harness).await;
let now = Utc::now().naive_utc();
let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?;
let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
state.wal_connection = None;
@@ -914,7 +835,7 @@ mod tests {
(
NodeId(3),
dummy_broker_sk_timeline(
1 + state.max_lsn_wal_lag.get(),
1 + state.conf.max_lsn_wal_lag.get(),
"delay_over_threshold",
delay_over_threshold,
),
@@ -948,7 +869,7 @@ mod tests {
streaming_lsn: Some(Lsn(current_lsn)),
};
state.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
state.wal_connection = Some(WalConnection {
started_at: now,
sk_id: connected_sk_id,
@@ -966,7 +887,7 @@ mod tests {
(
connected_sk_id,
dummy_broker_sk_timeline(
current_lsn + state.max_lsn_wal_lag.get() * 2,
current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
DUMMY_SAFEKEEPER_HOST,
now,
),
@@ -978,7 +899,7 @@ mod tests {
(
NodeId(2),
dummy_broker_sk_timeline(
current_lsn + state.max_lsn_wal_lag.get() / 2,
current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
"not_enough_advanced_lsn",
now,
),
@@ -1003,7 +924,11 @@ mod tests {
state.wal_connection = None;
state.wal_stream_candidates = HashMap::from([(
NodeId(0),
dummy_broker_sk_timeline(1 + state.max_lsn_wal_lag.get(), DUMMY_SAFEKEEPER_HOST, now),
dummy_broker_sk_timeline(
1 + state.conf.max_lsn_wal_lag.get(),
DUMMY_SAFEKEEPER_HOST,
now,
),
)]);
let only_candidate = state
@@ -1101,7 +1026,7 @@ mod tests {
let now = Utc::now().naive_utc();
let connected_sk_id = NodeId(0);
let new_lsn = Lsn(current_lsn.0 + state.max_lsn_wal_lag.get() + 1);
let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);
let connection_status = WalConnectionStatus {
is_connected: true,
@@ -1146,7 +1071,7 @@ mod tests {
ReconnectReason::LaggingWal {
current_commit_lsn: current_lsn,
new_commit_lsn: new_lsn,
threshold: state.max_lsn_wal_lag
threshold: state.conf.max_lsn_wal_lag
},
"Should select bigger WAL safekeeper if it starts to lag enough"
);
@@ -1165,7 +1090,7 @@ mod tests {
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();
let wal_connect_timeout = chrono::Duration::from_std(state.wal_connect_timeout)?;
let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
let time_over_threshold =
Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
@@ -1208,7 +1133,7 @@ mod tests {
..
} => {
assert_eq!(last_keep_alive, Some(time_over_threshold));
assert_eq!(threshold, state.lagging_wal_timeout);
assert_eq!(threshold, state.conf.lagging_wal_timeout);
}
unexpected => panic!("Unexpected reason: {unexpected:?}"),
}
@@ -1228,7 +1153,7 @@ mod tests {
let new_lsn = Lsn(100_100).align();
let now = Utc::now().naive_utc();
let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?;
let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
let time_over_threshold =
Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
@@ -1275,7 +1200,7 @@ mod tests {
assert_eq!(current_commit_lsn, current_lsn);
assert_eq!(candidate_commit_lsn, new_lsn);
assert_eq!(last_wal_interaction, Some(time_over_threshold));
assert_eq!(threshold, state.lagging_wal_timeout);
assert_eq!(threshold, state.conf.lagging_wal_timeout);
}
unexpected => panic!("Unexpected reason: {unexpected:?}"),
}
@@ -1289,27 +1214,29 @@ mod tests {
const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
async fn dummy_state(harness: &TenantHarness<'_>) -> WalreceiverState {
async fn dummy_state(harness: &TenantHarness<'_>) -> ConnectionManagerState {
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
.expect("Failed to create an empty timeline for dummy wal connection manager");
let timeline = timeline.initialize(&ctx).unwrap();
WalreceiverState {
ConnectionManagerState {
id: TenantTimelineId {
tenant_id: harness.tenant_id,
timeline_id: TIMELINE_ID,
},
timeline,
wal_connect_timeout: Duration::from_secs(1),
lagging_wal_timeout: Duration::from_secs(1),
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
conf: WalReceiverConf {
wal_connect_timeout: Duration::from_secs(1),
lagging_wal_timeout: Duration::from_secs(1),
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
auth_token: None,
availability_zone: None,
},
wal_connection: None,
wal_stream_candidates: HashMap::new(),
wal_connection_retries: HashMap::new(),
auth_token: None,
availability_zone: None,
}
}
@@ -1321,7 +1248,7 @@ mod tests {
let harness = TenantHarness::create("switch_to_same_availability_zone")?;
let mut state = dummy_state(&harness).await;
state.availability_zone = test_az.clone();
state.conf.availability_zone = test_az.clone();
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();

View File

@@ -42,7 +42,7 @@ use utils::lsn::Lsn;
/// Status of the connection.
#[derive(Debug, Clone, Copy)]
pub struct WalConnectionStatus {
pub(super) struct WalConnectionStatus {
/// If we were able to initiate a postgres connection, this means that safekeeper process is at least running.
pub is_connected: bool,
/// Defines a healthy connection as one on which pageserver received WAL from safekeeper
@@ -60,7 +60,7 @@ pub struct WalConnectionStatus {
/// Open a connection to the given safekeeper and receive WAL, sending back progress
/// messages as we go.
pub async fn handle_walreceiver_connection(
pub(super) async fn handle_walreceiver_connection(
timeline: Arc<Timeline>,
wal_source_connconf: PgConnectionConfig,
events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,

View File

@@ -9,6 +9,14 @@
* To prevent this, it has been decided to limit possible interactions
* with the outside world using the Secure Computing BPF mode.
*
* This code is intended to support both x86_64 and aarch64. The latter
* doesn't implement some syscalls like open and select. We allow both
* select (absent on aarch64) and pselect6 (present on both architectures)
* We call select(2) through libc, and the libc wrapper calls select or pselect6
* depending on the architecture. You can check which syscalls are present on
* different architectures with the `scmp_sys_resolver` tool from the
* seccomp package.
*
* We use this mode to disable all syscalls not in the allowlist. This
* approach has its pros & cons:
*
@@ -73,8 +81,6 @@
* I suspect that certain libc functions might involve slightly
* different syscalls, e.g. select/pselect6/pselect6_time64/whatever.
*
* - Test on any arch other than amd64 to see if it works there.
*
*-------------------------------------------------------------------------
*/
@@ -122,9 +128,10 @@ seccomp_load_rules(PgSeccompRule *rules, int count)
/*
* First, check that open of a well-known file works.
* XXX: We use raw syscall() to call the very open().
* XXX: We use raw syscall() to call the very openat() which is
* present both on x86_64 and on aarch64.
*/
fd = syscall(SCMP_SYS(open), "/dev/null", O_RDONLY, 0);
fd = syscall(SCMP_SYS(openat), AT_FDCWD, "/dev/null", O_RDONLY, 0);
if (seccomp_test_sighandler_done)
ereport(FATAL,
(errcode(ERRCODE_SYSTEM_ERROR),
@@ -135,15 +142,15 @@ seccomp_load_rules(PgSeccompRule *rules, int count)
errmsg("seccomp: could not open /dev/null for seccomp testing: %m")));
close((int) fd);
/* Set a trap on open() to test seccomp bpf */
rule = PG_SCMP(open, SCMP_ACT_TRAP);
/* Set a trap on openat() to test seccomp bpf */
rule = PG_SCMP(openat, SCMP_ACT_TRAP);
if (do_seccomp_load_rules(&rule, 1, SCMP_ACT_ALLOW) != 0)
ereport(FATAL,
(errcode(ERRCODE_SYSTEM_ERROR),
errmsg("seccomp: could not load test trap")));
/* Finally, check that open() now raises SIGSYS */
(void) syscall(SCMP_SYS(open), "/dev/null", O_RDONLY, 0);
/* Finally, check that openat() now raises SIGSYS */
(void) syscall(SCMP_SYS(openat), AT_FDCWD, "/dev/null", O_RDONLY, 0);
if (!seccomp_test_sighandler_done)
ereport(FATAL,
(errcode(ERRCODE_SYSTEM_ERROR),
@@ -224,7 +231,7 @@ seccomp_test_sighandler(int signum, siginfo_t *info, void *cxt pg_attribute_unus
die(1, DIE_PREFIX "bad signal number\n");
/* TODO: maybe somehow extract the hardcoded syscall number */
if (info->si_syscall != SCMP_SYS(open))
if (info->si_syscall != SCMP_SYS(openat))
die(1, DIE_PREFIX "bad syscall number\n");
#undef DIE_PREFIX

View File

@@ -64,6 +64,7 @@ webpki-roots.workspace = true
x509-parser.workspace = true
workspace_hack.workspace = true
tokio-util.workspace = true
[dev-dependencies]
rcgen.workspace = true

View File

@@ -40,7 +40,7 @@ pub fn configure_tls(
let mut cert_resolver = CertResolver::new();
// add default certificate
cert_resolver.add_cert(key_path, cert_path)?;
cert_resolver.add_cert(key_path, cert_path, true)?;
// add extra certificates
if let Some(certs_dir) = certs_dir {
@@ -52,8 +52,11 @@ pub fn configure_tls(
let key_path = path.join("tls.key");
let cert_path = path.join("tls.crt");
if key_path.exists() && cert_path.exists() {
cert_resolver
.add_cert(&key_path.to_string_lossy(), &cert_path.to_string_lossy())?;
cert_resolver.add_cert(
&key_path.to_string_lossy(),
&cert_path.to_string_lossy(),
false,
)?;
}
}
}
@@ -78,16 +81,23 @@ pub fn configure_tls(
struct CertResolver {
certs: HashMap<String, Arc<rustls::sign::CertifiedKey>>,
default: Option<Arc<rustls::sign::CertifiedKey>>,
}
impl CertResolver {
fn new() -> Self {
Self {
certs: HashMap::new(),
default: None,
}
}
fn add_cert(&mut self, key_path: &str, cert_path: &str) -> anyhow::Result<()> {
fn add_cert(
&mut self,
key_path: &str,
cert_path: &str,
is_default: bool,
) -> anyhow::Result<()> {
let priv_key = {
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
@@ -136,10 +146,13 @@ impl CertResolver {
"Failed to parse common name from certificate at '{cert_path}'."
))?;
self.certs.insert(
common_name,
Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key)),
);
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
if is_default {
self.default = Some(cert.clone());
}
self.certs.insert(common_name, cert);
Ok(())
}
@@ -172,7 +185,17 @@ impl rustls::server::ResolvesServerCert for CertResolver {
}
}
} else {
None
// No SNI, use the default certificate, otherwise we can't get to
// options parameter which can be used to set endpoint name too.
// That means that non-SNI flow will not work for CNAME domains in
// verify-full mode.
//
// If that will be a problem we can:
//
// a) Instead of multi-cert approach use single cert with extra
// domains listed in Subject Alternative Name (SAN).
// b) Deploy separate proxy instances for extra domains.
self.default.as_ref().cloned()
}
}
}

View File

@@ -22,6 +22,7 @@ use tokio::{
io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf},
net::TcpListener,
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument};
use utils::http::{error::ApiError, json::json_response};
@@ -188,6 +189,7 @@ async fn ws_handler(
pub async fn task_main(
config: &'static ProxyConfig,
ws_listener: TcpListener,
cancellation_token: CancellationToken,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("websocket server has shut down");
@@ -231,6 +233,7 @@ pub async fn task_main(
hyper::Server::builder(accept::from_stream(tls_listener))
.serve(make_svc)
.with_graceful_shutdown(cancellation_token.cancelled())
.await?;
Ok(())

View File

@@ -28,6 +28,7 @@ use config::ProxyConfig;
use futures::FutureExt;
use std::{borrow::Cow, future::Future, net::SocketAddr};
use tokio::{net::TcpListener, task::JoinError};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use utils::{project_git_version, sentry_init::init_sentry};
@@ -66,39 +67,48 @@ async fn main() -> anyhow::Result<()> {
let proxy_address: SocketAddr = args.get_one::<String>("proxy").unwrap().parse()?;
info!("Starting proxy on {proxy_address}");
let proxy_listener = TcpListener::bind(proxy_address).await?;
let cancellation_token = CancellationToken::new();
let mut tasks = vec![
tokio::spawn(handle_signals()),
tokio::spawn(http::server::task_main(http_listener)),
tokio::spawn(proxy::task_main(config, proxy_listener)),
tokio::spawn(console::mgmt::task_main(mgmt_listener)),
];
let mut client_tasks = vec![tokio::spawn(proxy::task_main(
config,
proxy_listener,
cancellation_token.clone(),
))];
if let Some(wss_address) = args.get_one::<String>("wss") {
let wss_address: SocketAddr = wss_address.parse()?;
info!("Starting wss on {wss_address}");
let wss_listener = TcpListener::bind(wss_address).await?;
tasks.push(tokio::spawn(http::websocket::task_main(
client_tasks.push(tokio::spawn(http::websocket::task_main(
config,
wss_listener,
cancellation_token.clone(),
)));
}
let mut tasks = vec![
tokio::spawn(handle_signals(cancellation_token)),
tokio::spawn(http::server::task_main(http_listener)),
tokio::spawn(console::mgmt::task_main(mgmt_listener)),
];
if let Some(metrics_config) = &config.metric_collection {
tasks.push(tokio::spawn(metrics::task_main(metrics_config)));
}
// This combinator will block until either all tasks complete or
// one of them finishes with an error (others will be cancelled).
let tasks = tasks.into_iter().map(flatten_err);
let _: Vec<()> = futures::future::try_join_all(tasks).await?;
let tasks = futures::future::try_join_all(tasks.into_iter().map(flatten_err));
let client_tasks = futures::future::try_join_all(client_tasks.into_iter().map(flatten_err));
tokio::select! {
// We are only expecting an error from these forever tasks
res = tasks => { res?; },
res = client_tasks => { res?; },
}
Ok(())
}
/// Handle unix signals appropriately.
async fn handle_signals() -> anyhow::Result<()> {
async fn handle_signals(token: CancellationToken) -> anyhow::Result<()> {
use tokio::signal::unix::{signal, SignalKind};
let mut hangup = signal(SignalKind::hangup())?;
@@ -116,11 +126,9 @@ async fn handle_signals() -> anyhow::Result<()> {
warn!("received SIGINT, exiting immediately");
bail!("interrupted");
}
// TODO: Don't accept new proxy connections.
// TODO: Shut down once all exisiting connections have been closed.
_ = terminate.recv() => {
warn!("received SIGTERM, exiting immediately");
bail!("terminated");
warn!("received SIGTERM, shutting down once all existing connections have closed");
token.cancel();
}
}
}

View File

@@ -17,6 +17,7 @@ use once_cell::sync::Lazy;
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use utils::measured_stream::MeasuredStream;
@@ -63,6 +64,7 @@ static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
pub async fn task_main(
config: &'static ProxyConfig,
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("proxy has shut down");
@@ -72,29 +74,48 @@ pub async fn task_main(
// will be inherited by all accepted client sockets.
socket2::SockRef::from(&listener).set_keepalive(true)?;
let mut connections = tokio::task::JoinSet::new();
let cancel_map = Arc::new(CancelMap::default());
loop {
let (socket, peer_addr) = listener.accept().await?;
info!("accepted postgres client connection from {peer_addr}");
tokio::select! {
accept_result = listener.accept() => {
let (socket, peer_addr) = accept_result?;
info!("accepted postgres client connection from {peer_addr}");
let session_id = uuid::Uuid::new_v4();
let cancel_map = Arc::clone(&cancel_map);
tokio::spawn(
async move {
info!("spawned a task for {peer_addr}");
let session_id = uuid::Uuid::new_v4();
let cancel_map = Arc::clone(&cancel_map);
connections.spawn(
async move {
info!("spawned a task for {peer_addr}");
socket
.set_nodelay(true)
.context("failed to set socket option")?;
socket
.set_nodelay(true)
.context("failed to set socket option")?;
handle_client(config, &cancel_map, session_id, socket).await
handle_client(config, &cancel_map, session_id, socket).await
}
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
error!("per-client task finished with an error: {e:#}");
}),
);
}
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
error!("per-client task finished with an error: {e:#}");
}),
);
_ = cancellation_token.cancelled() => {
drop(listener);
break;
}
}
}
// Drain connections
while let Some(res) = connections.join_next().await {
if let Err(e) = res {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
}
Ok(())
}
// TODO(tech debt): unite this with its twin below.

View File

@@ -30,6 +30,7 @@ serde_with.workspace = true
signal-hook.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["fs"] }
tokio-io-timeout.workspace = true
tokio-postgres.workspace = true
toml_edit.workspace = true
tracing.workspace = true

View File

@@ -4,8 +4,9 @@
//!
use anyhow::{Context, Result};
use postgres_backend::QueryError;
use std::{future, thread};
use std::{future, thread, time::Duration};
use tokio::net::TcpStream;
use tokio_io_timeout::TimeoutReader;
use tracing::*;
use utils::measured_stream::MeasuredStream;
@@ -67,41 +68,52 @@ fn handle_socket(
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let local = tokio::task::LocalSet::new();
socket.set_nodelay(true)?;
let peer_addr = socket.peer_addr()?;
let traffic_metrics = TrafficMetrics::new();
if let Some(current_az) = conf.availability_zone.as_deref() {
traffic_metrics.set_sk_az(current_az);
}
// TimeoutReader wants async runtime during creation.
runtime.block_on(async move {
// Set timeout on reading from the socket. It prevents hanged up connection
// if client suddenly disappears. Note that TCP_KEEPALIVE is not enabled by
// default, and tokio doesn't provide ability to set it out of the box.
let mut socket = TimeoutReader::new(socket);
let wal_service_timeout = Duration::from_secs(60 * 10);
socket.set_timeout(Some(wal_service_timeout));
// pin! is here because TimeoutReader (due to storing sleep future inside)
// is not Unpin, and all pgbackend/framed/tokio dependencies require stream
// to be Unpin. Which is reasonable, as indeed something like TimeoutReader
// shouldn't be moved.
tokio::pin!(socket);
let socket = MeasuredStream::new(
socket,
|cnt| {
traffic_metrics.observe_read(cnt);
},
|cnt| {
traffic_metrics.observe_write(cnt);
},
);
let traffic_metrics = TrafficMetrics::new();
if let Some(current_az) = conf.availability_zone.as_deref() {
traffic_metrics.set_sk_az(current_az);
}
let auth_type = match conf.auth {
None => AuthType::Trust,
Some(_) => AuthType::NeonJWT,
};
let mut conn_handler =
SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()));
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
// libpq protocol between safekeeper and walproposer / pageserver
// We don't use shutdown.
local.block_on(
&runtime,
pgbackend.run(&mut conn_handler, future::pending::<()>),
)?;
let socket = MeasuredStream::new(
socket,
|cnt| {
traffic_metrics.observe_read(cnt);
},
|cnt| {
traffic_metrics.observe_write(cnt);
},
);
Ok(())
let auth_type = match conf.auth {
None => AuthType::Trust,
Some(_) => AuthType::NeonJWT,
};
let mut conn_handler =
SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()));
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
// libpq protocol between safekeeper and walproposer / pageserver
// We don't use shutdown.
pgbackend
.run(&mut conn_handler, future::pending::<()>)
.await
})
}
/// Unique WAL service connection ids are logged in spans for observability.

View File

@@ -114,7 +114,7 @@ class NeonCompare(PgCompare):
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
# Start pg
self._pg = self.env.postgres.create_start(branch_name, "main", self.tenant)
self._pg = self.env.endpoints.create_start(branch_name, "main", self.tenant)
@property
def pg(self) -> PgProtocol:

View File

@@ -830,7 +830,7 @@ class NeonEnvBuilder:
# Stop all the nodes.
if self.env:
log.info("Cleaning up all storage and compute nodes")
self.env.postgres.stop_all()
self.env.endpoints.stop_all()
for sk in self.env.safekeepers:
sk.stop(immediate=True)
self.env.pageserver.stop(immediate=True)
@@ -894,7 +894,7 @@ class NeonEnv:
self.port_distributor = config.port_distributor
self.s3_mock_server = config.mock_s3_server
self.neon_cli = NeonCli(env=self)
self.postgres = PostgresFactory(self)
self.endpoints = EndpointFactory(self)
self.safekeepers: List[Safekeeper] = []
self.broker = config.broker
self.remote_storage = config.remote_storage
@@ -902,6 +902,7 @@ class NeonEnv:
self.pg_version = config.pg_version
self.neon_binpath = config.neon_binpath
self.pg_distrib_dir = config.pg_distrib_dir
self.endpoint_counter = 0
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
@@ -1015,6 +1016,13 @@ class NeonEnv:
priv = (Path(self.repo_dir) / "auth_private_key.pem").read_text()
return AuthKeys(pub=pub, priv=priv)
def generate_endpoint_id(self) -> str:
"""
Generate a unique endpoint ID
"""
self.endpoint_counter += 1
return "ep-" + str(self.endpoint_counter)
@pytest.fixture(scope=shareable_scope)
def _shared_simple_env(
@@ -1073,7 +1081,7 @@ def neon_simple_env(_shared_simple_env: NeonEnv) -> Iterator[NeonEnv]:
"""
yield _shared_simple_env
_shared_simple_env.postgres.stop_all()
_shared_simple_env.endpoints.stop_all()
@pytest.fixture(scope="function")
@@ -1097,7 +1105,7 @@ def neon_env_builder(
neon_env_builder.init_start().
After the initialization, you can launch compute nodes by calling
the functions in the 'env.postgres' factory object, stop/start the
the functions in the 'env.endpoints' factory object, stop/start the
nodes, etc.
"""
@@ -1438,16 +1446,16 @@ class NeonCli(AbstractNeonCli):
args.extend(["-m", "immediate"])
return self.raw_cli(args)
def pg_create(
def endpoint_create(
self,
branch_name: str,
node_name: Optional[str] = None,
endpoint_id: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
port: Optional[int] = None,
) -> "subprocess.CompletedProcess[str]":
args = [
"pg",
"endpoint",
"create",
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
@@ -1460,22 +1468,22 @@ class NeonCli(AbstractNeonCli):
args.extend(["--lsn", str(lsn)])
if port is not None:
args.extend(["--port", str(port)])
if node_name is not None:
args.append(node_name)
if endpoint_id is not None:
args.append(endpoint_id)
res = self.raw_cli(args)
res.check_returncode()
return res
def pg_start(
def endpoint_start(
self,
node_name: str,
endpoint_id: str,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
port: Optional[int] = None,
) -> "subprocess.CompletedProcess[str]":
args = [
"pg",
"endpoint",
"start",
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
@@ -1486,30 +1494,30 @@ class NeonCli(AbstractNeonCli):
args.append(f"--lsn={lsn}")
if port is not None:
args.append(f"--port={port}")
if node_name is not None:
args.append(node_name)
if endpoint_id is not None:
args.append(endpoint_id)
res = self.raw_cli(args)
res.check_returncode()
return res
def pg_stop(
def endpoint_stop(
self,
node_name: str,
endpoint_id: str,
tenant_id: Optional[TenantId] = None,
destroy=False,
check_return_code=True,
) -> "subprocess.CompletedProcess[str]":
args = [
"pg",
"endpoint",
"stop",
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
]
if destroy:
args.append("--destroy")
if node_name is not None:
args.append(node_name)
if endpoint_id is not None:
args.append(endpoint_id)
return self.raw_cli(args, check_return_code=check_return_code)
@@ -2033,6 +2041,17 @@ class NeonProxy(PgProtocol):
self._wait_until_ready()
return self
# Sends SIGTERM to the proxy if it has been started
def terminate(self):
if self._popen:
self._popen.terminate()
# Waits for proxy to exit if it has been opened with a default timeout of
# two seconds. Raises subprocess.TimeoutExpired if the proxy does not exit in time.
def wait_for_exit(self, timeout=2):
if self._popen:
self._popen.wait(timeout=2)
@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_time=10)
def _wait_until_ready(self):
requests.get(f"http://{self.host}:{self.http_port}/v1/status")
@@ -2167,8 +2186,8 @@ def static_proxy(
yield proxy
class Postgres(PgProtocol):
"""An object representing a running postgres daemon."""
class Endpoint(PgProtocol):
"""An object representing a Postgres compute endpoint managed by the control plane."""
def __init__(
self, env: NeonEnv, tenant_id: TenantId, port: int, check_stop_result: bool = True
@@ -2176,33 +2195,40 @@ class Postgres(PgProtocol):
super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres")
self.env = env
self.running = False
self.node_name: Optional[str] = None # dubious, see asserts below
self.endpoint_id: Optional[str] = None # dubious, see asserts below
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
self.tenant_id = tenant_id
self.port = port
self.check_stop_result = check_stop_result
# path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<node_name>/postgresql.conf
# path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
def create(
self,
branch_name: str,
node_name: Optional[str] = None,
endpoint_id: Optional[str] = None,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> "Postgres":
) -> "Endpoint":
"""
Create the pg data directory.
Create a new Postgres endpoint.
Returns self.
"""
if not config_lines:
config_lines = []
self.node_name = node_name or f"{branch_name}_pg_node"
self.env.neon_cli.pg_create(
branch_name, node_name=self.node_name, tenant_id=self.tenant_id, lsn=lsn, port=self.port
if endpoint_id is None:
endpoint_id = self.env.generate_endpoint_id()
self.endpoint_id = endpoint_id
self.env.neon_cli.endpoint_create(
branch_name,
endpoint_id=self.endpoint_id,
tenant_id=self.tenant_id,
lsn=lsn,
port=self.port,
)
path = Path("pgdatadirs") / "tenants" / str(self.tenant_id) / self.node_name
path = Path("endpoints") / self.endpoint_id / "pgdata"
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
if config_lines is None:
@@ -2215,26 +2241,30 @@ class Postgres(PgProtocol):
return self
def start(self) -> "Postgres":
def start(self) -> "Endpoint":
"""
Start the Postgres instance.
Returns self.
"""
assert self.node_name is not None
assert self.endpoint_id is not None
log.info(f"Starting postgres node {self.node_name}")
log.info(f"Starting postgres endpoint {self.endpoint_id}")
self.env.neon_cli.pg_start(self.node_name, tenant_id=self.tenant_id, port=self.port)
self.env.neon_cli.endpoint_start(self.endpoint_id, tenant_id=self.tenant_id, port=self.port)
self.running = True
return self
def endpoint_path(self) -> Path:
"""Path to endpoint directory"""
assert self.endpoint_id
path = Path("endpoints") / self.endpoint_id
return self.env.repo_dir / path
def pg_data_dir_path(self) -> str:
"""Path to data directory"""
assert self.node_name
path = Path("pgdatadirs") / "tenants" / str(self.tenant_id) / self.node_name
return os.path.join(self.env.repo_dir, path)
"""Path to Postgres data directory"""
return os.path.join(self.endpoint_path(), "pgdata")
def pg_xact_dir_path(self) -> str:
"""Path to pg_xact dir"""
@@ -2248,7 +2278,7 @@ class Postgres(PgProtocol):
"""Path to postgresql.conf"""
return os.path.join(self.pg_data_dir_path(), "postgresql.conf")
def adjust_for_safekeepers(self, safekeepers: str) -> "Postgres":
def adjust_for_safekeepers(self, safekeepers: str) -> "Endpoint":
"""
Adjust instance config for working with wal acceptors instead of
pageserver (pre-configured by CLI) directly.
@@ -2272,7 +2302,7 @@ class Postgres(PgProtocol):
f.write("neon.safekeepers = '{}'\n".format(safekeepers))
return self
def config(self, lines: List[str]) -> "Postgres":
def config(self, lines: List[str]) -> "Endpoint":
"""
Add lines to postgresql.conf.
Lines should be an array of valid postgresql.conf rows.
@@ -2286,32 +2316,32 @@ class Postgres(PgProtocol):
return self
def stop(self) -> "Postgres":
def stop(self) -> "Endpoint":
"""
Stop the Postgres instance if it's running.
Returns self.
"""
if self.running:
assert self.node_name is not None
self.env.neon_cli.pg_stop(
self.node_name, self.tenant_id, check_return_code=self.check_stop_result
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_stop(
self.endpoint_id, self.tenant_id, check_return_code=self.check_stop_result
)
self.running = False
return self
def stop_and_destroy(self) -> "Postgres":
def stop_and_destroy(self) -> "Endpoint":
"""
Stop the Postgres instance, then destroy it.
Stop the Postgres instance, then destroy the endpoint.
Returns self.
"""
assert self.node_name is not None
self.env.neon_cli.pg_stop(
self.node_name, self.tenant_id, True, check_return_code=self.check_stop_result
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_stop(
self.endpoint_id, self.tenant_id, True, check_return_code=self.check_stop_result
)
self.node_name = None
self.endpoint_id = None
self.running = False
return self
@@ -2319,13 +2349,12 @@ class Postgres(PgProtocol):
def create_start(
self,
branch_name: str,
node_name: Optional[str] = None,
endpoint_id: Optional[str] = None,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> "Postgres":
) -> "Endpoint":
"""
Create a Postgres instance, apply config
and then start it.
Create an endpoint, apply config, and start Postgres.
Returns self.
"""
@@ -2333,7 +2362,7 @@ class Postgres(PgProtocol):
self.create(
branch_name=branch_name,
node_name=node_name,
endpoint_id=endpoint_id,
config_lines=config_lines,
lsn=lsn,
).start()
@@ -2342,7 +2371,7 @@ class Postgres(PgProtocol):
return self
def __enter__(self) -> "Postgres":
def __enter__(self) -> "Endpoint":
return self
def __exit__(
@@ -2354,33 +2383,33 @@ class Postgres(PgProtocol):
self.stop()
class PostgresFactory:
"""An object representing multiple running postgres daemons."""
class EndpointFactory:
"""An object representing multiple compute endpoints."""
def __init__(self, env: NeonEnv):
self.env = env
self.num_instances: int = 0
self.instances: List[Postgres] = []
self.endpoints: List[Endpoint] = []
def create_start(
self,
branch_name: str,
node_name: Optional[str] = None,
endpoint_id: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> Postgres:
pg = Postgres(
) -> Endpoint:
ep = Endpoint(
self.env,
tenant_id=tenant_id or self.env.initial_tenant,
port=self.env.port_distributor.get_port(),
)
self.num_instances += 1
self.instances.append(pg)
self.endpoints.append(ep)
return pg.create_start(
return ep.create_start(
branch_name=branch_name,
node_name=node_name,
endpoint_id=endpoint_id,
config_lines=config_lines,
lsn=lsn,
)
@@ -2388,30 +2417,33 @@ class PostgresFactory:
def create(
self,
branch_name: str,
node_name: Optional[str] = None,
endpoint_id: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> Postgres:
pg = Postgres(
) -> Endpoint:
ep = Endpoint(
self.env,
tenant_id=tenant_id or self.env.initial_tenant,
port=self.env.port_distributor.get_port(),
)
self.num_instances += 1
self.instances.append(pg)
if endpoint_id is None:
endpoint_id = self.env.generate_endpoint_id()
return pg.create(
self.num_instances += 1
self.endpoints.append(ep)
return ep.create(
branch_name=branch_name,
node_name=node_name,
endpoint_id=endpoint_id,
lsn=lsn,
config_lines=config_lines,
)
def stop_all(self) -> "PostgresFactory":
for pg in self.instances:
pg.stop()
def stop_all(self) -> "EndpointFactory":
for ep in self.endpoints:
ep.stop()
return self
@@ -2786,16 +2818,16 @@ def list_files_to_compare(pgdata_dir: Path) -> List[str]:
def check_restored_datadir_content(
test_output_dir: Path,
env: NeonEnv,
pg: Postgres,
endpoint: Endpoint,
):
# Get the timeline ID. We need it for the 'basebackup' command
timeline = TimelineId(pg.safe_psql("SHOW neon.timeline_id")[0][0])
timeline = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])
# stop postgres to ensure that files won't change
pg.stop()
endpoint.stop()
# Take a basebackup from pageserver
restored_dir_path = env.repo_dir / f"{pg.node_name}_restored_datadir"
restored_dir_path = env.repo_dir / f"{endpoint.endpoint_id}_restored_datadir"
restored_dir_path.mkdir(exist_ok=True)
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
@@ -2805,7 +2837,7 @@ def check_restored_datadir_content(
{psql_path} \
--no-psqlrc \
postgres://localhost:{env.pageserver.service_port.pg} \
-c 'basebackup {pg.tenant_id} {timeline}' \
-c 'basebackup {endpoint.tenant_id} {timeline}' \
| tar -x -C {restored_dir_path}
"""
@@ -2822,8 +2854,8 @@ def check_restored_datadir_content(
assert result.returncode == 0
# list files we're going to compare
assert pg.pgdata_dir
pgdata_files = list_files_to_compare(Path(pg.pgdata_dir))
assert endpoint.pgdata_dir
pgdata_files = list_files_to_compare(Path(endpoint.pgdata_dir))
restored_files = list_files_to_compare(restored_dir_path)
# check that file sets are equal
@@ -2834,12 +2866,12 @@ def check_restored_datadir_content(
# We've already filtered all mismatching files in list_files_to_compare(),
# so here expect that the content is identical
(match, mismatch, error) = filecmp.cmpfiles(
pg.pgdata_dir, restored_dir_path, pgdata_files, shallow=False
endpoint.pgdata_dir, restored_dir_path, pgdata_files, shallow=False
)
log.info(f"filecmp result mismatch and error lists:\n\t mismatch={mismatch}\n\t error={error}")
for f in mismatch:
f1 = os.path.join(pg.pgdata_dir, f)
f1 = os.path.join(endpoint.pgdata_dir, f)
f2 = os.path.join(restored_dir_path, f)
stdout_filename = "{}.filediff".format(f2)
@@ -2854,24 +2886,24 @@ def check_restored_datadir_content(
def wait_for_last_flush_lsn(
env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
env: NeonEnv, endpoint: Endpoint, tenant: TenantId, timeline: TimelineId
) -> Lsn:
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
def wait_for_wal_insert_lsn(
env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
env: NeonEnv, endpoint: Endpoint, tenant: TenantId, timeline: TimelineId
) -> Lsn:
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
def fork_at_current_lsn(
env: NeonEnv,
pg: Postgres,
endpoint: Endpoint,
new_branch_name: str,
ancestor_branch_name: str,
tenant_id: Optional[TenantId] = None,
@@ -2881,7 +2913,7 @@ def fork_at_current_lsn(
The "last LSN" is taken from the given Postgres instance. The pageserver will wait for all the
the WAL up to that LSN to arrive in the pageserver before creating the branch.
"""
current_lsn = pg.safe_psql("SELECT pg_current_wal_lsn()")[0][0]
current_lsn = endpoint.safe_psql("SELECT pg_current_wal_lsn()")[0][0]
return env.neon_cli.create_branch(new_branch_name, ancestor_branch_name, tenant_id, current_lsn)

View File

@@ -1,16 +1,20 @@
import time
from typing import Optional
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.types import Lsn, TenantId, TimelineId
def assert_tenant_status(
pageserver_http: PageserverHttpClient, tenant: TenantId, expected_status: str
def assert_tenant_state(
pageserver_http: PageserverHttpClient,
tenant: TenantId,
expected_state: str,
message: Optional[str] = None,
):
tenant_status = pageserver_http.tenant_status(tenant)
log.info(f"tenant_status: {tenant_status}")
assert tenant_status["state"] == expected_status, tenant_status
assert tenant_status["state"]["slug"] == expected_state, message or tenant_status
def tenant_exists(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
@@ -68,6 +72,7 @@ def wait_until_tenant_state(
tenant_id: TenantId,
expected_state: str,
iterations: int,
period: float = 1.0,
) -> bool:
"""
Does not use `wait_until` for debugging purposes
@@ -76,21 +81,28 @@ def wait_until_tenant_state(
try:
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
log.debug(f"Tenant {tenant_id} data: {tenant}")
if tenant["state"] == expected_state:
if tenant["state"]["slug"] == expected_state:
return True
except Exception as e:
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
time.sleep(1)
time.sleep(period)
raise Exception(f"Tenant {tenant_id} did not become {expected_state} in {iterations} seconds")
def wait_until_tenant_active(
pageserver_http: PageserverHttpClient, tenant_id: TenantId, iterations: int = 30
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
iterations: int = 30,
period: float = 1.0,
):
wait_until_tenant_state(
pageserver_http, tenant_id, expected_state="Active", iterations=iterations
pageserver_http,
tenant_id,
expected_state="Active",
iterations=iterations,
period=period,
)

View File

@@ -52,13 +52,13 @@ def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int)
def run_pgbench(branch: str):
log.info(f"Start a pgbench workload on branch {branch}")
pg = env.postgres.create_start(branch, tenant_id=tenant)
connstr = pg.connstr()
endpoint = env.endpoints.create_start(branch, tenant_id=tenant)
connstr = endpoint.connstr()
pg_bin.run_capture(["pgbench", "-i", connstr])
pg_bin.run_capture(["pgbench", "-c10", "-T10", connstr])
pg.stop()
endpoint.stop()
env.neon_cli.create_branch("b0", tenant_id=tenant)
@@ -96,8 +96,8 @@ def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int):
env.neon_cli.create_branch("b0")
pg = env.postgres.create_start("b0")
neon_compare.pg_bin.run_capture(["pgbench", "-i", "-s10", pg.connstr()])
endpoint = env.endpoints.create_start("b0")
neon_compare.pg_bin.run_capture(["pgbench", "-i", "-s10", endpoint.connstr()])
branch_creation_durations = []
@@ -124,15 +124,15 @@ def test_branch_creation_many_relations(neon_compare: NeonCompare):
timeline_id = env.neon_cli.create_branch("root")
pg = env.postgres.create_start("root")
with closing(pg.connect()) as conn:
endpoint = env.endpoints.create_start("root")
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
for i in range(10000):
cur.execute(f"CREATE TABLE t{i} as SELECT g FROM generate_series(1, 1000) g")
# Wait for the pageserver to finish processing all the pending WALs,
# as we don't want the LSN wait time to be included during the branch creation
flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
wait_for_last_record_lsn(
env.pageserver.http_client(), env.initial_tenant, timeline_id, flush_lsn
)
@@ -142,7 +142,7 @@ def test_branch_creation_many_relations(neon_compare: NeonCompare):
# run a concurrent insertion to make the ancestor "busy" during the branch creation
thread = threading.Thread(
target=pg.safe_psql, args=("INSERT INTO t0 VALUES (generate_series(1, 100000))",)
target=endpoint.safe_psql, args=("INSERT INTO t0 VALUES (generate_series(1, 100000))",)
)
thread.start()

View File

@@ -42,41 +42,41 @@ def test_compare_child_and_root_pgbench_perf(neon_compare: NeonCompare):
neon_compare.zenbenchmark.record_pg_bench_result(branch, res)
env.neon_cli.create_branch("root")
pg_root = env.postgres.create_start("root")
pg_bin.run_capture(["pgbench", "-i", pg_root.connstr(), "-s10"])
endpoint_root = env.endpoints.create_start("root")
pg_bin.run_capture(["pgbench", "-i", endpoint_root.connstr(), "-s10"])
fork_at_current_lsn(env, pg_root, "child", "root")
fork_at_current_lsn(env, endpoint_root, "child", "root")
pg_child = env.postgres.create_start("child")
endpoint_child = env.endpoints.create_start("child")
run_pgbench_on_branch("root", ["pgbench", "-c10", "-T10", pg_root.connstr()])
run_pgbench_on_branch("child", ["pgbench", "-c10", "-T10", pg_child.connstr()])
run_pgbench_on_branch("root", ["pgbench", "-c10", "-T10", endpoint_root.connstr()])
run_pgbench_on_branch("child", ["pgbench", "-c10", "-T10", endpoint_child.connstr()])
def test_compare_child_and_root_write_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.neon_cli.create_branch("root")
pg_root = env.postgres.create_start("root")
endpoint_root = env.endpoints.create_start("root")
pg_root.safe_psql(
endpoint_root.safe_psql(
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
)
env.neon_cli.create_branch("child", "root")
pg_child = env.postgres.create_start("child")
endpoint_child = env.endpoints.create_start("child")
with neon_compare.record_duration("root_run_duration"):
pg_root.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
endpoint_root.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
with neon_compare.record_duration("child_run_duration"):
pg_child.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
endpoint_child.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
def test_compare_child_and_root_read_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.neon_cli.create_branch("root")
pg_root = env.postgres.create_start("root")
endpoint_root = env.endpoints.create_start("root")
pg_root.safe_psql_many(
endpoint_root.safe_psql_many(
[
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
"INSERT INTO foo SELECT FROM generate_series(1,1000000)",
@@ -84,12 +84,12 @@ def test_compare_child_and_root_read_perf(neon_compare: NeonCompare):
)
env.neon_cli.create_branch("child", "root")
pg_child = env.postgres.create_start("child")
endpoint_child = env.endpoints.create_start("child")
with neon_compare.record_duration("root_run_duration"):
pg_root.safe_psql("SELECT count(*) from foo")
endpoint_root.safe_psql("SELECT count(*) from foo")
with neon_compare.record_duration("child_run_duration"):
pg_child.safe_psql("SELECT count(*) from foo")
endpoint_child.safe_psql("SELECT count(*) from foo")
# -----------------------------------------------------------------------

View File

@@ -35,14 +35,14 @@ def test_bulk_tenant_create(
# if use_safekeepers == 'with_sa':
# wa_factory.start_n_new(3)
pg_tenant = env.postgres.create_start(
endpoint_tenant = env.endpoints.create_start(
f"test_bulk_tenant_create_{tenants_count}_{i}", tenant_id=tenant
)
end = timeit.default_timer()
time_slices.append(end - start)
pg_tenant.stop()
endpoint_tenant.stop()
zenbenchmark.record(
"tenant_creation_time",

View File

@@ -18,8 +18,8 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)
timeline_id = env.neon_cli.create_branch("test_bulk_update")
tenant_id = env.initial_tenant
pg = env.postgres.create_start("test_bulk_update")
cur = pg.connect().cursor()
endpoint = env.endpoints.create_start("test_bulk_update")
cur = endpoint.connect().cursor()
cur.execute("set statement_timeout=0")
cur.execute(f"create table t(x integer) WITH (fillfactor={fillfactor})")
@@ -28,13 +28,13 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
cur.execute("vacuum t")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
with zenbenchmark.record_duration("update-no-prefetch"):
cur.execute("update t set x=x+1")
cur.execute("vacuum t")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
with zenbenchmark.record_duration("delete-no-prefetch"):
cur.execute("delete from t")
@@ -50,13 +50,13 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)
cur.execute(f"insert into t2 values (generate_series(1,{n_records}))")
cur.execute("vacuum t2")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
with zenbenchmark.record_duration("update-with-prefetch"):
cur.execute("update t2 set x=x+1")
cur.execute("vacuum t2")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
with zenbenchmark.record_duration("delete-with-prefetch"):
cur.execute("delete from t2")

View File

@@ -33,11 +33,11 @@ def test_compaction(neon_compare: NeonCompare):
# Create some tables, and run a bunch of INSERTs and UPDATes on them,
# to generate WAL and layers
pg = env.postgres.create_start(
endpoint = env.endpoints.create_start(
"main", tenant_id=tenant_id, config_lines=["shared_buffers=512MB"]
)
with closing(pg.connect()) as conn:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
for i in range(100):
cur.execute(f"create table tbl{i} (i int, j int);")
@@ -45,7 +45,7 @@ def test_compaction(neon_compare: NeonCompare):
for j in range(100):
cur.execute(f"update tbl{i} set j = {j};")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
# First compaction generates L1 layers
with neon_compare.zenbenchmark.record_duration("compaction"):

View File

@@ -2,13 +2,13 @@ import threading
import pytest
from fixtures.compare_fixtures import PgCompare
from fixtures.neon_fixtures import Postgres
from fixtures.neon_fixtures import PgProtocol
from performance.test_perf_pgbench import get_scales_matrix
from performance.test_wal_backpressure import record_read_latency
def start_write_workload(pg: Postgres, scale: int = 10):
def start_write_workload(pg: PgProtocol, scale: int = 10):
with pg.connect().cursor() as cur:
cur.execute(f"create table big as select generate_series(1,{scale*100_000})")

View File

@@ -25,8 +25,8 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
)
env.neon_cli.create_timeline("test_layer_map", tenant_id=tenant)
pg = env.postgres.create_start("test_layer_map", tenant_id=tenant)
cur = pg.connect().cursor()
endpoint = env.endpoints.create_start("test_layer_map", tenant_id=tenant)
cur = endpoint.connect().cursor()
cur.execute("create table t(x integer)")
for i in range(n_iters):
cur.execute(f"insert into t values (generate_series(1,{n_records}))")

View File

@@ -14,19 +14,19 @@ def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker
# Start
env.neon_cli.create_branch("test_startup")
with zenbenchmark.record_duration("startup_time"):
pg = env.postgres.create_start("test_startup")
pg.safe_psql("select 1;")
endpoint = env.endpoints.create_start("test_startup")
endpoint.safe_psql("select 1;")
# Restart
pg.stop_and_destroy()
endpoint.stop_and_destroy()
with zenbenchmark.record_duration("restart_time"):
pg.create_start("test_startup")
pg.safe_psql("select 1;")
endpoint.create_start("test_startup")
endpoint.safe_psql("select 1;")
# Fill up
num_rows = 1000000 # 30 MB
num_tables = 100
with closing(pg.connect()) as conn:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
for i in range(num_tables):
cur.execute(f"create table t_{i} (i integer);")
@@ -34,18 +34,18 @@ def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker
# Read
with zenbenchmark.record_duration("read_time"):
pg.safe_psql("select * from t_0;")
endpoint.safe_psql("select * from t_0;")
# Read again
with zenbenchmark.record_duration("second_read_time"):
pg.safe_psql("select * from t_0;")
endpoint.safe_psql("select * from t_0;")
# Restart
pg.stop_and_destroy()
endpoint.stop_and_destroy()
with zenbenchmark.record_duration("restart_with_data"):
pg.create_start("test_startup")
pg.safe_psql("select 1;")
endpoint.create_start("test_startup")
endpoint.safe_psql("select 1;")
# Read
with zenbenchmark.record_duration("read_after_restart"):
pg.safe_psql("select * from t_0;")
endpoint.safe_psql("select * from t_0;")

View File

@@ -22,8 +22,8 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
pageserver_http.configure_failpoints(("flush-frozen-before-sync", "sleep(10000)"))
pg_branch0 = env.postgres.create_start("main", tenant_id=tenant)
branch0_cur = pg_branch0.connect().cursor()
endpoint_branch0 = env.endpoints.create_start("main", tenant_id=tenant)
branch0_cur = endpoint_branch0.connect().cursor()
branch0_timeline = TimelineId(query_scalar(branch0_cur, "SHOW neon.timeline_id"))
log.info(f"b0 timeline {branch0_timeline}")
@@ -44,10 +44,10 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
# Create branch1.
env.neon_cli.create_branch("branch1", "main", tenant_id=tenant, ancestor_start_lsn=lsn_100)
pg_branch1 = env.postgres.create_start("branch1", tenant_id=tenant)
endpoint_branch1 = env.endpoints.create_start("branch1", tenant_id=tenant)
log.info("postgres is running on 'branch1' branch")
branch1_cur = pg_branch1.connect().cursor()
branch1_cur = endpoint_branch1.connect().cursor()
branch1_timeline = TimelineId(query_scalar(branch1_cur, "SHOW neon.timeline_id"))
log.info(f"b1 timeline {branch1_timeline}")
@@ -67,9 +67,9 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
# Create branch2.
env.neon_cli.create_branch("branch2", "branch1", tenant_id=tenant, ancestor_start_lsn=lsn_200)
pg_branch2 = env.postgres.create_start("branch2", tenant_id=tenant)
endpoint_branch2 = env.endpoints.create_start("branch2", tenant_id=tenant)
log.info("postgres is running on 'branch2' branch")
branch2_cur = pg_branch2.connect().cursor()
branch2_cur = endpoint_branch2.connect().cursor()
branch2_timeline = TimelineId(query_scalar(branch2_cur, "SHOW neon.timeline_id"))
log.info(f"b2 timeline {branch2_timeline}")

View File

@@ -64,9 +64,9 @@ def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder):
branch = "test_compute_auth_to_pageserver"
env.neon_cli.create_branch(branch)
pg = env.postgres.create_start(branch)
endpoint = env.endpoints.create_start(branch)
with closing(pg.connect()) as conn:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
@@ -83,7 +83,7 @@ def test_auth_failures(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
branch = f"test_auth_failures_auth_enabled_{auth_enabled}"
timeline_id = env.neon_cli.create_branch(branch)
env.postgres.create_start(branch)
env.endpoints.create_start(branch)
tenant_token = env.auth_keys.generate_tenant_token(env.initial_tenant)
invalid_tenant_token = env.auth_keys.generate_tenant_token(TenantId.generate())

View File

@@ -5,7 +5,7 @@ from contextlib import closing, contextmanager
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, Postgres
from fixtures.neon_fixtures import Endpoint, NeonEnvBuilder
pytest_plugins = "fixtures.neon_fixtures"
@@ -20,10 +20,10 @@ def pg_cur(pg):
# Periodically check that all backpressure lags are below the configured threshold,
# assert if they are not.
# If the check query fails, stop the thread. Main thread should notice that and stop the test.
def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interval=5):
def check_backpressure(endpoint: Endpoint, stop_event: threading.Event, polling_interval=5):
log.info("checks started")
with pg_cur(pg) as cur:
with pg_cur(endpoint) as cur:
cur.execute("CREATE EXTENSION neon") # TODO move it to neon_fixtures?
cur.execute("select pg_size_bytes(current_setting('max_replication_write_lag'))")
@@ -41,7 +41,7 @@ def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interv
max_replication_apply_lag_bytes = res[0]
log.info(f"max_replication_apply_lag: {max_replication_apply_lag_bytes} bytes")
with pg_cur(pg) as cur:
with pg_cur(endpoint) as cur:
while not stop_event.is_set():
try:
cur.execute(
@@ -102,14 +102,14 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
# Create a branch for us
env.neon_cli.create_branch("test_backpressure")
pg = env.postgres.create_start(
endpoint = env.endpoints.create_start(
"test_backpressure", config_lines=["max_replication_write_lag=30MB"]
)
log.info("postgres is running on 'test_backpressure' branch")
# setup check thread
check_stop_event = threading.Event()
check_thread = threading.Thread(target=check_backpressure, args=(pg, check_stop_event))
check_thread = threading.Thread(target=check_backpressure, args=(endpoint, check_stop_event))
check_thread.start()
# Configure failpoint to slow down walreceiver ingest
@@ -125,7 +125,7 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
# because of the lag and waiting for lsn to replay to arrive.
time.sleep(2)
with pg_cur(pg) as cur:
with pg_cur(endpoint) as cur:
# Create and initialize test table
cur.execute("CREATE TABLE foo(x bigint)")

View File

@@ -15,4 +15,4 @@ def test_basebackup_error(neon_simple_env: NeonEnv):
pageserver_http.configure_failpoints(("basebackup-before-control-file", "return"))
with pytest.raises(Exception, match="basebackup-before-control-file"):
env.postgres.create_start("test_basebackup_error")
env.endpoints.create_start("test_basebackup_error")

View File

@@ -67,9 +67,9 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
)
timeline_main = env.neon_cli.create_timeline("test_main", tenant_id=tenant)
pg_main = env.postgres.create_start("test_main", tenant_id=tenant)
endpoint_main = env.endpoints.create_start("test_main", tenant_id=tenant)
main_cur = pg_main.connect().cursor()
main_cur = endpoint_main.connect().cursor()
main_cur.execute(
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')"
@@ -90,9 +90,9 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
env.neon_cli.create_branch(
"test_branch", "test_main", tenant_id=tenant, ancestor_start_lsn=lsn1
)
pg_branch = env.postgres.create_start("test_branch", tenant_id=tenant)
endpoint_branch = env.endpoints.create_start("test_branch", tenant_id=tenant)
branch_cur = pg_branch.connect().cursor()
branch_cur = endpoint_branch.connect().cursor()
branch_cur.execute("INSERT INTO foo SELECT FROM generate_series(1, 100000)")
assert query_scalar(branch_cur, "SELECT count(*) FROM foo") == 200000
@@ -142,8 +142,8 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
)
b0 = env.neon_cli.create_branch("b0", tenant_id=tenant)
pg0 = env.postgres.create_start("b0", tenant_id=tenant)
res = pg0.safe_psql_many(
endpoint0 = env.endpoints.create_start("b0", tenant_id=tenant)
res = endpoint0.safe_psql_many(
queries=[
"CREATE TABLE t(key serial primary key)",
"INSERT INTO t SELECT FROM generate_series(1, 100000)",

View File

@@ -18,10 +18,10 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
# Branch at the point where only 100 rows were inserted
env.neon_cli.create_branch("test_branch_behind")
pgmain = env.postgres.create_start("test_branch_behind")
endpoint_main = env.endpoints.create_start("test_branch_behind")
log.info("postgres is running on 'test_branch_behind' branch")
main_cur = pgmain.connect().cursor()
main_cur = endpoint_main.connect().cursor()
timeline = TimelineId(query_scalar(main_cur, "SHOW neon.timeline_id"))
@@ -74,15 +74,15 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
"test_branch_behind_more", "test_branch_behind", ancestor_start_lsn=lsn_b
)
pg_hundred = env.postgres.create_start("test_branch_behind_hundred")
pg_more = env.postgres.create_start("test_branch_behind_more")
endpoint_hundred = env.endpoints.create_start("test_branch_behind_hundred")
endpoint_more = env.endpoints.create_start("test_branch_behind_more")
# On the 'hundred' branch, we should see only 100 rows
hundred_cur = pg_hundred.connect().cursor()
hundred_cur = endpoint_hundred.connect().cursor()
assert query_scalar(hundred_cur, "SELECT count(*) FROM foo") == 100
# On the 'more' branch, we should see 100200 rows
more_cur = pg_more.connect().cursor()
more_cur = endpoint_more.connect().cursor()
assert query_scalar(more_cur, "SELECT count(*) FROM foo") == 200100
# All the rows are visible on the main branch
@@ -94,8 +94,8 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
env.neon_cli.create_branch(
"test_branch_segment_boundary", "test_branch_behind", ancestor_start_lsn=Lsn("0/3000000")
)
pg = env.postgres.create_start("test_branch_segment_boundary")
assert pg.safe_psql("SELECT 1")[0][0] == 1
endpoint = env.endpoints.create_start("test_branch_segment_boundary")
assert endpoint.safe_psql("SELECT 1")[0][0] == 1
# branch at pre-initdb lsn
with pytest.raises(Exception, match="invalid branch start lsn: .*"):

View File

@@ -5,7 +5,7 @@ from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgBin
from fixtures.types import Lsn
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
@@ -40,20 +40,20 @@ def test_branching_with_pgbench(
}
)
def run_pgbench(pg: Postgres):
connstr = pg.connstr()
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
pg_bin.run_capture(["pgbench", "-T15", connstr])
env.neon_cli.create_branch("b0", tenant_id=tenant)
pgs: List[Postgres] = []
pgs.append(env.postgres.create_start("b0", tenant_id=tenant))
endpoints: List[Endpoint] = []
endpoints.append(env.endpoints.create_start("b0", tenant_id=tenant))
threads: List[threading.Thread] = []
threads.append(threading.Thread(target=run_pgbench, args=(pgs[0],), daemon=True))
threads.append(
threading.Thread(target=run_pgbench, args=(endpoints[0].connstr(),), daemon=True)
)
threads[-1].start()
thread_limit = 4
@@ -79,16 +79,18 @@ def test_branching_with_pgbench(
else:
env.neon_cli.create_branch("b{}".format(i + 1), "b0", tenant_id=tenant)
pgs.append(env.postgres.create_start("b{}".format(i + 1), tenant_id=tenant))
endpoints.append(env.endpoints.create_start("b{}".format(i + 1), tenant_id=tenant))
threads.append(threading.Thread(target=run_pgbench, args=(pgs[-1],), daemon=True))
threads.append(
threading.Thread(target=run_pgbench, args=(endpoints[-1].connstr(),), daemon=True)
)
threads[-1].start()
for thread in threads:
thread.join()
for pg in pgs:
res = pg.safe_psql("SELECT count(*) from pgbench_accounts")
for ep in endpoints:
res = ep.safe_psql("SELECT count(*) from pgbench_accounts")
assert res[0] == (100000 * scale,)
@@ -110,11 +112,11 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
env = neon_simple_env
env.neon_cli.create_branch("b0")
pg0 = env.postgres.create_start("b0")
endpoint0 = env.endpoints.create_start("b0")
pg_bin.run_capture(["pgbench", "-i", pg0.connstr()])
pg_bin.run_capture(["pgbench", "-i", endpoint0.connstr()])
with pg0.cursor() as cur:
with endpoint0.cursor() as cur:
curr_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# Specify the `start_lsn` as a number that is divided by `XLOG_BLCKSZ`
@@ -123,6 +125,6 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...")
env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn)
pg1 = env.postgres.create_start("b1")
endpoint1 = env.endpoints.create_start("b1")
pg_bin.run_capture(["pgbench", "-i", pg1.connstr()])
pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])

View File

@@ -4,7 +4,7 @@ from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder
from fixtures.types import TenantId, TimelineId
@@ -24,17 +24,17 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
]
)
tenant_timelines: List[Tuple[TenantId, TimelineId, Postgres]] = []
tenant_timelines: List[Tuple[TenantId, TimelineId, Endpoint]] = []
for n in range(4):
tenant_id, timeline_id = env.neon_cli.create_tenant()
pg = env.postgres.create_start("main", tenant_id=tenant_id)
with pg.cursor() as cur:
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with endpoint.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100), 'payload'")
pg.stop()
tenant_timelines.append((tenant_id, timeline_id, pg))
endpoint.stop()
tenant_timelines.append((tenant_id, timeline_id, endpoint))
# Stop the pageserver
env.pageserver.stop()

View File

@@ -24,14 +24,14 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
"autovacuum_freeze_max_age=100000",
]
pg = env.postgres.create_start("test_clog_truncate", config_lines=config)
endpoint = env.endpoints.create_start("test_clog_truncate", config_lines=config)
log.info("postgres is running on test_clog_truncate branch")
# Install extension containing function needed for test
pg.safe_psql("CREATE EXTENSION neon_test_utils")
endpoint.safe_psql("CREATE EXTENSION neon_test_utils")
# Consume many xids to advance clog
with pg.cursor() as cur:
with endpoint.cursor() as cur:
cur.execute("select test_consume_xids(1000*1000*10);")
log.info("xids consumed")
@@ -44,7 +44,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
# wait for autovacuum to truncate the pg_xact
# XXX Is it worth to add a timeout here?
pg_xact_0000_path = os.path.join(pg.pg_xact_dir_path(), "0000")
pg_xact_0000_path = os.path.join(endpoint.pg_xact_dir_path(), "0000")
log.info(f"pg_xact_0000_path = {pg_xact_0000_path}")
while os.path.isfile(pg_xact_0000_path):
@@ -52,7 +52,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
time.sleep(5)
# checkpoint to advance latest lsn
with pg.cursor() as cur:
with endpoint.cursor() as cur:
cur.execute("CHECKPOINT;")
lsn_after_truncation = query_scalar(cur, "select pg_current_wal_insert_lsn()")
@@ -61,10 +61,10 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
env.neon_cli.create_branch(
"test_clog_truncate_new", "test_clog_truncate", ancestor_start_lsn=lsn_after_truncation
)
pg2 = env.postgres.create_start("test_clog_truncate_new")
endpoint2 = env.endpoints.create_start("test_clog_truncate_new")
log.info("postgres is running on test_clog_truncate_new branch")
# check that new node doesn't contain truncated segment
pg_xact_0000_path_new = os.path.join(pg2.pg_xact_dir_path(), "0000")
pg_xact_0000_path_new = os.path.join(endpoint2.pg_xact_dir_path(), "0000")
log.info(f"pg_xact_0000_path_new = {pg_xact_0000_path_new}")
assert os.path.isfile(pg_xact_0000_path_new) is False

View File

@@ -24,8 +24,8 @@ def test_lsof_pageserver_pid(neon_simple_env: NeonEnv):
def start_workload():
env.neon_cli.create_branch("test_lsof_pageserver_pid")
pg = env.postgres.create_start("test_lsof_pageserver_pid")
with closing(pg.connect()) as conn:
endpoint = env.endpoints.create_start("test_lsof_pageserver_pid")
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE foo as SELECT x FROM generate_series(1,100000) x")
cur.execute("update foo set x=x+1")

View File

@@ -1,3 +1,4 @@
import copy
import os
import shutil
import subprocess
@@ -55,29 +56,31 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
neon_env_builder.preserve_database_files = True
env = neon_env_builder.init_start()
pg = env.postgres.create_start("main")
endpoint = env.endpoints.create_start("main")
# FIXME: Is this expected?
env.pageserver.allowed_errors.append(
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
)
pg_bin.run(["pgbench", "--initialize", "--scale=10", pg.connstr()])
pg_bin.run(["pgbench", "--time=60", "--progress=2", pg.connstr()])
pg_bin.run(["pg_dumpall", f"--dbname={pg.connstr()}", f"--file={test_output_dir / 'dump.sql'}"])
pg_bin.run(["pgbench", "--initialize", "--scale=10", endpoint.connstr()])
pg_bin.run(["pgbench", "--time=60", "--progress=2", endpoint.connstr()])
pg_bin.run(
["pg_dumpall", f"--dbname={endpoint.connstr()}", f"--file={test_output_dir / 'dump.sql'}"]
)
snapshot_config = toml.load(test_output_dir / "repo" / "config")
tenant_id = snapshot_config["default_tenant_id"]
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
pageserver_http = env.pageserver.http_client()
lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, lsn)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(pageserver_http, tenant_id, timeline_id, lsn)
env.postgres.stop_all()
env.endpoints.stop_all()
for sk in env.safekeepers:
sk.stop()
env.pageserver.stop()
@@ -98,6 +101,9 @@ def test_backward_compatibility(
pg_version: str,
request: FixtureRequest,
):
"""
Test that the new binaries can read old data
"""
compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR")
assert (
compatibility_snapshot_dir_env is not None
@@ -120,6 +126,7 @@ def test_backward_compatibility(
check_neon_works(
test_output_dir / "compatibility_snapshot" / "repo",
neon_binpath,
neon_binpath,
pg_distrib_dir,
pg_version,
port_distributor,
@@ -148,7 +155,11 @@ def test_forward_compatibility(
port_distributor: PortDistributor,
pg_version: str,
request: FixtureRequest,
neon_binpath: Path,
):
"""
Test that the old binaries can read new data
"""
compatibility_neon_bin_env = os.environ.get("COMPATIBILITY_NEON_BIN")
assert compatibility_neon_bin_env is not None, (
"COMPATIBILITY_NEON_BIN is not set. It should be set to a path with Neon binaries "
@@ -183,6 +194,7 @@ def test_forward_compatibility(
check_neon_works(
test_output_dir / "compatibility_snapshot" / "repo",
compatibility_neon_bin,
neon_binpath,
compatibility_postgres_distrib_dir,
pg_version,
port_distributor,
@@ -223,9 +235,13 @@ def prepare_snapshot(
for logfile in repo_dir.glob("**/*.log"):
logfile.unlink()
# Remove tenants data for compute
for tenant in (repo_dir / "pgdatadirs" / "tenants").glob("*"):
shutil.rmtree(tenant)
# Remove old computes in 'endpoints'. Old versions of the control plane used a directory
# called "pgdatadirs". Delete it, too.
if (repo_dir / "endpoints").exists():
shutil.rmtree(repo_dir / "endpoints")
if (repo_dir / "pgdatadirs").exists():
shutil.rmtree(repo_dir / "pgdatadirs")
os.mkdir(repo_dir / "endpoints")
# Remove wal-redo temp directory if it exists. Newer pageserver versions don't create
# them anymore, but old versions did.
@@ -326,7 +342,8 @@ def get_neon_version(neon_binpath: Path):
def check_neon_works(
repo_dir: Path,
neon_binpath: Path,
neon_target_binpath: Path,
neon_current_binpath: Path,
pg_distrib_dir: Path,
pg_version: str,
port_distributor: PortDistributor,
@@ -336,7 +353,7 @@ def check_neon_works(
):
snapshot_config_toml = repo_dir / "config"
snapshot_config = toml.load(snapshot_config_toml)
snapshot_config["neon_distrib_dir"] = str(neon_binpath)
snapshot_config["neon_distrib_dir"] = str(neon_target_binpath)
snapshot_config["postgres_distrib_dir"] = str(pg_distrib_dir)
with (snapshot_config_toml).open("w") as f:
toml.dump(snapshot_config, f)
@@ -347,17 +364,25 @@ def check_neon_works(
config.repo_dir = repo_dir
config.pg_version = pg_version
config.initial_tenant = snapshot_config["default_tenant_id"]
config.neon_binpath = neon_binpath
config.pg_distrib_dir = pg_distrib_dir
config.preserve_database_files = True
cli = NeonCli(config)
cli.raw_cli(["start"])
request.addfinalizer(lambda: cli.raw_cli(["stop"]))
# Use the "target" binaries to launch the storage nodes
config_target = config
config_target.neon_binpath = neon_target_binpath
cli_target = NeonCli(config_target)
# And the current binaries to launch computes
config_current = copy.copy(config)
config_current.neon_binpath = neon_current_binpath
cli_current = NeonCli(config_current)
cli_target.raw_cli(["start"])
request.addfinalizer(lambda: cli_target.raw_cli(["stop"]))
pg_port = port_distributor.get_port()
cli.pg_start("main", port=pg_port)
request.addfinalizer(lambda: cli.pg_stop("main"))
cli_current.endpoint_start("main", port=pg_port)
request.addfinalizer(lambda: cli_current.endpoint_stop("main"))
connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres"
pg_bin.run(["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump.sql'}"])

View File

@@ -13,10 +13,10 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
ctl = ComputeCtl(env)
env.neon_cli.create_branch("test_compute_ctl", "main")
pg = env.postgres.create_start("test_compute_ctl")
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
endpoint = env.endpoints.create_start("test_compute_ctl")
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
with open(pg.config_file_path(), "r") as f:
with open(endpoint.config_file_path(), "r") as f:
cfg_lines = f.readlines()
cfg_map = {}
for line in cfg_lines:
@@ -24,10 +24,13 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
k, v = line.split("=")
cfg_map[k] = v.strip("\n '\"")
log.info(f"postgres config: {cfg_map}")
pgdata = pg.pg_data_dir_path()
pgdata = endpoint.pg_data_dir_path()
pg_bin_path = os.path.join(pg_bin.pg_bin_path, "postgres")
pg.stop_and_destroy()
endpoint.stop_and_destroy()
# stop_and_destroy removes the whole endpoint directory. Recreate it.
Path(pgdata).mkdir(parents=True)
spec = (
"""

View File

@@ -12,10 +12,10 @@ def test_config(neon_simple_env: NeonEnv):
env.neon_cli.create_branch("test_config", "empty")
# change config
pg = env.postgres.create_start("test_config", config_lines=["log_min_messages=debug1"])
endpoint = env.endpoints.create_start("test_config", config_lines=["log_min_messages=debug1"])
log.info("postgres is running on test_config branch")
with closing(pg.connect()) as conn:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute(
"""

View File

@@ -21,11 +21,11 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_crafted_wal_end")
pg = env.postgres.create("test_crafted_wal_end")
endpoint = env.endpoints.create("test_crafted_wal_end")
wal_craft = WalCraft(env)
pg.config(wal_craft.postgres_config())
pg.start()
res = pg.safe_psql_many(
endpoint.config(wal_craft.postgres_config())
endpoint.start()
res = endpoint.safe_psql_many(
queries=[
"CREATE TABLE keys(key int primary key)",
"INSERT INTO keys SELECT generate_series(1, 100)",
@@ -34,7 +34,7 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
)
assert res[-1][0] == (5050,)
wal_craft.in_existing(wal_type, pg.connstr())
wal_craft.in_existing(wal_type, endpoint.connstr())
log.info("Restarting all safekeepers and pageservers")
env.pageserver.stop()
@@ -43,7 +43,7 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
env.pageserver.start()
log.info("Trying more queries")
res = pg.safe_psql_many(
res = endpoint.safe_psql_many(
queries=[
"SELECT SUM(key) FROM keys",
"INSERT INTO keys SELECT generate_series(101, 200)",
@@ -60,7 +60,7 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
env.pageserver.start()
log.info("Trying more queries (again)")
res = pg.safe_psql_many(
res = endpoint.safe_psql_many(
queries=[
"SELECT SUM(key) FROM keys",
"INSERT INTO keys SELECT generate_series(201, 300)",

View File

@@ -13,10 +13,10 @@ def test_createdb(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_createdb", "empty")
pg = env.postgres.create_start("test_createdb")
endpoint = env.endpoints.create_start("test_createdb")
log.info("postgres is running on 'test_createdb' branch")
with pg.cursor() as cur:
with endpoint.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute("VACUUM FULL pg_class")
@@ -26,10 +26,10 @@ def test_createdb(neon_simple_env: NeonEnv):
# Create a branch
env.neon_cli.create_branch("test_createdb2", "test_createdb", ancestor_start_lsn=lsn)
pg2 = env.postgres.create_start("test_createdb2")
endpoint2 = env.endpoints.create_start("test_createdb2")
# Test that you can connect to the new database on both branches
for db in (pg, pg2):
for db in (endpoint, endpoint2):
with db.cursor(dbname="foodb") as cur:
# Check database size in both branches
cur.execute(
@@ -55,17 +55,17 @@ def test_createdb(neon_simple_env: NeonEnv):
def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
env.neon_cli.create_branch("test_dropdb", "empty")
pg = env.postgres.create_start("test_dropdb")
endpoint = env.endpoints.create_start("test_dropdb")
log.info("postgres is running on 'test_dropdb' branch")
with pg.cursor() as cur:
with endpoint.cursor() as cur:
cur.execute("CREATE DATABASE foodb")
lsn_before_drop = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
dboid = query_scalar(cur, "SELECT oid FROM pg_database WHERE datname='foodb';")
with pg.cursor() as cur:
with endpoint.cursor() as cur:
cur.execute("DROP DATABASE foodb")
cur.execute("CHECKPOINT")
@@ -76,29 +76,29 @@ def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
env.neon_cli.create_branch(
"test_before_dropdb", "test_dropdb", ancestor_start_lsn=lsn_before_drop
)
pg_before = env.postgres.create_start("test_before_dropdb")
endpoint_before = env.endpoints.create_start("test_before_dropdb")
env.neon_cli.create_branch(
"test_after_dropdb", "test_dropdb", ancestor_start_lsn=lsn_after_drop
)
pg_after = env.postgres.create_start("test_after_dropdb")
endpoint_after = env.endpoints.create_start("test_after_dropdb")
# Test that database exists on the branch before drop
pg_before.connect(dbname="foodb").close()
endpoint_before.connect(dbname="foodb").close()
# Test that database subdir exists on the branch before drop
assert pg_before.pgdata_dir
dbpath = pathlib.Path(pg_before.pgdata_dir) / "base" / str(dboid)
assert endpoint_before.pgdata_dir
dbpath = pathlib.Path(endpoint_before.pgdata_dir) / "base" / str(dboid)
log.info(dbpath)
assert os.path.isdir(dbpath) is True
# Test that database subdir doesn't exist on the branch after drop
assert pg_after.pgdata_dir
dbpath = pathlib.Path(pg_after.pgdata_dir) / "base" / str(dboid)
assert endpoint_after.pgdata_dir
dbpath = pathlib.Path(endpoint_after.pgdata_dir) / "base" / str(dboid)
log.info(dbpath)
assert os.path.isdir(dbpath) is False
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, pg)
check_restored_datadir_content(test_output_dir, env, endpoint)

View File

@@ -9,10 +9,10 @@ from fixtures.utils import query_scalar
def test_createuser(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_createuser", "empty")
pg = env.postgres.create_start("test_createuser")
endpoint = env.endpoints.create_start("test_createuser")
log.info("postgres is running on 'test_createuser' branch")
with pg.cursor() as cur:
with endpoint.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute("CREATE USER testuser with password %s", ("testpwd",))
@@ -22,7 +22,7 @@ def test_createuser(neon_simple_env: NeonEnv):
# Create a branch
env.neon_cli.create_branch("test_createuser2", "test_createuser", ancestor_start_lsn=lsn)
pg2 = env.postgres.create_start("test_createuser2")
endpoint2 = env.endpoints.create_start("test_createuser2")
# Test that you can connect to new branch as a new user
assert pg2.safe_psql("select current_user", user="testuser") == [("testuser",)]
assert endpoint2.safe_psql("select current_user", user="testuser") == [("testuser",)]

View File

@@ -91,8 +91,8 @@ class EvictionEnv:
This assumes that the tenant is still at the state after pbench -i.
"""
lsn = self.pgbench_init_lsns[tenant_id]
with self.neon_env.postgres.create_start("main", tenant_id=tenant_id, lsn=lsn) as pg:
self.pg_bin.run(["pgbench", "-S", pg.connstr()])
with self.neon_env.endpoints.create_start("main", tenant_id=tenant_id, lsn=lsn) as endpoint:
self.pg_bin.run(["pgbench", "-S", endpoint.connstr()])
def pageserver_start_with_disk_usage_eviction(
self, period, max_usage_pct, min_avail_bytes, mock_behavior
@@ -168,9 +168,9 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
}
)
with env.postgres.create_start("main", tenant_id=tenant_id) as pg:
pg_bin.run(["pgbench", "-i", f"-s{scale}", pg.connstr()])
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
pg_bin.run(["pgbench", "-i", f"-s{scale}", endpoint.connstr()])
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
timelines.append((tenant_id, timeline_id))

View File

@@ -4,7 +4,7 @@ from fixtures.neon_fixtures import NeonEnvBuilder
def test_fsm_truncate(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_fsm_truncate")
pg = env.postgres.create_start("test_fsm_truncate")
pg.safe_psql(
endpoint = env.endpoints.create_start("test_fsm_truncate")
endpoint.safe_psql(
"CREATE TABLE t1(key int); CREATE TABLE t2(key int); TRUNCATE TABLE t1; TRUNCATE TABLE t2;"
)

View File

@@ -24,10 +24,10 @@ def test_fullbackup(
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_fullbackup")
pgmain = env.postgres.create_start("test_fullbackup")
endpoint_main = env.endpoints.create_start("test_fullbackup")
log.info("postgres is running on 'test_fullbackup' branch")
with pgmain.cursor() as cur:
with endpoint_main.cursor() as cur:
timeline = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
# data loading may take a while, so increase statement timeout

View File

@@ -5,9 +5,9 @@ import random
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
Postgres,
RemoteStorageKind,
wait_for_last_flush_lsn,
)
@@ -26,9 +26,9 @@ updates_performed = 0
# Run random UPDATEs on test table
async def update_table(pg: Postgres):
async def update_table(endpoint: Endpoint):
global updates_performed
pg_conn = await pg.connect_async()
pg_conn = await endpoint.connect_async()
while updates_performed < updates_to_perform:
updates_performed += 1
@@ -52,10 +52,10 @@ async def gc(env: NeonEnv, timeline: TimelineId):
# At the same time, run UPDATEs and GC
async def update_and_gc(env: NeonEnv, pg: Postgres, timeline: TimelineId):
async def update_and_gc(env: NeonEnv, endpoint: Endpoint, timeline: TimelineId):
workers = []
for worker_id in range(num_connections):
workers.append(asyncio.create_task(update_table(pg)))
workers.append(asyncio.create_task(update_table(endpoint)))
workers.append(asyncio.create_task(gc(env, timeline)))
# await all workers
@@ -72,10 +72,10 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
neon_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_gc_aggressive", "main")
pg = env.postgres.create_start("test_gc_aggressive")
endpoint = env.endpoints.create_start("test_gc_aggressive")
log.info("postgres is running on test_gc_aggressive branch")
with pg.cursor() as cur:
with endpoint.cursor() as cur:
timeline = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
# Create table, and insert the first 100 rows
@@ -89,7 +89,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
)
cur.execute("CREATE INDEX ON foo(id)")
asyncio.run(update_and_gc(env, pg, timeline))
asyncio.run(update_and_gc(env, endpoint, timeline))
cur.execute("SELECT COUNT(*), SUM(counter) FROM foo")
r = cur.fetchone()
@@ -110,11 +110,11 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_gc_index_upload", "main")
pg = env.postgres.create_start("test_gc_index_upload")
endpoint = env.endpoints.create_start("test_gc_index_upload")
pageserver_http = env.pageserver.http_client()
pg_conn = pg.connect()
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id"))
@@ -146,7 +146,7 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
return int(total)
# Sanity check that the metric works
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
before = get_num_remote_ops("index", "upload")

Some files were not shown because too many files have changed in this diff Show More