Compare commits

...

44 Commits

Author SHA1 Message Date
George MacKerron
6abe34bfa1 Dead-end websocket experiments 2023-05-26 11:35:36 +01:00
George MacKerron
b932c14f94 Pg protocol over SQL vaguely-semi-working 2023-05-16 07:42:51 +01:00
George MacKerron
2e05a9b652 WebSocket URL query data working; pg-protocol POST requests partly working 2023-05-15 17:47:33 +01:00
George MacKerron
b808160aff Improved error handling 2023-05-09 17:24:00 +01:00
George MacKerron
2997018005 Added support to return timestamptz, timestamp, date and time for http queries 2023-05-08 11:02:12 +01:00
George MacKerron
8f98cc29fa Stop double-serializing JSON response 2023-05-05 21:30:28 +01:00
Arthur Petukhovsky
b4ee8e3b73 Box query values
SELECT 1.0 is still not working
2023-05-05 17:27:26 +00:00
George MacKerron
68ea916bf1 More tweaks, now compiling 2023-05-05 14:49:15 +01:00
George MacKerron
eea3dd54da Tweaks 2023-05-05 14:32:01 +01:00
George MacKerron
4071b22519 Further work on sql over http 2023-05-05 13:25:50 +01:00
George MacKerron
c58edec63a Small progress on sql-over-http 2023-05-05 11:16:07 +01:00
George MacKerron
fdb9d4373d More work on mapping params 2023-05-05 10:53:32 +01:00
George MacKerron
cb88df7ffa Broken/incomplete attempts to implement new http API for serverless driver 2023-05-04 22:00:47 +01:00
Arthur Petukhovsky
595248532c Add global connection cache for ws 2023-04-18 15:29:00 +00:00
Stas Kelvich
158c051c9a add sleep endpoint to simulate proxy response 2023-04-18 15:39:10 +03:00
Stas Kelvich
143c4954df use simple query 2023-04-18 13:40:49 +03:00
Eduard Dyckman
373ae7672b Allow only post connection to sql url (#4035) 2023-04-15 20:06:57 +09:00
Stas Kelvich
1233923c3a works with browser 2023-04-15 02:18:52 +03:00
Stas Kelvich
1a3a7f14dd works now, few fixes 2023-04-15 01:15:22 +03:00
Stas Kelvich
4e9067a8c2 wip: now compiles 2023-04-14 19:41:02 +03:00
Kirill Bulatov
ebea298415 Update most of the dependencies to their latest versions (#4026)
See https://github.com/neondatabase/neon/pull/3991

Brings the changes back with the right way to use new `toml_edit` to
deserialize values and a unit test for this.

All non-trivial updates extracted into separate commits, also `carho hakari` data and its manifest format were updated.

3 sets of crates remain unupdated:

* `base64` — touches proxy in a lot of places and changed its api (by 0.21 version) quite strongly since our version (0.13).
* `opentelemetry` and `opentelemetry-*` crates

```
error[E0308]: mismatched types
  --> libs/tracing-utils/src/http.rs:65:21
   |
65 |     span.set_parent(parent_ctx);
   |          ---------- ^^^^^^^^^^ expected struct `opentelemetry_api::context::Context`, found struct `opentelemetry::Context`
   |          |
   |          arguments to this method are incorrect
   |
   = note: struct `opentelemetry::Context` and struct `opentelemetry_api::context::Context` have similar names, but are actually distinct types
note: struct `opentelemetry::Context` is defined in crate `opentelemetry_api`
  --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/opentelemetry_api-0.19.0/src/context.rs:77:1
   |
77 | pub struct Context {
   | ^^^^^^^^^^^^^^^^^^
note: struct `opentelemetry_api::context::Context` is defined in crate `opentelemetry_api`
  --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/opentelemetry_api-0.18.0/src/context.rs:77:1
   |
77 | pub struct Context {
   | ^^^^^^^^^^^^^^^^^^
   = note: perhaps two different versions of crate `opentelemetry_api` are being used?
note: associated function defined here
  --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/tracing-opentelemetry-0.18.0/src/span_ext.rs:43:8
   |
43 |     fn set_parent(&self, cx: Context);
   |        ^^^^^^^^^^

For more information about this error, try `rustc --explain E0308`.
error: could not compile `tracing-utils` due to previous error
warning: build failed, waiting for other jobs to finish...
error: could not compile `tracing-utils` due to previous error
```

`tracing-opentelemetry` of version `0.19` is not yet released, that is supposed to have the update we need.

* similarly, `rustls`, `tokio-rustls`, `rustls-*` and `tls-listener` crates have similar issue:

```
error[E0308]: mismatched types
   --> libs/postgres_backend/tests/simple_select.rs:112:78
    |
112 |     let mut make_tls_connect = tokio_postgres_rustls::MakeRustlsConnect::new(client_cfg);
    |                                --------------------------------------------- ^^^^^^^^^^ expected struct `rustls::client::client_conn::ClientConfig`, found struct `ClientConfig`
    |                                |
    |                                arguments to this function are incorrect
    |
    = note: struct `ClientConfig` and struct `rustls::client::client_conn::ClientConfig` have similar names, but are actually distinct types
note: struct `ClientConfig` is defined in crate `rustls`
   --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/rustls-0.21.0/src/client/client_conn.rs:125:1
    |
125 | pub struct ClientConfig {
    | ^^^^^^^^^^^^^^^^^^^^^^^
note: struct `rustls::client::client_conn::ClientConfig` is defined in crate `rustls`
   --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/rustls-0.20.8/src/client/client_conn.rs:91:1
    |
91  | pub struct ClientConfig {
    | ^^^^^^^^^^^^^^^^^^^^^^^
    = note: perhaps two different versions of crate `rustls` are being used?
note: associated function defined here
   --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-postgres-rustls-0.9.0/src/lib.rs:23:12
    |
23  |     pub fn new(config: ClientConfig) -> Self {
    |            ^^^

For more information about this error, try `rustc --explain E0308`.
error: could not compile `postgres_backend` due to previous error
warning: build failed, waiting for other jobs to finish...
```

* aws crates: I could not make new API to work with bucket endpoint overload, and console e2e tests failed.
Other our tests passed, further investigation is worth to be done in https://github.com/neondatabase/neon/issues/4008
2023-04-14 18:28:54 +03:00
Vadim Kharitonov
5ffa20dd82 [proxy] adjust proxy sleep timeout 2023-04-14 15:08:07 +03:00
Vadim Kharitonov
75ea8106ec Add procps into compute containers 2023-04-14 15:02:26 +03:00
Vadim Kharitonov
017d3a390d Compile postgres with lz4 and zstd support 2023-04-14 15:02:26 +03:00
Alexey Kondratov
589cf1ed21 [compute_ctl] Do not create availability checker data on each start (#4019)
Initially, idea was to ensure that when we come and check data
availability, special service table already contains one row. So if we
loose it for some reason, we will error out.

Yet, to do availability check we anyway start compute first! So it
doesn't really add some value, but we affect each compute start as we
update at least one row in the database. Also this writes some WAL, so
if timeline is close to `neon.max_cluster_size` it could prevent compute
from starting up.

That said, do CREATE TABLE IF NOT EXISTS + UPSERT right in the
`/check_writability` handler.
2023-04-14 13:05:07 +02:00
Alexander Bayandin
0c82ff3d98 test_runner: add Timeline Inspector to Grafana links (#4021) 2023-04-14 11:46:47 +01:00
Christian Schwarz
8895f28dae make evictions_low_residence_duration_metric_threshold per-tenant (#3949)
Before this patch, if a tenant would override its eviction_policy
setting to use a lower LayerAccessThreshold::threshold than the
`evictions_low_residence_duration_metric_threshold`, the evictions done
for that tenant would count towards the
`evictions_with_low_residence_duration` metric.

That metric is used to identify pre-mature evictions, commonly triggered
by disk-usage-based eviction under disk pressure.

We don't want that to happen for the legitimate evictions of the tenant
that overrides its eviction_policy.

So, this patch
- moves the setting into TenantConf
- adds test coverage
- updates the staging & prod yamls

Forward Compatibility:
Software before this patch will ignore the new tenant conf field and use
the global one instead.
So we can roll back safely.

Backward Compatibility:
Parsing old configs with software as of this patch will fail in
`PageServerConf::parse_and_validate` with error 
`unrecognized pageserver option 'evictions_low_residence_duration_metric_threshold'`
if the option is still present in the global section.
We deal with this by updating the configs in Ansible.

fixes https://github.com/neondatabase/neon/issues/3940
2023-04-14 13:25:45 +03:00
dependabot[bot]
b6c7c3290f Bump h2 from 0.3.15 to 0.3.17 (#4020) 2023-04-13 20:03:24 +01:00
Sasha Krassovsky
fd31fafeee Make proxy shutdown when all connections are closed (#3764)
## Describe your changes
Makes Proxy start draining connections on SIGTERM.
## Issue ticket number and link
#3333
2023-04-13 19:31:30 +03:00
Alexey Kondratov
db8dd6f380 [compute_ctl] Implement live reconfiguration (#3980)
With this commit one can request compute reconfiguration
from the running `compute_ctl` with compute in `Running` state
by sending a new spec:
```shell
curl -d "{\"spec\": $(cat ./compute-spec-new.json)}" http://localhost:3080/configure
```

Internally, we start a separate configurator thread that is waiting on
`Condvar` for `ConfigurationPending` compute state in a loop. Then it does
reconfiguration, sets compute back to `Running` state and notifies other
waiters.

It will need some follow-ups, e.g. for retry logic for control-plane
requests, but should be useful for testing in the current state. This
shouldn't affect any existing environment, since computes are configured
in a different way there.

Resolves neondatabase/cloud#4433
2023-04-13 18:07:29 +02:00
Alexander Bayandin
36c20946b4 Verify extensions checksums (#4014)
To not be taken by surprise by upstream git re-tag or by malicious activity,
let's verify the checksum for extensions we download

Also, unify the installation of `pg_graphql` and `pg_tiktoken` 
with other extensions.
2023-04-13 15:25:09 +01:00
Heikki Linnakangas
89b5589b1b Tenant size should never be zero. Simplify test.
Looking at the git history of this test, I think "size == 0" used to
have a special meaning earlier, but now it should never happen.
2023-04-13 16:57:31 +03:00
Heikki Linnakangas
53f438a8a8 Rename "Postgres nodes" in control_plane to endpoints.
We use the term "endpoint" in for compute Postgres nodes in the web UI
and user-facing documentation now. Adjust the nomenclature in the code.

This changes the name of the "neon_local pg" command to "neon_local
endpoint". Also adjust names of classes, variables etc. in the python
tests accordingly.

This also changes the directory structure so that endpoints are now
stored in:

    .neon/endpoints/<endpoint id>

instead of:

    .neon/pgdatadirs/tenants/<tenant_id>/<endpoint (node) name>

The tenant ID is no longer part of the path. That means that you
cannot have two endpoints with the same name/ID in two different
tenants anymore. That's consistent with how we treat endpoints in the
real control plane and proxy: the endpoint ID must be globally unique.
2023-04-13 14:34:29 +03:00
Vadim Kharitonov
356439aa33 Add note about manual_release_instructions label (#4015)
## Describe your changes
Do not forget to process required manual stuff after release

## Issue ticket number and link

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Dmitry Rodionov <dmitry@neon.tech>
2023-04-13 13:13:24 +03:00
Vadim Kharitonov
c237a2f5fb Compile pg_hint_plan extension 2023-04-13 12:59:46 +03:00
Dmitry Rodionov
15d1f85552 Add reason to TenantState::Broken (#3954)
Reason and backtrace are added to the Broken state. Backtrace is automatically collected when tenant entered the broken state. The format for API, CLI and metrics is changed and unified to return tenant state name in camel case. Previously snake case was used for metrics and camel case was used for everything else. Now tenant state field in TenantInfo swagger spec is changed to contain state name in "slug" field and other fields (currently only reason and backtrace for Broken variant in "data" field). To allow for this breaking change state was removed from TenantInfo swagger spec because it was not used anywhere.

Please note that the tenant's broken reason is not persisted on disk so the reason is lost when pageserver is restarted.

Requires changes to grafana dashboard that monitors tenant states.

Closes #3001

---------

Co-authored-by: theirix <theirix@gmail.com>
2023-04-13 12:11:43 +03:00
Konstantin Knizhnik
732acc54c1 Add check for duplicates of generated image layers (#3869)
## Describe your changes

## Issue ticket number and link

#3673

## Checklist before requesting a review
- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

---------

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2023-04-13 10:19:34 +03:00
Stas Kelvich
5d0ecadf7c Add support for non-SNI case in multi-cert proxy
When no SNI is provided use the default certificate, otherwise we can't
get to the options parameter which can be used to set endpoint name too.
That means that non-SNI flow will not work for CNAME domains in verify-full
mode.
2023-04-12 18:16:49 +03:00
Kirill Bulatov
f7995b3c70 Revert "Update most of the dependencies to their latest versions (#3991)" (#4013)
This reverts commit a64044a7a9.

See https://neondb.slack.com/archives/C03H1K0PGKH/p1681306682795559
2023-04-12 14:51:59 +00:00
Alexander Bayandin
13e53e5dc8 GitHub Workflows: use '!cancelled' instead of 'success or failure' 2023-04-12 15:22:18 +01:00
Alexander Bayandin
c94b8998be GitHub Workflows: print error messages to stderr 2023-04-12 15:22:18 +01:00
Alexander Bayandin
218062ceba GitHub Workflows: use ref_name instead of ref 2023-04-12 15:22:18 +01:00
Sam Gaw
8d295780cb Add support for ip4r extension 2023-04-12 16:40:02 +03:00
Kirill Bulatov
a64044a7a9 Update most of the dependencies to their latest versions (#3991)
All non-trivial updates extracted into separate commits, also `carho
hakari` data and its manifest format were updated.

3 sets of crates remain unupdated:

* `base64` — touches proxy in a lot of places and changed its api (by
0.21 version) quite strongly since our version (0.13).
* `opentelemetry` and `opentelemetry-*` crates

```
error[E0308]: mismatched types
  --> libs/tracing-utils/src/http.rs:65:21
   |
65 |     span.set_parent(parent_ctx);
   |          ---------- ^^^^^^^^^^ expected struct `opentelemetry_api::context::Context`, found struct `opentelemetry::Context`
   |          |
   |          arguments to this method are incorrect
   |
   = note: struct `opentelemetry::Context` and struct `opentelemetry_api::context::Context` have similar names, but are actually distinct types
note: struct `opentelemetry::Context` is defined in crate `opentelemetry_api`
  --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/opentelemetry_api-0.19.0/src/context.rs:77:1
   |
77 | pub struct Context {
   | ^^^^^^^^^^^^^^^^^^
note: struct `opentelemetry_api::context::Context` is defined in crate `opentelemetry_api`
  --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/opentelemetry_api-0.18.0/src/context.rs:77:1
   |
77 | pub struct Context {
   | ^^^^^^^^^^^^^^^^^^
   = note: perhaps two different versions of crate `opentelemetry_api` are being used?
note: associated function defined here
  --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/tracing-opentelemetry-0.18.0/src/span_ext.rs:43:8
   |
43 |     fn set_parent(&self, cx: Context);
   |        ^^^^^^^^^^

For more information about this error, try `rustc --explain E0308`.
error: could not compile `tracing-utils` due to previous error
warning: build failed, waiting for other jobs to finish...
error: could not compile `tracing-utils` due to previous error
```

`tracing-opentelemetry` of version `0.19` is not yet released, that is
supposed to have the update we need.

* similarly, `rustls`, `tokio-rustls`, `rustls-*` and `tls-listener`
crates have similar issue:

```
error[E0308]: mismatched types
   --> libs/postgres_backend/tests/simple_select.rs:112:78
    |
112 |     let mut make_tls_connect = tokio_postgres_rustls::MakeRustlsConnect::new(client_cfg);
    |                                --------------------------------------------- ^^^^^^^^^^ expected struct `rustls::client::client_conn::ClientConfig`, found struct `ClientConfig`
    |                                |
    |                                arguments to this function are incorrect
    |
    = note: struct `ClientConfig` and struct `rustls::client::client_conn::ClientConfig` have similar names, but are actually distinct types
note: struct `ClientConfig` is defined in crate `rustls`
   --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/rustls-0.21.0/src/client/client_conn.rs:125:1
    |
125 | pub struct ClientConfig {
    | ^^^^^^^^^^^^^^^^^^^^^^^
note: struct `rustls::client::client_conn::ClientConfig` is defined in crate `rustls`
   --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/rustls-0.20.8/src/client/client_conn.rs:91:1
    |
91  | pub struct ClientConfig {
    | ^^^^^^^^^^^^^^^^^^^^^^^
    = note: perhaps two different versions of crate `rustls` are being used?
note: associated function defined here
   --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-postgres-rustls-0.9.0/src/lib.rs:23:12
    |
23  |     pub fn new(config: ClientConfig) -> Self {
    |            ^^^

For more information about this error, try `rustc --explain E0308`.
error: could not compile `postgres_backend` due to previous error
warning: build failed, waiting for other jobs to finish...
```

* aws crates: I could not make new API to work with bucket endpoint
overload, and console e2e tests failed.
Other our tests passed, further investigation is worth to be done in
https://github.com/neondatabase/neon/issues/4008
2023-04-12 15:32:38 +03:00
153 changed files with 3629 additions and 1904 deletions

View File

@@ -4,7 +4,7 @@
hakari-package = "workspace_hack"
# Format for `workspace-hack = ...` lines in other Cargo.tomls. Requires cargo-hakari 0.9.8 or above.
dep-format-version = "3"
dep-format-version = "4"
# Setting workspace.resolver = "2" in the root Cargo.toml is HIGHLY recommended.
# Hakari works much better with the new feature resolver.

View File

@@ -10,6 +10,7 @@
<!-- List everything that should be done **before** release, any issues / setting changes / etc -->
### Checklist after release
- [ ] Make sure instructions from PRs included in this release and labeled `manual_release_instructions` are executed (either by you or by people who wrote them).
- [ ] Based on the merged commits write release notes and open a PR into `website` repo ([example](https://github.com/neondatabase/website/pull/219/files))
- [ ] Check [#dev-production-stream](https://neondb.slack.com/archives/C03F5SM1N02) Slack channel
- [ ] Check [stuck projects page](https://console.neon.tech/admin/projects?sort=last_active&order=desc&stuck=true)

View File

@@ -45,12 +45,12 @@ runs:
shell: bash -euxo pipefail {0}
run: |
if [ "${{ inputs.action }}" != "store" ] && [ "${{ inputs.action }}" != "generate" ]; then
echo 2>&1 "Unknown inputs.action type '${{ inputs.action }}'; allowed 'generate' or 'store' only"
echo >&2 "Unknown inputs.action type '${{ inputs.action }}'; allowed 'generate' or 'store' only"
exit 1
fi
if [ -z "${{ inputs.test_selection }}" ] && [ "${{ inputs.action }}" == "store" ]; then
echo 2>&1 "inputs.test_selection must be set for 'store' action"
echo >&2 "inputs.test_selection must be set for 'store' action"
exit 2
fi

View File

@@ -37,7 +37,7 @@ runs:
echo 'SKIPPED=true' >> $GITHUB_OUTPUT
exit 0
else
echo 2>&1 "Neither s3://${BUCKET}/${PREFIX}/${FILENAME} nor its version from previous attempts exist"
echo >&2 "Neither s3://${BUCKET}/${PREFIX}/${FILENAME} nor its version from previous attempts exist"
exit 1
fi
fi

View File

@@ -58,7 +58,7 @@ runs:
done
if [ -z "${branch_id}" ] || [ "${branch_id}" == "null" ]; then
echo 2>&1 "Failed to create branch after 10 attempts, the latest response was: ${branch}"
echo >&2 "Failed to create branch after 10 attempts, the latest response was: ${branch}"
exit 1
fi
@@ -122,7 +122,7 @@ runs:
done
if [ -z "${password}" ] || [ "${password}" == "null" ]; then
echo 2>&1 "Failed to reset password after 10 attempts, the latest response was: ${reset_password}"
echo >&2 "Failed to reset password after 10 attempts, the latest response was: ${reset_password}"
exit 1
fi

View File

@@ -48,7 +48,7 @@ runs:
done
if [ -z "${branch_id}" ] || [ "${branch_id}" == "null" ]; then
echo 2>&1 "Failed to delete branch after 10 attempts, the latest response was: ${deleted_branch}"
echo >&2 "Failed to delete branch after 10 attempts, the latest response was: ${deleted_branch}"
exit 1
fi
env:

View File

@@ -202,7 +202,7 @@ runs:
prefix: latest
- name: Create Allure report
if: success() || failure()
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: store

View File

@@ -23,7 +23,7 @@ runs:
mkdir -p $(dirname $ARCHIVE)
if [ -f ${ARCHIVE} ]; then
echo 2>&1 "File ${ARCHIVE} already exist. Something went wrong before"
echo >&2 "File ${ARCHIVE} already exist. Something went wrong before"
exit 1
fi
@@ -33,10 +33,10 @@ runs:
elif [ -f ${SOURCE} ]; then
time tar -cf ${ARCHIVE} --zstd ${SOURCE}
elif ! ls ${SOURCE} > /dev/null 2>&1; then
echo 2>&1 "${SOURCE} does not exist"
echo >&2 "${SOURCE} does not exist"
exit 2
else
echo 2>&1 "${SOURCE} is neither a directory nor a file, do not know how to handle it"
echo >&2 "${SOURCE} is neither a directory nor a file, do not know how to handle it"
exit 3
fi

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
@@ -34,7 +34,7 @@ storage:
pageservers:
hosts:
pageserver-0.us-west-2.aws.neon.tech:
ansible_host: i-0d9f6dfae0e1c780d
ansible_host: i-0d9f6dfae0e1c780d
pageserver-1.us-west-2.aws.neon.tech:
ansible_host: i-0c834be1dddba8b3f
pageserver-2.us-west-2.aws.neon.tech:
@@ -49,5 +49,5 @@ storage:
safekeeper-1.us-west-2.aws.neon.tech:
ansible_host: i-074682f9d3c712e7c
safekeeper-2.us-west-2.aws.neon.tech:
ansible_host: i-042b7efb1729d7966
ansible_host: i-042b7efb1729d7966

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "20m"
threshold: &default_eviction_threshold "20m"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "20m"
threshold: &default_eviction_threshold "20m"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:

View File

@@ -1,6 +1,22 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -7,15 +7,16 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800

View File

@@ -30,7 +30,7 @@ defaults:
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
jobs:
@@ -42,7 +42,7 @@ jobs:
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: "neon-staging"
runs-on: [ self-hosted, us-east-2, x64 ]
@@ -92,7 +92,7 @@ jobs:
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
if: success() || failure()
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: generate
@@ -174,7 +174,7 @@ jobs:
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: ${{ matrix.platform }}
runs-on: [ self-hosted, us-east-2, x64 ]
@@ -226,7 +226,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CONNSTR }}
;;
*)
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'neon-captest-new', 'neon-captest-freetier', 'rds-aurora', or 'rds-postgres'"
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'neon-captest-new', 'neon-captest-freetier', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
@@ -282,7 +282,7 @@ jobs:
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
if: success() || failure()
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: generate
@@ -305,7 +305,7 @@ jobs:
#
# *_CLICKBENCH_CONNSTR: Genuine ClickBench DB with ~100M rows
# *_CLICKBENCH_10M_CONNSTR: DB with the first 10M rows of ClickBench DB
if: success() || failure()
if: ${{ !cancelled() }}
needs: [ generate-matrices, pgbench-compare ]
strategy:
@@ -317,7 +317,7 @@ jobs:
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: ${{ matrix.platform }}
runs-on: [ self-hosted, us-east-2, x64 ]
@@ -356,7 +356,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CLICKBENCH_10M_CONNSTR }}
;;
*)
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
@@ -379,7 +379,7 @@ jobs:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Create Allure report
if: success() || failure()
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: generate
@@ -401,7 +401,7 @@ jobs:
# We might change it after https://github.com/neondatabase/neon/issues/2900.
#
# *_TPCH_S10_CONNSTR: DB generated with scale factor 10 (~10 GB)
if: success() || failure()
if: ${{ !cancelled() }}
needs: [ generate-matrices, clickbench-compare ]
strategy:
@@ -413,7 +413,7 @@ jobs:
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: ${{ matrix.platform }}
runs-on: [ self-hosted, us-east-2, x64 ]
@@ -452,7 +452,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_TPCH_S10_CONNSTR }}
;;
*)
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
@@ -475,7 +475,7 @@ jobs:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Create Allure report
if: success() || failure()
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: generate
@@ -491,7 +491,7 @@ jobs:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
user-examples-compare:
if: success() || failure()
if: ${{ !cancelled() }}
needs: [ generate-matrices, tpch-compare ]
strategy:
@@ -503,7 +503,7 @@ jobs:
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: ${{ matrix.platform }}
runs-on: [ self-hosted, us-east-2, x64 ]
@@ -542,7 +542,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_RDS_POSTGRES_CONNSTR }}
;;
*)
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
@@ -565,7 +565,7 @@ jobs:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Create Allure report
if: success() || failure()
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: generate

View File

@@ -13,7 +13,7 @@ defaults:
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
env:
@@ -368,7 +368,7 @@ jobs:
build_type: ${{ matrix.build_type }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ github.ref == 'refs/heads/main' }}
save_perf_report: ${{ github.ref_name == 'main' }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -1007,7 +1007,7 @@ jobs:
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${OLD_PREFIX} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
if [ -z "${S3_KEY}" ]; then
echo 2>&1 "Neither s3://${BUCKET}/${OLD_PREFIX}/${FILENAME} nor its version from previous attempts exist"
echo >&2 "Neither s3://${BUCKET}/${OLD_PREFIX}/${FILENAME} nor its version from previous attempts exist"
exit 1
fi

View File

@@ -12,7 +12,7 @@ defaults:
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
env:

View File

@@ -14,7 +14,7 @@ on:
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
jobs:

1452
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -24,10 +24,10 @@ atty = "0.2.14"
aws-config = { version = "0.51.0", default-features = false, features=["rustls"] }
aws-sdk-s3 = "0.21.0"
aws-smithy-http = "0.51.0"
aws-types = "0.51.0"
aws-types = "0.55"
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.61"
bindgen = "0.65"
bstr = "1.0"
byteorder = "1.4"
bytes = "1.0"
@@ -50,12 +50,12 @@ git-version = "0.3"
hashbrown = "0.13"
hashlink = "0.8.1"
hex = "0.4"
hex-literal = "0.3"
hex-literal = "0.4"
hmac = "0.12.1"
hostname = "0.3.1"
humantime = "2.1"
humantime-serde = "1.1.1"
hyper = "0.14"
hyper = { version = "0.14", features = ["http2", "tcp", "runtime", "http1"]}
hyper-tungstenite = "0.9"
itertools = "0.10"
jsonwebtoken = "8"
@@ -80,18 +80,18 @@ reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_18"] }
reqwest-middleware = "0.2.0"
routerify = "3"
rpds = "0.12.0"
rpds = "0.13"
rustls = "0.20"
rustls-pemfile = "1"
rustls-split = "0.3"
scopeguard = "1.1"
sentry = { version = "0.29", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
sentry = { version = "0.30", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "2.0"
sha2 = "0.10.2"
signal-hook = "0.3"
socket2 = "0.4.4"
socket2 = "0.5"
strum = "0.24"
strum_macros = "0.24"
svg_fmt = "0.4.1"
@@ -106,27 +106,28 @@ tokio-postgres-rustls = "0.9.0"
tokio-rustls = "0.23"
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io"] }
toml = "0.5"
toml_edit = { version = "0.17", features = ["easy"] }
tonic = {version = "0.8", features = ["tls", "tls-roots"]}
toml = "0.7"
toml_edit = "0.19"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
tracing = "0.1"
tracing-opentelemetry = "0.18.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.2"
uuid = { version = "1.2", features = ["v4", "serde"] }
walkdir = "2.3.2"
webpki-roots = "0.22.5"
x509-parser = "0.14"
webpki-roots = "0.23"
x509-parser = "0.15"
percent-encoding = "1.0"
## TODO replace this with tracing
env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e", features = ["with-chrono-0_4", "array-impls"] }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e", features = ["array-impls"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e", features = ["array-impls"] }
tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" }
## Other git libraries
@@ -154,9 +155,9 @@ workspace_hack = { version = "0.1", path = "./workspace_hack/" }
## Build dependencies
criterion = "0.4"
rcgen = "0.10"
rstest = "0.16"
rstest = "0.17"
tempfile = "3.4"
tonic-build = "0.8"
tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.

View File

@@ -12,7 +12,7 @@ FROM debian:bullseye-slim AS build-deps
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libssl-dev \
libicu-dev libxslt1-dev
libicu-dev libxslt1-dev liblz4-dev libzstd-dev
#########################################################################################
#
@@ -24,8 +24,13 @@ FROM build-deps AS pg-build
ARG PG_VERSION
COPY vendor/postgres-${PG_VERSION} postgres
RUN cd postgres && \
./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp --with-icu \
--with-libxml --with-libxslt && \
export CONFIGURE_CMD="./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp \
--with-icu --with-libxml --with-libxslt --with-lz4" && \
if [ "${PG_VERSION}" != "v14" ]; then \
# zstd is available only from PG15
export CONFIGURE_CMD="${CONFIGURE_CMD} --with-zstd"; \
fi && \
eval $CONFIGURE_CMD && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C contrib/ install && \
# Install headers
@@ -60,6 +65,7 @@ RUN apt update && \
# SFCGAL > 1.3 requires CGAL > 5.2, Bullseye's libcgal-dev is 5.2
RUN wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar.gz -O SFCGAL.tar.gz && \
echo "4e39b3b2adada6254a7bdba6d297bb28e1a9835a9f879b74f37e2dab70203232 SFCGAL.tar.gz" | sha256sum --check && \
mkdir sfcgal-src && cd sfcgal-src && tar xvzf ../SFCGAL.tar.gz --strip-components=1 -C . && \
cmake . && make -j $(getconf _NPROCESSORS_ONLN) && \
DESTDIR=/sfcgal make install -j $(getconf _NPROCESSORS_ONLN) && \
@@ -68,6 +74,7 @@ RUN wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar
ENV PATH "/usr/local/pgsql/bin:$PATH"
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.2.tar.gz -O postgis.tar.gz && \
echo "9a2a219da005a1730a39d1959a1c7cec619b1efb009b65be80ffc25bad299068 postgis.tar.gz" | sha256sum --check && \
mkdir postgis-src && cd postgis-src && tar xvzf ../postgis.tar.gz --strip-components=1 -C . && \
./autogen.sh && \
./configure --with-sfcgal=/usr/local/bin/sfcgal-config && \
@@ -84,6 +91,7 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.2.tar.gz -O postg
echo 'trusted = true' >> /usr/local/pgsql/share/extension/address_standardizer_data_us.control
RUN wget https://github.com/pgRouting/pgrouting/archive/v3.4.2.tar.gz -O pgrouting.tar.gz && \
echo "cac297c07d34460887c4f3b522b35c470138760fe358e351ad1db4edb6ee306e pgrouting.tar.gz" | sha256sum --check && \
mkdir pgrouting-src && cd pgrouting-src && tar xvzf ../pgrouting.tar.gz --strip-components=1 -C . && \
mkdir build && \
cd build && \
@@ -104,6 +112,7 @@ RUN apt update && \
apt install -y ninja-build python3-dev libncurses5 binutils clang
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.5.tar.gz -O plv8.tar.gz && \
echo "1e108d5df639e4c189e1c5bdfa2432a521c126ca89e7e5a969d46899ca7bf106 plv8.tar.gz" | sha256sum --check && \
mkdir plv8-src && cd plv8-src && tar xvzf ../plv8.tar.gz --strip-components=1 -C . && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
make DOCKER=1 -j $(getconf _NPROCESSORS_ONLN) install && \
@@ -125,11 +134,13 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# packaged cmake is too old
RUN wget https://github.com/Kitware/CMake/releases/download/v3.24.2/cmake-3.24.2-linux-x86_64.sh \
-q -O /tmp/cmake-install.sh \
&& echo "739d372726cb23129d57a539ce1432453448816e345e1545f6127296926b6754 /tmp/cmake-install.sh" | sha256sum --check \
&& chmod u+x /tmp/cmake-install.sh \
&& /tmp/cmake-install.sh --skip-license --prefix=/usr/local/ \
&& rm /tmp/cmake-install.sh
RUN wget https://github.com/uber/h3/archive/refs/tags/v4.1.0.tar.gz -O h3.tar.gz && \
echo "ec99f1f5974846bde64f4513cf8d2ea1b8d172d2218ab41803bf6a63532272bc h3.tar.gz" | sha256sum --check && \
mkdir h3-src && cd h3-src && tar xvzf ../h3.tar.gz --strip-components=1 -C . && \
mkdir build && cd build && \
cmake .. -DCMAKE_BUILD_TYPE=Release && \
@@ -139,6 +150,7 @@ RUN wget https://github.com/uber/h3/archive/refs/tags/v4.1.0.tar.gz -O h3.tar.gz
rm -rf build
RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.1.2.tar.gz -O h3-pg.tar.gz && \
echo "c135aa45999b2ad1326d2537c1cadef96d52660838e4ca371706c08fdea1a956 h3-pg.tar.gz" | sha256sum --check && \
mkdir h3-pg-src && cd h3-pg-src && tar xvzf ../h3-pg.tar.gz --strip-components=1 -C . && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
make -j $(getconf _NPROCESSORS_ONLN) && \
@@ -156,6 +168,7 @@ FROM build-deps AS unit-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -O postgresql-unit.tar.gz && \
echo "411d05beeb97e5a4abf17572bfcfbb5a68d98d1018918feff995f6ee3bb03e79 postgresql-unit.tar.gz" | sha256sum --check && \
mkdir postgresql-unit-src && cd postgresql-unit-src && tar xvzf ../postgresql-unit.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -176,6 +189,7 @@ FROM build-deps AS vector-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.4.0.tar.gz -O pgvector.tar.gz && \
echo "b76cf84ddad452cc880a6c8c661d137ddd8679c000a16332f4f03ecf6e10bcc8 pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -192,6 +206,7 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# 9742dab1b2f297ad3811120db7b21451bca2d3c9 made on 13/11/2021
RUN wget https://github.com/michelp/pgjwt/archive/9742dab1b2f297ad3811120db7b21451bca2d3c9.tar.gz -O pgjwt.tar.gz && \
echo "cfdefb15007286f67d3d45510f04a6a7a495004be5b3aecb12cda667e774203f pgjwt.tar.gz" | sha256sum --check && \
mkdir pgjwt-src && cd pgjwt-src && tar xvzf ../pgjwt.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgjwt.control
@@ -206,6 +221,7 @@ FROM build-deps AS hypopg-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/HypoPG/hypopg/archive/refs/tags/1.3.1.tar.gz -O hypopg.tar.gz && \
echo "e7f01ee0259dc1713f318a108f987663d60f3041948c2ada57a94b469565ca8e hypopg.tar.gz" | sha256sum --check && \
mkdir hypopg-src && cd hypopg-src && tar xvzf ../hypopg.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -221,6 +237,7 @@ FROM build-deps AS pg-hashids-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz -O pg_hashids.tar.gz && \
echo "74576b992d9277c92196dd8d816baa2cc2d8046fe102f3dcd7f3c3febed6822a pg_hashids.tar.gz" | sha256sum --check && \
mkdir pg_hashids-src && cd pg_hashids-src && tar xvzf ../pg_hashids.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
@@ -236,6 +253,7 @@ FROM build-deps AS rum-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/postgrespro/rum/archive/refs/tags/1.3.13.tar.gz -O rum.tar.gz && \
echo "6ab370532c965568df6210bd844ac6ba649f53055e48243525b0b7e5c4d69a7d rum.tar.gz" | sha256sum --check && \
mkdir rum-src && cd rum-src && tar xvzf ../rum.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
@@ -251,11 +269,28 @@ FROM build-deps AS pgtap-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/theory/pgtap/archive/refs/tags/v1.2.0.tar.gz -O pgtap.tar.gz && \
echo "9c7c3de67ea41638e14f06da5da57bac6f5bd03fea05c165a0ec862205a5c052 pgtap.tar.gz" | sha256sum --check && \
mkdir pgtap-src && cd pgtap-src && tar xvzf ../pgtap.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgtap.control
#########################################################################################
#
# Layer "ip4r-pg-build"
# compile ip4r extension
#
#########################################################################################
FROM build-deps AS ip4r-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.1.tar.gz -O ip4r.tar.gz && \
echo "78b9f0c1ae45c22182768fe892a32d533c82281035e10914111400bf6301c726 ip4r.tar.gz" | sha256sum --check && \
mkdir ip4r-src && cd ip4r-src && tar xvzf ../ip4r.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/ip4r.control
#########################################################################################
#
# Layer "prefix-pg-build"
@@ -266,6 +301,7 @@ FROM build-deps AS prefix-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/dimitri/prefix/archive/refs/tags/v1.2.9.tar.gz -O prefix.tar.gz && \
echo "38d30a08d0241a8bbb8e1eb8f0152b385051665a8e621c8899e7c5068f8b511e prefix.tar.gz" | sha256sum --check && \
mkdir prefix-src && cd prefix-src && tar xvzf ../prefix.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -281,6 +317,7 @@ FROM build-deps AS hll-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.17.tar.gz -O hll.tar.gz && \
echo "9a18288e884f197196b0d29b9f178ba595b0dfc21fbf7a8699380e77fa04c1e9 hll.tar.gz" | sha256sum --check && \
mkdir hll-src && cd hll-src && tar xvzf ../hll.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -296,6 +333,7 @@ FROM build-deps AS plpgsql-check-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.3.2.tar.gz -O plpgsql_check.tar.gz && \
echo "9d81167c4bbeb74eebf7d60147b21961506161addc2aee537f95ad8efeae427b plpgsql_check.tar.gz" | sha256sum --check && \
mkdir plpgsql_check-src && cd plpgsql_check-src && tar xvzf ../plpgsql_check.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
@@ -315,6 +353,7 @@ ENV PATH "/usr/local/pgsql/bin:$PATH"
RUN apt-get update && \
apt-get install -y cmake && \
wget https://github.com/timescale/timescaledb/archive/refs/tags/2.10.1.tar.gz -O timescaledb.tar.gz && \
echo "6fca72a6ed0f6d32d2b3523951ede73dc5f9b0077b38450a029a5f411fdb8c73 timescaledb.tar.gz" | sha256sum --check && \
mkdir timescaledb-src && cd timescaledb-src && tar xvzf ../timescaledb.tar.gz --strip-components=1 -C . && \
./bootstrap -DSEND_TELEMETRY_DEFAULT:BOOL=OFF -DUSE_TELEMETRY:BOOL=OFF -DAPACHE_ONLY:BOOL=ON && \
cd build && \
@@ -323,7 +362,39 @@ RUN apt-get update && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/timescaledb.control
#########################################################################################
#
#
# Layer "pg-hint-plan-pg-build"
# compile pg_hint_plan extension
#
#########################################################################################
FROM build-deps AS pg-hint-plan-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ARG PG_VERSION
ENV PATH "/usr/local/pgsql/bin:$PATH"
RUN case "${PG_VERSION}" in \
"v14") \
export PG_HINT_PLAN_VERSION=14_1_4_1 \
export PG_HINT_PLAN_CHECKSUM=c3501becf70ead27f70626bce80ea401ceac6a77e2083ee5f3ff1f1444ec1ad1 \
;; \
"v15") \
export PG_HINT_PLAN_VERSION=15_1_5_0 \
export PG_HINT_PLAN_CHECKSUM=564cbbf4820973ffece63fbf76e3c0af62c4ab23543142c7caaa682bc48918be \
;; \
*) \
echo "Export the valid PG_HINT_PLAN_VERSION variable" && exit 1 \
;; \
esac && \
wget https://github.com/ossc-db/pg_hint_plan/archive/refs/tags/REL${PG_HINT_PLAN_VERSION}.tar.gz -O pg_hint_plan.tar.gz && \
echo "${PG_HINT_PLAN_CHECKSUM} pg_hint_plan.tar.gz" | sha256sum --check && \
mkdir pg_hint_plan-src && cd pg_hint_plan-src && tar xvzf ../pg_hint_plan.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make install -j $(getconf _NPROCESSORS_ONLN) && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_hint_plan.control
#########################################################################################
#
# Layer "rust extensions"
# This layer is used to build `pgx` deps
#
@@ -351,7 +422,7 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
USER root
#########################################################################################
#
#
# Layer "pg-jsonschema-pg-build"
# Compile "pg_jsonschema" extension
#
@@ -359,15 +430,17 @@ USER root
FROM rust-extensions-build AS pg-jsonschema-pg-build
# there is no release tag yet, but we need it due to the superuser fix in the control file
# caeab60d70b2fd3ae421ec66466a3abbb37b7ee6 made on 06/03/2023
# there is no release tag yet, but we need it due to the superuser fix in the control file, switch to git tag after release >= 0.1.5
RUN wget https://github.com/supabase/pg_jsonschema/archive/caeab60d70b2fd3ae421ec66466a3abbb37b7ee6.tar.gz -O pg_jsonschema.tar.gz && \
echo "54129ce2e7ee7a585648dbb4cef6d73f795d94fe72f248ac01119992518469a4 pg_jsonschema.tar.gz" | sha256sum --check && \
mkdir pg_jsonschema-src && cd pg_jsonschema-src && tar xvzf ../pg_jsonschema.tar.gz --strip-components=1 -C . && \
sed -i 's/pgx = "0.7.1"/pgx = { version = "0.7.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_jsonschema.control
#########################################################################################
#
#
# Layer "pg-graphql-pg-build"
# Compile "pg_graphql" extension
#
@@ -375,11 +448,13 @@ RUN wget https://github.com/supabase/pg_jsonschema/archive/caeab60d70b2fd3ae421e
FROM rust-extensions-build AS pg-graphql-pg-build
# b4988843647450a153439be367168ed09971af85 made on 22/02/2023 (from remove-pgx-contrib-spiext branch)
# Currently pgx version bump to >= 0.7.2 causes "call to unsafe function" compliation errors in
# pgx-contrib-spiext. There is a branch that removes that dependency, so use it. It is on the
# same 1.1 version we've used before.
RUN git clone -b remove-pgx-contrib-spiext --single-branch https://github.com/yrashk/pg_graphql && \
cd pg_graphql && \
RUN wget https://github.com/yrashk/pg_graphql/archive/b4988843647450a153439be367168ed09971af85.tar.gz -O pg_graphql.tar.gz && \
echo "0c7b0e746441b2ec24187d0e03555faf935c2159e2839bddd14df6dafbc8c9bd pg_graphql.tar.gz" | sha256sum --check && \
mkdir pg_graphql-src && cd pg_graphql-src && tar xvzf ../pg_graphql.tar.gz --strip-components=1 -C . && \
sed -i 's/pgx = "~0.7.1"/pgx = { version = "0.7.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
sed -i 's/pgx-tests = "~0.7.1"/pgx-tests = "0.7.3"/g' Cargo.toml && \
cargo pgx install --release && \
@@ -396,8 +471,10 @@ RUN git clone -b remove-pgx-contrib-spiext --single-branch https://github.com/yr
FROM rust-extensions-build AS pg-tiktoken-pg-build
RUN git clone --depth=1 --single-branch https://github.com/kelvich/pg_tiktoken && \
cd pg_tiktoken && \
# 801f84f08c6881c8aa30f405fafbf00eec386a72 made on 10/03/2023
RUN wget https://github.com/kelvich/pg_tiktoken/archive/801f84f08c6881c8aa30f405fafbf00eec386a72.tar.gz -O pg_tiktoken.tar.gz && \
echo "52f60ac800993a49aa8c609961842b611b6b1949717b69ce2ec9117117e16e4a pg_tiktoken.tar.gz" | sha256sum --check && \
mkdir pg_tiktoken-src && cd pg_tiktoken-src && tar xvzf ../pg_tiktoken.tar.gz --strip-components=1 -C . && \
cargo pgx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_tiktoken.control
@@ -423,10 +500,12 @@ COPY --from=hypopg-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-hashids-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=rum-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pgtap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=ip4r-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=prefix-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=hll-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=plpgsql-check-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=timescaledb-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-hint-plan-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \
@@ -491,13 +570,17 @@ COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-deb
# Install:
# libreadline8 for psql
# libicu67, locales for collations (including ICU and plpgsql_check)
# liblz4-1 for lz4
# libossp-uuid16 for extension ossp-uuid
# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS
# libxml2, libxslt1.1 for xml2
# libzstd1 for zstd
RUN apt update && \
apt install --no-install-recommends -y \
gdb \
locales \
libicu67 \
liblz4-1 \
libreadline8 \
libossp-uuid16 \
libgeos-c1v5 \
@@ -507,7 +590,8 @@ RUN apt update && \
libsfcgal1 \
libxml2 \
libxslt1.1 \
gdb && \
libzstd1 \
procps && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8

View File

@@ -147,15 +147,15 @@ Created an initial timeline 'de200bd42b49cc1814412c7e592dd6e9' at Lsn 0/16B5A50
Setting tenant 9ef87a5bf0d92544f6fafeeb3239695c as a default one
# start postgres compute node
> ./target/debug/neon_local pg start main
Starting new postgres (v14) main on timeline de200bd42b49cc1814412c7e592dd6e9 ...
> ./target/debug/neon_local endpoint start main
Starting new endpoint main (PostgreSQL v14) on timeline de200bd42b49cc1814412c7e592dd6e9 ...
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/main port=55432
Starting postgres node at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=postgres'
Starting postgres at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=postgres'
# check list of running postgres instances
> ./target/debug/neon_local pg list
NODE ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16B5BA8 running
> ./target/debug/neon_local endpoint list
ENDPOINT ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16B5BA8 running
```
2. Now, it is possible to connect to postgres and run some queries:
@@ -184,14 +184,14 @@ Created timeline 'b3b863fa45fa9e57e615f9f2d944e601' at Lsn 0/16F9A00 for tenant:
(L) ┗━ @0/16F9A00: migration_check [b3b863fa45fa9e57e615f9f2d944e601]
# start postgres on that branch
> ./target/debug/neon_local pg start migration_check --branch-name migration_check
Starting new postgres migration_check on timeline b3b863fa45fa9e57e615f9f2d944e601 ...
> ./target/debug/neon_local endpoint start migration_check --branch-name migration_check
Starting new endpoint migration_check (PostgreSQL v14) on timeline b3b863fa45fa9e57e615f9f2d944e601 ...
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/migration_check port=55433
Starting postgres node at 'host=127.0.0.1 port=55433 user=cloud_admin dbname=postgres'
Starting postgres at 'host=127.0.0.1 port=55433 user=cloud_admin dbname=postgres'
# check the new list of running postgres instances
> ./target/debug/neon_local pg list
NODE ADDRESS TIMELINE BRANCH NAME LSN STATUS
> ./target/debug/neon_local endpoint list
ENDPOINT ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16F9A38 running
migration_check 127.0.0.1:55433 b3b863fa45fa9e57e615f9f2d944e601 migration_check 0/16F9A70 running

View File

@@ -46,6 +46,7 @@ use url::Url;
use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
@@ -175,6 +176,8 @@ fn main() -> Result<()> {
// Launch remaining service threads
let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread");
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");
// Start Postgres
let mut delay_exit = false;

View File

@@ -1,12 +1,28 @@
use anyhow::{anyhow, Result};
use postgres::Client;
use tokio_postgres::NoTls;
use tracing::{error, instrument};
use crate::compute::ComputeNode;
/// Update timestamp in a row in a special service table to check
/// that we can actually write some data in this particular timeline.
/// Create table if it's missing.
#[instrument(skip_all)]
pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
// Connect to the database.
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
if client.is_closed() {
return Err(anyhow!("connection to postgres closed"));
}
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("connection error: {}", e);
}
});
let query = "
CREATE TABLE IF NOT EXISTS health_check (
id serial primary key,
@@ -15,31 +31,15 @@ pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
INSERT INTO health_check VALUES (1, now())
ON CONFLICT (id) DO UPDATE
SET updated_at = now();";
let result = client.simple_query(query)?;
if result.len() < 2 {
return Err(anyhow::format_err!("executed {} queries", result.len()));
}
Ok(())
}
#[instrument(skip_all)]
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
if client.is_closed() {
return Err(anyhow!("connection to postgres closed"));
}
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("connection error: {}", e);
}
});
let result = client
.simple_query("UPDATE health_check SET updated_at = now() WHERE id = 1;")
.await?;
if result.len() != 1 {
return Err(anyhow!("statement can't be executed"));
let result = client.simple_query(query).await?;
if result.len() != 2 {
return Err(anyhow::format_err!(
"expected 2 query results, but got {}",
result.len()
));
}
Ok(())
}

View File

@@ -32,7 +32,6 @@ use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::ComputeSpec;
use crate::checker::create_writability_check_data;
use crate::config;
use crate::pg_helpers::*;
use crate::spec::*;
@@ -342,7 +341,6 @@ impl ComputeNode {
handle_databases(spec, &mut client)?;
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
handle_grants(spec, self.connstr.as_str(), &mut client)?;
create_writability_check_data(&mut client)?;
handle_extensions(spec, &mut client)?;
// 'Close' connection
@@ -356,6 +354,48 @@ impl ComputeNode {
Ok(())
}
// We could've wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
#[instrument(skip(self, client))]
fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
client.simple_query("SELECT pg_reload_conf()")?;
Ok(())
}
/// Similar to `apply_config()`, but does a bit different sequence of operations,
/// as it's used to reconfigure a previously started and configured Postgres node.
#[instrument(skip(self))]
pub fn reconfigure(&self) -> Result<()> {
let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
self.pg_reload_conf(&mut client)?;
// Proceed with post-startup configuration. Note, that order of operations is important.
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec, self.connstr.as_str(), &mut client)?;
handle_extensions(&spec, &mut client)?;
// 'Close' connection
drop(client);
let unknown_op = "unknown".to_string();
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
info!(
"finished reconfiguration of compute node for operation {}",
op_id
);
Ok(())
}
#[instrument(skip(self))]
pub fn start_compute(&self) -> Result<std::process::Child> {
let compute_state = self.state.lock().unwrap().clone();

View File

@@ -0,0 +1,54 @@
use std::sync::Arc;
use std::thread;
use anyhow::Result;
use tracing::{error, info, instrument};
use compute_api::responses::ComputeStatus;
use crate::compute::ComputeNode;
#[instrument(skip(compute))]
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
info!("waiting for reconfiguration requests");
loop {
let state = compute.state.lock().unwrap();
let mut state = compute.state_changed.wait(state).unwrap();
if state.status == ComputeStatus::ConfigurationPending {
info!("got configuration request");
state.status = ComputeStatus::Configuration;
compute.state_changed.notify_all();
drop(state);
let mut new_status = ComputeStatus::Failed;
if let Err(e) = compute.reconfigure() {
error!("could not configure compute node: {}", e);
} else {
new_status = ComputeStatus::Running;
info!("compute node configured");
}
// XXX: used to test that API is blocking
// std::thread::sleep(std::time::Duration::from_millis(10000));
compute.set_status(new_status);
} else if state.status == ComputeStatus::Failed {
info!("compute node is now in Failed state, exiting");
break;
} else {
info!("woken up for compute status: {:?}, sleeping", state.status);
}
}
}
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
let compute = Arc::clone(compute);
Ok(thread::Builder::new()
.name("compute-configurator".into())
.spawn(move || {
configurator_main_loop(&compute);
info!("configurator thread is exited");
})?)
}

View File

@@ -85,7 +85,10 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
let res = crate::checker::check_writability(compute).await;
match res {
Ok(_) => Response::new(Body::from("true")),
Err(e) => Response::new(Body::from(e.to_string())),
Err(e) => {
error!("check_writability failed: {}", e);
Response::new(Body::from(e.to_string()))
}
}
}
@@ -155,7 +158,7 @@ async fn handle_configure_request(
// ```
{
let mut state = compute.state.lock().unwrap();
if state.status != ComputeStatus::Empty {
if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
let msg = format!(
"invalid compute status for configuration request: {:?}",
state.status.clone()

View File

@@ -4,6 +4,7 @@
//!
pub mod checker;
pub mod config;
pub mod configurator;
pub mod http;
#[macro_use]
pub mod logger;

View File

@@ -1,7 +1,7 @@
use std::path::Path;
use std::str::FromStr;
use anyhow::Result;
use anyhow::{anyhow, bail, Result};
use postgres::config::Config;
use postgres::{Client, NoTls};
use tracing::{info, info_span, instrument, span_enabled, warn, Level};
@@ -10,6 +10,7 @@ use crate::config;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
use compute_api::responses::ControlPlaneSpecResponse;
use compute_api::spec::{ComputeSpec, Database, PgIdent, Role};
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
@@ -26,13 +27,19 @@ pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<C
// - network error, then retry
// - no spec for compute yet, then wait
// - compute id is unknown or any other error, then bail out
let spec = reqwest::blocking::Client::new()
let resp: ControlPlaneSpecResponse = reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()?
.json()?;
.send()
.map_err(|e| anyhow!("could not send spec request to control plane: {}", e))?
.json()
.map_err(|e| anyhow!("could not get compute spec from control plane: {}", e))?;
Ok(spec)
if let Some(spec) = resp.spec {
Ok(spec)
} else {
bail!("could not get compute spec from control plane")
}
}
/// It takes cluster specification and does the following:

View File

@@ -7,7 +7,7 @@
//!
use anyhow::{anyhow, bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use control_plane::compute::ComputeControlPlane;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::local_env::LocalEnv;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
@@ -106,8 +106,9 @@ fn main() -> Result<()> {
"start" => handle_start_all(sub_args, &env),
"stop" => handle_stop_all(sub_args, &env),
"pageserver" => handle_pageserver(sub_args, &env),
"pg" => handle_pg(sub_args, &env),
"safekeeper" => handle_safekeeper(sub_args, &env),
"endpoint" => handle_endpoint(sub_args, &env),
"pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
_ => bail!("unexpected subcommand {sub_name}"),
};
@@ -470,10 +471,10 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let mut cplane = ComputeControlPlane::load(env.clone())?;
println!("Importing timeline into pageserver ...");
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?;
println!("Creating node for imported timeline ...");
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
cplane.new_node(tenant_id, name, timeline_id, None, None, pg_version)?;
println!("Creating endpoint for imported timeline ...");
cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?;
println!("Done");
}
Some(("branch", branch_match)) => {
@@ -521,10 +522,10 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
Ok(())
}
fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let (sub_name, sub_args) = match pg_match.subcommand() {
Some(pg_subcommand_data) => pg_subcommand_data,
None => bail!("no pg subcommand provided"),
fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let (sub_name, sub_args) = match ep_match.subcommand() {
Some(ep_subcommand_data) => ep_subcommand_data,
None => bail!("no endpoint subcommand provided"),
};
let mut cplane = ComputeControlPlane::load(env.clone())?;
@@ -546,7 +547,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
table.load_preset(comfy_table::presets::NOTHING);
table.set_header([
"NODE",
"ENDPOINT",
"ADDRESS",
"TIMELINE",
"BRANCH NAME",
@@ -554,39 +555,39 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
"STATUS",
]);
for ((_, node_name), node) in cplane
.nodes
for (endpoint_id, endpoint) in cplane
.endpoints
.iter()
.filter(|((node_tenant_id, _), _)| node_tenant_id == &tenant_id)
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
{
let lsn_str = match node.lsn {
let lsn_str = match endpoint.lsn {
None => {
// -> primary node
// -> primary endpoint
// Use the LSN at the end of the timeline.
timeline_infos
.get(&node.timeline_id)
.get(&endpoint.timeline_id)
.map(|bi| bi.last_record_lsn.to_string())
.unwrap_or_else(|| "?".to_string())
}
Some(lsn) => {
// -> read-only node
// Use the node's LSN.
// -> read-only endpoint
// Use the endpoint's LSN.
lsn.to_string()
}
};
let branch_name = timeline_name_mappings
.get(&TenantTimelineId::new(tenant_id, node.timeline_id))
.get(&TenantTimelineId::new(tenant_id, endpoint.timeline_id))
.map(|name| name.as_str())
.unwrap_or("?");
table.add_row([
node_name.as_str(),
&node.address.to_string(),
&node.timeline_id.to_string(),
endpoint_id.as_str(),
&endpoint.address.to_string(),
&endpoint.timeline_id.to_string(),
branch_name,
lsn_str.as_str(),
node.status(),
endpoint.status(),
]);
}
@@ -597,10 +598,10 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.get_one::<String>("branch-name")
.map(|s| s.as_str())
.unwrap_or(DEFAULT_BRANCH_NAME);
let node_name = sub_args
.get_one::<String>("node")
.map(|node_name| node_name.to_string())
.unwrap_or_else(|| format!("{branch_name}_node"));
let endpoint_id = sub_args
.get_one::<String>("endpoint_id")
.map(String::to_string)
.unwrap_or_else(|| format!("ep-{branch_name}"));
let lsn = sub_args
.get_one::<String>("lsn")
@@ -618,15 +619,15 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.copied()
.context("Failed to parse postgres version from the argument string")?;
cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port, pg_version)?;
cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, lsn, port, pg_version)?;
}
"start" => {
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
let node_name = sub_args
.get_one::<String>("node")
.ok_or_else(|| anyhow!("No node name was provided to start"))?;
let endpoint_id = sub_args
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
let node = cplane.nodes.get(&(tenant_id, node_name.to_string()));
let endpoint = cplane.endpoints.get(endpoint_id.as_str());
let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
let claims = Claims::new(Some(tenant_id), Scope::Tenant);
@@ -636,9 +637,9 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
None
};
if let Some(node) = node {
println!("Starting existing postgres {node_name}...");
node.start(&auth_token)?;
if let Some(endpoint) = endpoint {
println!("Starting existing endpoint {endpoint_id}...");
endpoint.start(&auth_token)?;
} else {
let branch_name = sub_args
.get_one::<String>("branch-name")
@@ -663,27 +664,33 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
// start --port X
// stop
// start <-- will also use port X even without explicit port argument
println!("Starting new postgres (v{pg_version}) {node_name} on timeline {timeline_id} ...");
println!("Starting new endpoint {endpoint_id} (PostgreSQL v{pg_version}) on timeline {timeline_id} ...");
let node =
cplane.new_node(tenant_id, node_name, timeline_id, lsn, port, pg_version)?;
node.start(&auth_token)?;
let ep = cplane.new_endpoint(
tenant_id,
endpoint_id,
timeline_id,
lsn,
port,
pg_version,
)?;
ep.start(&auth_token)?;
}
}
"stop" => {
let node_name = sub_args
.get_one::<String>("node")
.ok_or_else(|| anyhow!("No node name was provided to stop"))?;
let endpoint_id = sub_args
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?;
let destroy = sub_args.get_flag("destroy");
let node = cplane
.nodes
.get(&(tenant_id, node_name.to_string()))
.with_context(|| format!("postgres {node_name} is not found"))?;
node.stop(destroy)?;
let endpoint = cplane
.endpoints
.get(endpoint_id.as_str())
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
endpoint.stop(destroy)?;
}
_ => bail!("Unexpected pg subcommand '{sub_name}'"),
_ => bail!("Unexpected endpoint subcommand '{sub_name}'"),
}
Ok(())
@@ -802,7 +809,7 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> {
// Postgres nodes are not started automatically
// Endpoints are not started automatically
broker::start_broker_process(env)?;
@@ -836,10 +843,10 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<
fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
let pageserver = PageServerNode::from_env(env);
// Stop all compute nodes
// Stop all endpoints
match ComputeControlPlane::load(env.clone()) {
Ok(cplane) => {
for (_k, node) in cplane.nodes {
for (_k, node) in cplane.endpoints {
if let Err(e) = node.stop(false) {
eprintln!("postgres stop failed: {e:#}");
}
@@ -872,7 +879,9 @@ fn cli() -> Command {
.help("Name of the branch to be created or used as an alias for other services")
.required(false);
let pg_node_arg = Arg::new("node").help("Postgres node name").required(false);
let endpoint_id_arg = Arg::new("endpoint_id")
.help("Postgres endpoint id")
.required(false);
let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false);
@@ -1026,27 +1035,27 @@ fn cli() -> Command {
)
)
.subcommand(
Command::new("pg")
Command::new("endpoint")
.arg_required_else_help(true)
.about("Manage postgres instances")
.subcommand(Command::new("list").arg(tenant_id_arg.clone()))
.subcommand(Command::new("create")
.about("Create a postgres compute node")
.arg(pg_node_arg.clone())
.about("Create a compute endpoint")
.arg(endpoint_id_arg.clone())
.arg(branch_name_arg.clone())
.arg(tenant_id_arg.clone())
.arg(lsn_arg.clone())
.arg(port_arg.clone())
.arg(
Arg::new("config-only")
.help("Don't do basebackup, create compute node with only config files")
.help("Don't do basebackup, create endpoint directory with only config files")
.long("config-only")
.required(false))
.arg(pg_version_arg.clone())
)
.subcommand(Command::new("start")
.about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
.arg(pg_node_arg.clone())
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
.arg(endpoint_id_arg.clone())
.arg(tenant_id_arg.clone())
.arg(branch_name_arg)
.arg(timeline_id_arg)
@@ -1056,7 +1065,7 @@ fn cli() -> Command {
)
.subcommand(
Command::new("stop")
.arg(pg_node_arg)
.arg(endpoint_id_arg)
.arg(tenant_id_arg)
.arg(
Arg::new("destroy")
@@ -1068,6 +1077,13 @@ fn cli() -> Command {
)
)
// Obsolete old name for 'endpoint'. We now just print an error if it's used.
.subcommand(
Command::new("pg")
.hide(true)
.arg(Arg::new("ignore-rest").allow_hyphen_values(true).num_args(0..).required(false))
.trailing_var_arg(true)
)
.subcommand(
Command::new("start")
.about("Start page server and safekeepers")

View File

@@ -25,54 +25,45 @@ use crate::postgresql_conf::PostgresConf;
//
pub struct ComputeControlPlane {
base_port: u16,
pageserver: Arc<PageServerNode>,
pub nodes: BTreeMap<(TenantId, String), Arc<PostgresNode>>,
// endpoint ID is the key
pub endpoints: BTreeMap<String, Arc<Endpoint>>,
env: LocalEnv,
pageserver: Arc<PageServerNode>,
}
impl ComputeControlPlane {
// Load current nodes with ports from data directories on disk
// Directory structure has the following layout:
// pgdatadirs
// |- tenants
// | |- <tenant_id>
// | | |- <node name>
// Load current endpoints from the endpoints/ subdirectories
pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
let pageserver = Arc::new(PageServerNode::from_env(&env));
let mut nodes = BTreeMap::default();
let pgdatadirspath = &env.pg_data_dirs_path();
for tenant_dir in fs::read_dir(pgdatadirspath)
.with_context(|| format!("failed to list {}", pgdatadirspath.display()))?
let mut endpoints = BTreeMap::default();
for endpoint_dir in fs::read_dir(env.endpoints_path())
.with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
{
let tenant_dir = tenant_dir?;
for timeline_dir in fs::read_dir(tenant_dir.path())
.with_context(|| format!("failed to list {}", tenant_dir.path().display()))?
{
let node = PostgresNode::from_dir_entry(timeline_dir?, &env, &pageserver)?;
nodes.insert((node.tenant_id, node.name.clone()), Arc::new(node));
}
let ep = Endpoint::from_dir_entry(endpoint_dir?, &env, &pageserver)?;
endpoints.insert(ep.name.clone(), Arc::new(ep));
}
Ok(ComputeControlPlane {
base_port: 55431,
pageserver,
nodes,
endpoints,
env,
pageserver,
})
}
fn get_port(&mut self) -> u16 {
1 + self
.nodes
.endpoints
.values()
.map(|node| node.address.port())
.map(|ep| ep.address.port())
.max()
.unwrap_or(self.base_port)
}
pub fn new_node(
pub fn new_endpoint(
&mut self,
tenant_id: TenantId,
name: &str,
@@ -80,9 +71,9 @@ impl ComputeControlPlane {
lsn: Option<Lsn>,
port: Option<u16>,
pg_version: u32,
) -> Result<Arc<PostgresNode>> {
) -> Result<Arc<Endpoint>> {
let port = port.unwrap_or_else(|| self.get_port());
let node = Arc::new(PostgresNode {
let ep = Arc::new(Endpoint {
name: name.to_owned(),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
env: self.env.clone(),
@@ -93,39 +84,45 @@ impl ComputeControlPlane {
pg_version,
});
node.create_pgdata()?;
node.setup_pg_conf()?;
ep.create_pgdata()?;
ep.setup_pg_conf()?;
self.nodes
.insert((tenant_id, node.name.clone()), Arc::clone(&node));
self.endpoints.insert(ep.name.clone(), Arc::clone(&ep));
Ok(node)
Ok(ep)
}
}
///////////////////////////////////////////////////////////////////////////////
#[derive(Debug)]
pub struct PostgresNode {
pub address: SocketAddr,
pub struct Endpoint {
/// used as the directory name
name: String,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
// Some(lsn) if this is a read-only endpoint anchored at 'lsn'. None for the primary.
pub lsn: Option<Lsn>,
// port and address of the Postgres server
pub address: SocketAddr,
pg_version: u32,
// These are not part of the endpoint as such, but the environment
// the endpoint runs in.
pub env: LocalEnv,
pageserver: Arc<PageServerNode>,
pub timeline_id: TimelineId,
pub lsn: Option<Lsn>, // if it's a read-only node. None for primary
pub tenant_id: TenantId,
pg_version: u32,
}
impl PostgresNode {
impl Endpoint {
fn from_dir_entry(
entry: std::fs::DirEntry,
env: &LocalEnv,
pageserver: &Arc<PageServerNode>,
) -> Result<PostgresNode> {
) -> Result<Endpoint> {
if !entry.file_type()?.is_dir() {
anyhow::bail!(
"PostgresNode::from_dir_entry failed: '{}' is not a directory",
"Endpoint::from_dir_entry failed: '{}' is not a directory",
entry.path().display()
);
}
@@ -135,7 +132,7 @@ impl PostgresNode {
let name = fname.to_str().unwrap().to_string();
// Read config file into memory
let cfg_path = entry.path().join("postgresql.conf");
let cfg_path = entry.path().join("pgdata").join("postgresql.conf");
let cfg_path_str = cfg_path.to_string_lossy();
let mut conf_file = File::open(&cfg_path)
.with_context(|| format!("failed to open config file in {}", cfg_path_str))?;
@@ -161,7 +158,7 @@ impl PostgresNode {
conf.parse_field_optional("recovery_target_lsn", &context)?;
// ok now
Ok(PostgresNode {
Ok(Endpoint {
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
name,
env: env.clone(),
@@ -269,7 +266,7 @@ impl PostgresNode {
}
// Write postgresql.conf with default configuration
// and PG_VERSION file to the data directory of a new node.
// and PG_VERSION file to the data directory of a new endpoint.
fn setup_pg_conf(&self) -> Result<()> {
let mut conf = PostgresConf::new();
conf.append("max_wal_senders", "10");
@@ -289,7 +286,7 @@ impl PostgresNode {
// walproposer panics when basebackup is invalid, it is pointless to restart in this case.
conf.append("restart_after_crash", "off");
// Configure the node to fetch pages from pageserver
// Configure the Neon Postgres extension to fetch pages from pageserver
let pageserver_connstr = {
let config = &self.pageserver.pg_connection_config;
let (host, port) = (config.host(), config.port());
@@ -325,7 +322,7 @@ impl PostgresNode {
conf.append("max_replication_flush_lag", "10GB");
if !self.env.safekeepers.is_empty() {
// Configure the node to connect to the safekeepers
// Configure Postgres to connect to the safekeepers
conf.append("synchronous_standby_names", "walproposer");
let safekeepers = self
@@ -380,8 +377,12 @@ impl PostgresNode {
Ok(())
}
pub fn endpoint_path(&self) -> PathBuf {
self.env.endpoints_path().join(&self.name)
}
pub fn pgdata(&self) -> PathBuf {
self.env.pg_data_dir(&self.tenant_id, &self.name)
self.endpoint_path().join("pgdata")
}
pub fn status(&self) -> &str {
@@ -443,12 +444,11 @@ impl PostgresNode {
}
pub fn start(&self, auth_token: &Option<String>) -> Result<()> {
// Bail if the node already running.
if self.status() == "running" {
anyhow::bail!("The node is already running");
anyhow::bail!("The endpoint is already running");
}
// 1. We always start compute node from scratch, so
// 1. We always start Postgres from scratch, so
// if old dir exists, preserve 'postgresql.conf' and drop the directory
let postgresql_conf_path = self.pgdata().join("postgresql.conf");
let postgresql_conf = fs::read(&postgresql_conf_path).with_context(|| {
@@ -470,8 +470,8 @@ impl PostgresNode {
File::create(self.pgdata().join("standby.signal"))?;
}
// 4. Finally start the compute node postgres
println!("Starting postgres node at '{}'", self.connstr());
// 4. Finally start postgres
println!("Starting postgres at '{}'", self.connstr());
self.pg_ctl(&["start"], auth_token)
}
@@ -480,7 +480,7 @@ impl PostgresNode {
// use immediate shutdown mode, otherwise,
// shutdown gracefully to leave the data directory sane.
//
// Compute node always starts from scratch, so stop
// Postgres is always started from scratch, so stop
// without destroy only used for testing and debugging.
//
if destroy {
@@ -489,7 +489,7 @@ impl PostgresNode {
"Destroying postgres data directory '{}'",
self.pgdata().to_str().unwrap()
);
fs::remove_dir_all(self.pgdata())?;
fs::remove_dir_all(self.endpoint_path())?;
} else {
self.pg_ctl(&["stop"], &None)?;
}

View File

@@ -9,7 +9,7 @@
mod background_process;
pub mod broker;
pub mod compute;
pub mod endpoint;
pub mod local_env;
pub mod pageserver;
pub mod postgresql_conf;

View File

@@ -200,14 +200,8 @@ impl LocalEnv {
self.neon_distrib_dir.join("storage_broker")
}
pub fn pg_data_dirs_path(&self) -> PathBuf {
self.base_data_dir.join("pgdatadirs").join("tenants")
}
pub fn pg_data_dir(&self, tenant_id: &TenantId, branch_name: &str) -> PathBuf {
self.pg_data_dirs_path()
.join(tenant_id.to_string())
.join(branch_name)
pub fn endpoints_path(&self) -> PathBuf {
self.base_data_dir.join("endpoints")
}
// TODO: move pageserver files into ./pageserver
@@ -427,7 +421,7 @@ impl LocalEnv {
}
}
fs::create_dir_all(self.pg_data_dirs_path())?;
fs::create_dir_all(self.endpoints_path())?;
for safekeeper in &self.safekeepers {
fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?;

View File

@@ -368,6 +368,9 @@ impl PageServerNode {
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'min_resident_size_override' as integer")?,
evictions_low_residence_duration_metric_threshold: settings
.remove("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
@@ -445,6 +448,9 @@ impl PageServerNode {
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'min_resident_size_override' as an integer")?,
evictions_low_residence_duration_metric_threshold: settings
.get("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
})
.send()?
.error_from_body()?;

View File

@@ -1,7 +1,9 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use chrono::{DateTime, Utc};
use serde::{Serialize, Serializer};
use serde::{Deserialize, Serialize, Serializer};
use crate::spec::ComputeSpec;
#[derive(Serialize, Debug)]
pub struct GenericAPIError {
@@ -43,6 +45,8 @@ pub enum ComputeStatus {
Init,
// Compute is configured and running.
Running,
// New spec is being applied.
Configuration,
// Either startup or configuration failed,
// compute will exit soon or is waiting for
// control-plane to terminate it.
@@ -64,3 +68,11 @@ pub struct ComputeMetrics {
pub config_ms: u64,
pub total_startup_ms: u64,
}
/// Response of the `/computes/{compute_id}/spec` control-plane API.
/// This is not actually a compute API response, so consider moving
/// to a different place.
#[derive(Deserialize, Debug)]
pub struct ControlPlaneSpecResponse {
pub spec: Option<ComputeSpec>,
}

View File

@@ -4,13 +4,12 @@ version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.68"
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }
rand = "0.8.3"
serde = "1.0.152"
serde_with = "2.1.0"
utils = { version = "0.1.0", path = "../utils" }
workspace_hack = { version = "0.1.0", path = "../../workspace_hack" }
anyhow.workspace = true
chrono.workspace = true
rand.workspace = true
serde.workspace = true
serde_with.workspace = true
utils.workspace = true
workspace_hack.workspace = true

View File

@@ -7,6 +7,7 @@ license.workspace = true
[dependencies]
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
const_format.workspace = true
anyhow.workspace = true
bytes.workspace = true
@@ -14,6 +15,7 @@ byteorder.workspace = true
utils.workspace = true
postgres_ffi.workspace = true
enum-map.workspace = true
serde_json.workspace = true
strum.workspace = true
strum_macros.workspace = true
workspace_hack.workspace = true

View File

@@ -7,6 +7,7 @@ use std::{
use byteorder::{BigEndian, ReadBytesExt};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use strum_macros;
use utils::{
history_buffer::HistoryBufferWithDropCounter,
id::{NodeId, TenantId, TimelineId},
@@ -18,11 +19,23 @@ use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[derive(
Clone,
PartialEq,
Eq,
serde::Serialize,
serde::Deserialize,
strum_macros::Display,
strum_macros::EnumString,
strum_macros::EnumVariantNames,
strum_macros::AsRefStr,
strum_macros::IntoStaticStr,
)]
#[serde(tag = "slug", content = "data")]
pub enum TenantState {
// This tenant is being loaded from local disk
/// This tenant is being loaded from local disk
Loading,
// This tenant is being downloaded from cloud storage.
/// This tenant is being downloaded from cloud storage.
Attaching,
/// Tenant is fully operational
Active,
@@ -31,15 +44,7 @@ pub enum TenantState {
Stopping,
/// A tenant is recognized by the pageserver, but can no longer be used for
/// any operations, because it failed to be activated.
Broken,
}
pub mod state {
pub const LOADING: &str = "loading";
pub const ATTACHING: &str = "attaching";
pub const ACTIVE: &str = "active";
pub const STOPPING: &str = "stopping";
pub const BROKEN: &str = "broken";
Broken { reason: String, backtrace: String },
}
impl TenantState {
@@ -49,17 +54,26 @@ impl TenantState {
Self::Attaching => true,
Self::Active => false,
Self::Stopping => false,
Self::Broken => false,
Self::Broken { .. } => false,
}
}
pub fn as_str(&self) -> &'static str {
pub fn broken_from_reason(reason: String) -> Self {
let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
Self::Broken {
reason,
backtrace: backtrace_str,
}
}
}
impl std::fmt::Debug for TenantState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TenantState::Loading => state::LOADING,
TenantState::Attaching => state::ATTACHING,
TenantState::Active => state::ACTIVE,
TenantState::Stopping => state::STOPPING,
TenantState::Broken => state::BROKEN,
Self::Broken { reason, backtrace } if !reason.is_empty() => {
write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
}
_ => write!(f, "{self}"),
}
}
}
@@ -121,6 +135,7 @@ pub struct TenantCreateRequest {
// For now, this field is not even documented in the openapi_spec.yml.
pub eviction_policy: Option<serde_json::Value>,
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
}
#[serde_as]
@@ -167,6 +182,7 @@ pub struct TenantConfigRequest {
// For now, this field is not even documented in the openapi_spec.yml.
pub eviction_policy: Option<serde_json::Value>,
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
}
impl TenantConfigRequest {
@@ -188,6 +204,7 @@ impl TenantConfigRequest {
trace_read_requests: None,
eviction_policy: None,
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: None,
}
}
}
@@ -615,6 +632,7 @@ impl PagestreamBeMessage {
#[cfg(test)]
mod tests {
use bytes::Buf;
use serde_json::json;
use super::*;
@@ -665,4 +683,57 @@ mod tests {
assert!(msg == reconstructed);
}
}
#[test]
fn test_tenantinfo_serde() {
// Test serialization/deserialization of TenantInfo
let original_active = TenantInfo {
id: TenantId::generate(),
state: TenantState::Active,
current_physical_size: Some(42),
has_in_progress_downloads: Some(false),
};
let expected_active = json!({
"id": original_active.id.to_string(),
"state": {
"slug": "Active",
},
"current_physical_size": 42,
"has_in_progress_downloads": false,
});
let original_broken = TenantInfo {
id: TenantId::generate(),
state: TenantState::Broken {
reason: "reason".into(),
backtrace: "backtrace info".into(),
},
current_physical_size: Some(42),
has_in_progress_downloads: Some(false),
};
let expected_broken = json!({
"id": original_broken.id.to_string(),
"state": {
"slug": "Broken",
"data": {
"backtrace": "backtrace info",
"reason": "reason",
}
},
"current_physical_size": 42,
"has_in_progress_downloads": false,
});
assert_eq!(
serde_json::to_value(&original_active).unwrap(),
expected_active
);
assert_eq!(
serde_json::to_value(&original_broken).unwrap(),
expected_broken
);
assert!(format!("{:?}", &original_broken.state).contains("reason"));
assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
}
}

View File

@@ -5,7 +5,7 @@ use std::path::PathBuf;
use std::process::Command;
use anyhow::{anyhow, Context};
use bindgen::callbacks::ParseCallbacks;
use bindgen::callbacks::{DeriveInfo, ParseCallbacks};
#[derive(Debug)]
struct PostgresFfiCallbacks;
@@ -20,7 +20,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
// Add any custom #[derive] attributes to the data structures that bindgen
// creates.
fn add_derives(&self, name: &str) -> Vec<String> {
fn add_derives(&self, derive_info: &DeriveInfo) -> Vec<String> {
// This is the list of data structures that we want to serialize/deserialize.
let serde_list = [
"XLogRecord",
@@ -31,7 +31,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
"ControlFileData",
];
if serde_list.contains(&name) {
if serde_list.contains(&derive_info.name) {
vec![
"Default".into(), // Default allows us to easily fill the padding fields with 0.
"Serialize".into(),

View File

@@ -204,12 +204,7 @@ async fn upload_s3_data(
let data = format!("remote blob data {i}").into_bytes();
let data_len = data.len();
task_client
.upload(
Box::new(std::io::Cursor::new(data)),
data_len,
&blob_path,
None,
)
.upload(std::io::Cursor::new(data), data_len, &blob_path, None)
.await?;
Ok::<_, anyhow::Error>((blob_prefix, blob_path))

View File

@@ -14,4 +14,5 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tracing.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
workspace_hack.workspace = true

View File

@@ -33,7 +33,7 @@ serde_with.workspace = true
strum.workspace = true
strum_macros.workspace = true
url.workspace = true
uuid = { version = "1.2", features = ["v4", "serde"] }
uuid.workspace = true
metrics.workspace = true
workspace_hack.workspace = true

View File

@@ -33,7 +33,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
min_lsn = min(min_lsn, lsn_range.start);
max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
updates.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer)).unwrap();
}
println!("min: {min_lsn}, max: {max_lsn}");
@@ -215,7 +215,7 @@ fn bench_sequential(c: &mut Criterion) {
is_incremental: false,
short_id: format!("Layer {}", i),
};
updates.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer)).unwrap();
}
updates.flush();
println!("Finished layer map init in {:?}", now.elapsed());

View File

@@ -6,6 +6,7 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde::de::IntoDeserializer;
use std::env;
use storage_broker::Uri;
use utils::crashsafe::path_with_suffix_extension;
@@ -62,7 +63,6 @@ pub mod defaults {
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour";
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
///
/// Default built-in configuration file.
@@ -91,7 +91,6 @@ pub mod defaults {
#cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
#disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}}
@@ -108,6 +107,7 @@ pub mod defaults {
#pitr_interval = '{DEFAULT_PITR_INTERVAL}'
#min_resident_size_override = .. # in bytes
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
# [remote_storage]
@@ -182,9 +182,6 @@ pub struct PageServerConf {
pub metric_collection_endpoint: Option<Url>,
pub synthetic_size_calculation_interval: Duration,
// See the corresponding metric's help string.
pub evictions_low_residence_duration_metric_threshold: Duration,
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
pub test_remote_failures: u64,
@@ -257,8 +254,6 @@ struct PageServerConfigBuilder {
metric_collection_endpoint: BuilderValue<Option<Url>>,
synthetic_size_calculation_interval: BuilderValue<Duration>,
evictions_low_residence_duration_metric_threshold: BuilderValue<Duration>,
disk_usage_based_eviction: BuilderValue<Option<DiskUsageEvictionTaskConfig>>,
test_remote_failures: BuilderValue<u64>,
@@ -316,11 +311,6 @@ impl Default for PageServerConfigBuilder {
.expect("cannot parse default synthetic size calculation interval")),
metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT),
evictions_low_residence_duration_metric_threshold: Set(humantime::parse_duration(
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.expect("cannot parse DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD")),
disk_usage_based_eviction: Set(None),
test_remote_failures: Set(0),
@@ -438,10 +428,6 @@ impl PageServerConfigBuilder {
self.test_remote_failures = BuilderValue::Set(fail_first);
}
pub fn evictions_low_residence_duration_metric_threshold(&mut self, value: Duration) {
self.evictions_low_residence_duration_metric_threshold = BuilderValue::Set(value);
}
pub fn disk_usage_based_eviction(&mut self, value: Option<DiskUsageEvictionTaskConfig>) {
self.disk_usage_based_eviction = BuilderValue::Set(value);
}
@@ -525,11 +511,6 @@ impl PageServerConfigBuilder {
synthetic_size_calculation_interval: self
.synthetic_size_calculation_interval
.ok_or(anyhow!("missing synthetic_size_calculation_interval"))?,
evictions_low_residence_duration_metric_threshold: self
.evictions_low_residence_duration_metric_threshold
.ok_or(anyhow!(
"missing evictions_low_residence_duration_metric_threshold"
))?,
disk_usage_based_eviction: self
.disk_usage_based_eviction
.ok_or(anyhow!("missing disk_usage_based_eviction"))?,
@@ -721,12 +702,12 @@ impl PageServerConf {
"synthetic_size_calculation_interval" =>
builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
"test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
"evictions_low_residence_duration_metric_threshold" => builder.evictions_low_residence_duration_metric_threshold(parse_toml_duration(key, item)?),
"disk_usage_based_eviction" => {
tracing::info!("disk_usage_based_eviction: {:#?}", &item);
builder.disk_usage_based_eviction(
toml_edit::de::from_item(item.clone())
.context("parse disk_usage_based_eviction")?)
deserialize_from_item("disk_usage_based_eviction", item)
.context("parse disk_usage_based_eviction")?
)
},
"ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
_ => bail!("unrecognized pageserver option '{key}'"),
@@ -827,18 +808,25 @@ impl PageServerConf {
if let Some(eviction_policy) = item.get("eviction_policy") {
t_conf.eviction_policy = Some(
toml_edit::de::from_item(eviction_policy.clone())
deserialize_from_item("eviction_policy", eviction_policy)
.context("parse eviction_policy")?,
);
}
if let Some(item) = item.get("min_resident_size_override") {
t_conf.min_resident_size_override = Some(
toml_edit::de::from_item(item.clone())
deserialize_from_item("min_resident_size_override", item)
.context("parse min_resident_size_override")?,
);
}
if let Some(item) = item.get("evictions_low_residence_duration_metric_threshold") {
t_conf.evictions_low_residence_duration_metric_threshold = Some(parse_toml_duration(
"evictions_low_residence_duration_metric_threshold",
item,
)?);
}
Ok(t_conf)
}
@@ -877,10 +865,6 @@ impl PageServerConf {
cached_metric_collection_interval: Duration::from_secs(60 * 60),
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
synthetic_size_calculation_interval: Duration::from_secs(60),
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.unwrap(),
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
@@ -938,6 +922,18 @@ where
})
}
fn deserialize_from_item<T>(name: &str, item: &Item) -> anyhow::Result<T>
where
T: serde::de::DeserializeOwned,
{
// ValueDeserializer::new is not public, so use the ValueDeserializer's documented way
let deserializer = match item.clone().into_value() {
Ok(value) => value.into_deserializer(),
Err(item) => anyhow::bail!("toml_edit::Item '{item}' is not a toml_edit::Value"),
};
T::deserialize(deserializer).with_context(|| format!("deserializing item for node {name}"))
}
/// Configurable semaphore permits setting.
///
/// Does not allow semaphore permits to be zero, because at runtime initially zero permits and empty
@@ -1004,9 +1000,10 @@ mod tests {
use remote_storage::{RemoteStorageKind, S3Config};
use tempfile::{tempdir, TempDir};
use utils::serde_percent::Percent;
use super::*;
use crate::DEFAULT_PG_VERSION;
use crate::{tenant::config::EvictionPolicy, DEFAULT_PG_VERSION};
const ALL_BASE_VALUES_TOML: &str = r#"
# Initial configuration file created by 'pageserver --init'
@@ -1029,8 +1026,6 @@ cached_metric_collection_interval = '22200 s'
metric_collection_endpoint = 'http://localhost:80/metrics'
synthetic_size_calculation_interval = '333 s'
evictions_low_residence_duration_metric_threshold = '444 s'
log_format = 'json'
"#;
@@ -1087,9 +1082,6 @@ log_format = 'json'
synthetic_size_calculation_interval: humantime::parse_duration(
defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL
)?,
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD
)?,
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
@@ -1144,7 +1136,6 @@ log_format = 'json'
cached_metric_collection_interval: Duration::from_secs(22200),
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
synthetic_size_calculation_interval: Duration::from_secs(333),
evictions_low_residence_duration_metric_threshold: Duration::from_secs(444),
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
@@ -1310,6 +1301,71 @@ trace_read_requests = {trace_read_requests}"#,
Ok(())
}
#[test]
fn eviction_pageserver_config_parse() -> anyhow::Result<()> {
let tempdir = tempdir()?;
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
let pageserver_conf_toml = format!(
r#"pg_distrib_dir = "{}"
metric_collection_endpoint = "http://sample.url"
metric_collection_interval = "10min"
id = 222
[disk_usage_based_eviction]
max_usage_pct = 80
min_avail_bytes = 0
period = "10s"
[tenant_config]
evictions_low_residence_duration_metric_threshold = "20m"
[tenant_config.eviction_policy]
kind = "LayerAccessThreshold"
period = "20m"
threshold = "20m"
"#,
pg_distrib_dir.display(),
);
let toml: Document = pageserver_conf_toml.parse()?;
let conf = PageServerConf::parse_and_validate(&toml, &workdir)?;
assert_eq!(conf.pg_distrib_dir, pg_distrib_dir);
assert_eq!(
conf.metric_collection_endpoint,
Some("http://sample.url".parse().unwrap())
);
assert_eq!(
conf.metric_collection_interval,
Duration::from_secs(10 * 60)
);
assert_eq!(
conf.default_tenant_conf
.evictions_low_residence_duration_metric_threshold,
Duration::from_secs(20 * 60)
);
assert_eq!(conf.id, NodeId(222));
assert_eq!(
conf.disk_usage_based_eviction,
Some(DiskUsageEvictionTaskConfig {
max_usage_pct: Percent::new(80).unwrap(),
min_avail_bytes: 0,
period: Duration::from_secs(10),
#[cfg(feature = "testing")]
mock_statvfs: None,
})
);
match &conf.default_tenant_conf.eviction_policy {
EvictionPolicy::NoEviction => panic!("Unexpected eviction opolicy tenant settings"),
EvictionPolicy::LayerAccessThreshold(eviction_thresold) => {
assert_eq!(eviction_thresold.period, Duration::from_secs(20 * 60));
assert_eq!(eviction_thresold.threshold, Duration::from_secs(20 * 60));
}
}
Ok(())
}
fn prepare_fs(tempdir: &TempDir) -> anyhow::Result<(PathBuf, PathBuf)> {
let tempdir_path = tempdir.path();

View File

@@ -829,12 +829,9 @@ components:
type: object
required:
- id
- state
properties:
id:
type: string
state:
type: string
current_physical_size:
type: integer
has_in_progress_downloads:

View File

@@ -465,7 +465,7 @@ async fn tenant_list_handler(request: Request<Body>) -> Result<Response<Body>, A
.iter()
.map(|(id, state)| TenantInfo {
id: *id,
state: *state,
state: state.clone(),
current_physical_size: None,
has_in_progress_downloads: Some(state.has_in_progress_downloads()),
})
@@ -490,7 +490,7 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
let state = tenant.current_state();
Ok(TenantInfo {
id: tenant_id,
state,
state: state.clone(),
current_physical_size: Some(current_physical_size),
has_in_progress_downloads: Some(state.has_in_progress_downloads()),
})
@@ -781,6 +781,19 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(&evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
&evictions_low_residence_duration_metric_threshold,
))
.map_err(ApiError::BadRequest)?,
);
}
let target_tenant_id = request_data
.new_tenant_id
.map(TenantId::from)
@@ -914,6 +927,19 @@ async fn update_tenant_config_handler(
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(&evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
&evictions_low_residence_duration_metric_threshold,
))
.map_err(ApiError::BadRequest)?,
);
}
let state = get_state(&request);
mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
.instrument(info_span!("tenant_config", tenant = ?tenant_id))
@@ -931,7 +957,7 @@ async fn handle_tenant_break(r: Request<Body>) -> Result<Response<Body>, ApiErro
.await
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
tenant.set_broken("broken from test");
tenant.set_broken("broken from test".to_owned());
json_response(StatusCode::OK, ())
}

View File

@@ -6,7 +6,8 @@ use metrics::{
UIntGauge, UIntGaugeVec,
};
use once_cell::sync::Lazy;
use pageserver_api::models::state;
use pageserver_api::models::TenantState;
use strum::VariantNames;
use utils::id::{TenantId, TimelineId};
/// Prometheus histogram buckets (in seconds) for operations in the critical
@@ -147,15 +148,6 @@ static CURRENT_LOGICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
.expect("failed to define current logical size metric")
});
// Metrics collected on tenant states.
const TENANT_STATE_OPTIONS: &[&str] = &[
state::LOADING,
state::ATTACHING,
state::ACTIVE,
state::STOPPING,
state::BROKEN,
];
pub static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_tenant_states_count",
@@ -265,6 +257,22 @@ impl EvictionsWithLowResidenceDuration {
}
}
pub fn change_threshold(
&mut self,
tenant_id: &str,
timeline_id: &str,
new_threshold: Duration,
) {
if new_threshold == self.threshold {
return;
}
let mut with_new =
EvictionsWithLowResidenceDurationBuilder::new(self.data_source, new_threshold)
.build(tenant_id, timeline_id);
std::mem::swap(self, &mut with_new);
with_new.remove(tenant_id, timeline_id);
}
// This could be a `Drop` impl, but, we need the `tenant_id` and `timeline_id`.
fn remove(&mut self, tenant_id: &str, timeline_id: &str) {
let Some(_counter) = self.counter.take() else {
@@ -597,7 +605,7 @@ pub struct TimelineMetrics {
pub num_persistent_files_created: IntCounter,
pub persistent_bytes_written: IntCounter,
pub evictions: IntCounter,
pub evictions_with_low_residence_duration: EvictionsWithLowResidenceDuration,
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
}
impl TimelineMetrics {
@@ -664,7 +672,9 @@ impl TimelineMetrics {
num_persistent_files_created,
persistent_bytes_written,
evictions,
evictions_with_low_residence_duration,
evictions_with_low_residence_duration: std::sync::RwLock::new(
evictions_with_low_residence_duration,
),
}
}
}
@@ -683,6 +693,8 @@ impl Drop for TimelineMetrics {
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, timeline_id]);
let _ = EVICTIONS.remove_label_values(&[tenant_id, timeline_id]);
self.evictions_with_low_residence_duration
.write()
.unwrap()
.remove(tenant_id, timeline_id);
for op in STORAGE_TIME_OPERATIONS {
let _ =
@@ -707,7 +719,7 @@ impl Drop for TimelineMetrics {
pub fn remove_tenant_metrics(tenant_id: &TenantId) {
let tid = tenant_id.to_string();
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);
for state in TENANT_STATE_OPTIONS {
for state in TenantState::VARIANTS {
let _ = TENANT_STATE_METRIC.remove_label_values(&[&tid, state]);
}
}

View File

@@ -65,7 +65,7 @@ fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<
_ = task_mgr::shutdown_watcher() => {
// We were requested to shut down.
let msg = format!("pageserver is shutting down");
let msg = "pageserver is shutting down".to_string();
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None));
Err(QueryError::Other(anyhow::anyhow!(msg)))
}

View File

@@ -267,7 +267,10 @@ impl UninitializedTimeline<'_> {
.await
.context("Failed to flush after basebackup import")?;
self.initialize(ctx)
// Initialize without loading the layer map. We started with an empty layer map, and already
// updated it for the layers that we created during the import.
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
self.initialize_with_lock(ctx, &mut timelines, false, true)
}
fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
@@ -619,7 +622,7 @@ impl Tenant {
match tenant_clone.attach(ctx).await {
Ok(_) => {}
Err(e) => {
tenant_clone.set_broken(&e.to_string());
tenant_clone.set_broken(e.to_string());
error!("error attaching tenant: {:?}", e);
}
}
@@ -827,7 +830,10 @@ impl Tenant {
pub fn create_broken_tenant(conf: &'static PageServerConf, tenant_id: TenantId) -> Arc<Tenant> {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
Arc::new(Tenant::new(
TenantState::Broken,
TenantState::Broken {
reason: "create_broken_tenant".into(),
backtrace: String::new(),
},
conf,
TenantConfOpt::default(),
wal_redo_manager,
@@ -888,7 +894,7 @@ impl Tenant {
match tenant_clone.load(&ctx).await {
Ok(()) => {}
Err(err) => {
tenant_clone.set_broken(&err.to_string());
tenant_clone.set_broken(err.to_string());
error!("could not load tenant {tenant_id}: {err:?}");
}
}
@@ -1440,7 +1446,7 @@ impl Tenant {
}
pub fn current_state(&self) -> TenantState {
*self.state.borrow()
self.state.borrow().clone()
}
pub fn is_active(&self) -> bool {
@@ -1451,15 +1457,15 @@ impl Tenant {
fn activate(&self, ctx: &RequestContext) -> anyhow::Result<()> {
let mut result = Ok(());
self.state.send_modify(|current_state| {
match *current_state {
match &*current_state {
TenantState::Active => {
// activate() was called on an already Active tenant. Shouldn't happen.
result = Err(anyhow::anyhow!("Tenant is already active"));
}
TenantState::Broken => {
TenantState::Broken { reason, .. } => {
// This shouldn't happen either
result = Err(anyhow::anyhow!(
"Could not activate tenant because it is in broken state"
"Could not activate tenant because it is in broken state due to: {reason}",
));
}
TenantState::Stopping => {
@@ -1493,7 +1499,10 @@ impl Tenant {
timeline.timeline_id, e
);
timeline.set_state(TimelineState::Broken);
*current_state = TenantState::Broken;
*current_state = TenantState::broken_from_reason(format!(
"failed to activate timeline {}: {}",
timeline.timeline_id, e
));
}
}
}
@@ -1506,7 +1515,7 @@ impl Tenant {
/// Change tenant status to Stopping, to mark that it is being shut down
pub fn set_stopping(&self) {
self.state.send_modify(|current_state| {
match *current_state {
match current_state {
TenantState::Active | TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Stopping;
@@ -1522,8 +1531,8 @@ impl Tenant {
timeline.set_state(TimelineState::Stopping);
}
}
TenantState::Broken => {
info!("Cannot set tenant to Stopping state, it is already in Broken state");
TenantState::Broken { reason, .. } => {
info!("Cannot set tenant to Stopping state, it is in Broken state due to: {reason}");
}
TenantState::Stopping => {
// The tenant was detached, or system shutdown was requested, while we were
@@ -1534,7 +1543,7 @@ impl Tenant {
});
}
pub fn set_broken(&self, reason: &str) {
pub fn set_broken(&self, reason: String) {
self.state.send_modify(|current_state| {
match *current_state {
TenantState::Active => {
@@ -1542,24 +1551,24 @@ impl Tenant {
// while loading or attaching a tenant. A tenant that has already been
// activated should never be marked as broken. We cope with it the best
// we can, but it shouldn't happen.
*current_state = TenantState::Broken;
warn!("Changing Active tenant to Broken state, reason: {}", reason);
*current_state = TenantState::broken_from_reason(reason);
}
TenantState::Broken => {
TenantState::Broken { .. } => {
// This shouldn't happen either
warn!("Tenant is already in Broken state");
}
TenantState::Stopping => {
// This shouldn't happen either
*current_state = TenantState::Broken;
warn!(
"Marking Stopping tenant as Broken state, reason: {}",
reason
);
*current_state = TenantState::broken_from_reason(reason);
}
TenantState::Loading | TenantState::Attaching => {
info!("Setting tenant as Broken state, reason: {}", reason);
*current_state = TenantState::Broken;
*current_state = TenantState::broken_from_reason(reason);
}
}
});
@@ -1572,7 +1581,7 @@ impl Tenant {
pub async fn wait_to_become_active(&self) -> anyhow::Result<()> {
let mut receiver = self.state.subscribe();
loop {
let current_state = *receiver.borrow_and_update();
let current_state = receiver.borrow_and_update().clone();
match current_state {
TenantState::Loading | TenantState::Attaching => {
// in these states, there's a chance that we can reach ::Active
@@ -1581,12 +1590,12 @@ impl Tenant {
TenantState::Active { .. } => {
return Ok(());
}
TenantState::Broken | TenantState::Stopping => {
TenantState::Broken { .. } | TenantState::Stopping => {
// There's no chance the tenant can transition back into ::Active
anyhow::bail!(
"Tenant {} will not become active. Current state: {:?}",
self.tenant_id,
current_state,
&current_state,
);
}
}
@@ -1726,6 +1735,13 @@ impl Tenant {
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
*self.tenant_conf.write().unwrap() = new_tenant_conf;
// Don't hold self.timelines.lock() during the notifies.
// There's no risk of deadlock right now, but there could be if we consolidate
// mutexes in struct Timeline in the future.
let timelines = self.list_timelines();
for timeline in timelines {
timeline.tenant_conf_updated();
}
}
fn create_timeline_data(
@@ -1767,21 +1783,23 @@ impl Tenant {
let (state, mut rx) = watch::channel(state);
tokio::spawn(async move {
let current_state = *rx.borrow_and_update();
let mut current_state: &'static str = From::from(&*rx.borrow_and_update());
let tid = tenant_id.to_string();
TENANT_STATE_METRIC
.with_label_values(&[&tid, current_state.as_str()])
.with_label_values(&[&tid, current_state])
.inc();
loop {
match rx.changed().await {
Ok(()) => {
let new_state = *rx.borrow();
let new_state: &'static str = From::from(&*rx.borrow_and_update());
TENANT_STATE_METRIC
.with_label_values(&[&tid, current_state.as_str()])
.with_label_values(&[&tid, current_state])
.dec();
TENANT_STATE_METRIC
.with_label_values(&[&tid, new_state.as_str()])
.with_label_values(&[&tid, new_state])
.inc();
current_state = new_state;
}
Err(_sender_dropped_error) => {
info!("Tenant dropped the state updates sender, quitting waiting for tenant state change");
@@ -1876,7 +1894,7 @@ impl Tenant {
.to_string();
// Convert the config to a toml file.
conf_content += &toml_edit::easy::to_string(&tenant_conf)?;
conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
let mut target_config_file = VirtualFile::open_with_options(
target_config_path,
@@ -2308,6 +2326,8 @@ impl Tenant {
)
})?;
// Initialize the timeline without loading the layer map, because we already updated the layer
// map above, when we imported the datadir.
let timeline = {
let mut timelines = self.timelines.lock().unwrap();
raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)?
@@ -2802,6 +2822,9 @@ pub mod harness {
trace_read_requests: Some(tenant_conf.trace_read_requests),
eviction_policy: Some(tenant_conf.eviction_policy),
min_resident_size_override: tenant_conf.min_resident_size_override,
evictions_low_residence_duration_metric_threshold: Some(
tenant_conf.evictions_low_residence_duration_metric_threshold,
),
}
}
}

View File

@@ -39,6 +39,7 @@ pub mod defaults {
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "3 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
}
/// Per-tenant configuration options
@@ -93,6 +94,9 @@ pub struct TenantConf {
pub trace_read_requests: bool,
pub eviction_policy: EvictionPolicy,
pub min_resident_size_override: Option<u64>,
// See the corresponding metric's help string.
#[serde(with = "humantime_serde")]
pub evictions_low_residence_duration_metric_threshold: Duration,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -164,6 +168,11 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub min_resident_size_override: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
#[serde(default)]
pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -228,6 +237,9 @@ impl TenantConfOpt {
min_resident_size_override: self
.min_resident_size_override
.or(global_conf.min_resident_size_override),
evictions_low_residence_duration_metric_threshold: self
.evictions_low_residence_duration_metric_threshold
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
}
}
}
@@ -260,6 +272,10 @@ impl Default for TenantConf {
trace_read_requests: false,
eviction_policy: EvictionPolicy::NoEviction,
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
}
}
}
@@ -275,9 +291,9 @@ mod tests {
..TenantConfOpt::default()
};
let toml_form = toml_edit::easy::to_string(&small_conf).unwrap();
let toml_form = toml_edit::ser::to_string(&small_conf).unwrap();
assert_eq!(toml_form, "gc_horizon = 42\n");
assert_eq!(small_conf, toml_edit::easy::from_str(&toml_form).unwrap());
assert_eq!(small_conf, toml_edit::de::from_str(&toml_form).unwrap());
let json_form = serde_json::to_string(&small_conf).unwrap();
assert_eq!(json_form, "{\"gc_horizon\":42}");

View File

@@ -52,7 +52,7 @@ use crate::metrics::NUM_ONDISK_LAYERS;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use anyhow::Result;
use anyhow::{bail, Result};
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
@@ -126,7 +126,7 @@ where
///
/// Insert an on-disk layer.
///
pub fn insert_historic(&mut self, layer: Arc<L>) {
pub fn insert_historic(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
self.layer_map.insert_historic_noflush(layer)
}
@@ -274,17 +274,22 @@ where
///
/// Helper function for BatchedUpdates::insert_historic
///
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) {
self.historic.insert(
historic_layer_coverage::LayerKey::from(&*layer),
Arc::clone(&layer),
);
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
let key = historic_layer_coverage::LayerKey::from(&*layer);
if self.historic.contains(&key) {
bail!(
"Attempt to insert duplicate layer {} in layer map",
layer.short_id()
);
}
self.historic.insert(key, Arc::clone(&layer));
if Self::is_l0(&layer) {
self.l0_delta_layers.push(layer);
}
NUM_ONDISK_LAYERS.inc();
Ok(())
}
///
@@ -838,7 +843,7 @@ mod tests {
let expected_in_counts = (1, usize::from(expected_l0));
map.batch_update().insert_historic(remote.clone());
map.batch_update().insert_historic(remote.clone()).unwrap();
assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
let replaced = map

View File

@@ -417,6 +417,14 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
}
}
pub fn contains(&self, layer_key: &LayerKey) -> bool {
match self.buffer.get(layer_key) {
Some(None) => false, // layer remove was buffered
Some(_) => true, // layer insert was buffered
None => self.layers.contains_key(layer_key), // no buffered ops for this layer
}
}
pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
self.buffer.insert(layer_key, Some(value));
}

View File

@@ -537,7 +537,7 @@ where
Some(tenant) => match tenant.current_state() {
TenantState::Attaching
| TenantState::Loading
| TenantState::Broken
| TenantState::Broken { .. }
| TenantState::Active => tenant.set_stopping(),
TenantState::Stopping => return Err(TenantStateError::IsStopping(tenant_id)),
},
@@ -565,7 +565,7 @@ where
let tenants_accessor = TENANTS.read().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => {
tenant.set_broken(&e.to_string());
tenant.set_broken(e.to_string());
}
None => {
warn!("Tenant {tenant_id} got removed from memory");

View File

@@ -74,7 +74,7 @@ pub(super) async fn upload_timeline_layer<'a>(
})?;
storage
.upload(Box::new(source_file), fs_size, &storage_path, None)
.upload(source_file, fs_size, &storage_path, None)
.await
.with_context(|| {
format!(

View File

@@ -209,7 +209,7 @@ async fn wait_for_active_tenant(
loop {
match tenant_state_updates.changed().await {
Ok(()) => {
let new_state = *tenant_state_updates.borrow();
let new_state = &*tenant_state_updates.borrow();
match new_state {
TenantState::Active => {
debug!("Tenant state changed to active, continuing the task loop");

View File

@@ -77,6 +77,7 @@ pub(super) use self::eviction_task::EvictionTaskTenantState;
use self::eviction_task::EvictionTaskTimelineState;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
@@ -161,7 +162,7 @@ pub struct Timeline {
ancestor_timeline: Option<Arc<Timeline>>,
ancestor_lsn: Lsn,
metrics: TimelineMetrics,
pub(super) metrics: TimelineMetrics,
/// Ensures layers aren't frozen by checkpointer between
/// [`Timeline::get_layer_for_write`] and layer reads.
@@ -1136,6 +1137,8 @@ impl Timeline {
if let Some(delta) = local_layer_residence_duration {
self.metrics
.evictions_with_low_residence_duration
.read()
.unwrap()
.observe(delta);
info!(layer=%local_layer.short_id(), residence_millis=delta.as_millis(), "evicted layer after known residence period");
} else {
@@ -1209,6 +1212,35 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.eviction_policy)
}
fn get_evictions_low_residence_duration_metric_threshold(
tenant_conf: &TenantConfOpt,
default_tenant_conf: &TenantConf,
) -> Duration {
tenant_conf
.evictions_low_residence_duration_metric_threshold
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
}
pub(super) fn tenant_conf_updated(&self) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.
// The threshold is embedded in the metric. So, we need to update it.
{
let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
&self.tenant_conf.read().unwrap(),
&self.conf.default_tenant_conf,
);
let tenant_id_str = self.tenant_id.to_string();
let timeline_id_str = self.timeline_id.to_string();
self.metrics
.evictions_with_low_residence_duration
.write()
.unwrap()
.change_threshold(&tenant_id_str, &timeline_id_str, new_threshold);
}
}
/// Open a Timeline handle.
///
/// Loads the metadata for the timeline into memory, but not the layer map.
@@ -1240,6 +1272,11 @@ impl Timeline {
let max_lsn_wal_lag = tenant_conf_guard
.max_lsn_wal_lag
.unwrap_or(conf.default_tenant_conf.max_lsn_wal_lag);
let evictions_low_residence_duration_metric_threshold =
Self::get_evictions_low_residence_duration_metric_threshold(
&tenant_conf_guard,
&conf.default_tenant_conf,
);
drop(tenant_conf_guard);
Arc::new_cyclic(|myself| {
@@ -1287,7 +1324,7 @@ impl Timeline {
&timeline_id,
crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
"mtime",
conf.evictions_low_residence_duration_metric_threshold,
evictions_low_residence_duration_metric_threshold,
),
),
@@ -1446,7 +1483,7 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
updates.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer))?;
num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file.
@@ -1478,7 +1515,7 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
updates.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer))?;
num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
@@ -1552,7 +1589,7 @@ impl Timeline {
// remote index file?
// If so, rename_to_backup those files & replace their local layer with
// a RemoteLayer in the layer map so that we re-download them on-demand.
if let Some(local_layer) = local_layer {
if let Some(local_layer) = &local_layer {
let local_layer_path = local_layer
.local_path()
.expect("caller must ensure that local_layers only contains local layers");
@@ -1577,7 +1614,6 @@ impl Timeline {
anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
} else {
self.metrics.resident_physical_size_gauge.sub(local_size);
updates.remove_historic(local_layer);
// fall-through to adding the remote layer
}
} else {
@@ -1613,7 +1649,11 @@ impl Timeline {
);
let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer);
if let Some(local_layer) = &local_layer {
updates.replace_historic(local_layer, remote_layer)?;
} else {
updates.insert_historic(remote_layer)?;
}
}
LayerFileName::Delta(deltafilename) => {
// Create a RemoteLayer for the delta file.
@@ -1637,7 +1677,11 @@ impl Timeline {
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
);
let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer);
if let Some(local_layer) = &local_layer {
updates.replace_historic(local_layer, remote_layer)?;
} else {
updates.insert_historic(remote_layer)?;
}
}
}
}
@@ -2684,7 +2728,7 @@ impl Timeline {
.write()
.unwrap()
.batch_update()
.insert_historic(Arc::new(new_delta));
.insert_historic(Arc::new(new_delta))?;
// update the timeline's physical size
let sz = new_delta_path.metadata()?.len();
@@ -2889,7 +2933,7 @@ impl Timeline {
self.metrics
.resident_physical_size_gauge
.add(metadata.len());
updates.insert_historic(Arc::new(l));
updates.insert_historic(Arc::new(l))?;
}
updates.flush();
drop(layers);
@@ -3322,7 +3366,7 @@ impl Timeline {
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
updates.insert_historic(x);
updates.insert_historic(x)?;
}
// Now that we have reshuffled the data to set of new delta layers, we can

View File

@@ -23,7 +23,7 @@ hmac.workspace = true
hostname.workspace = true
humantime.workspace = true
hyper-tungstenite.workspace = true
hyper.workspace = true
hyper = { workspace = true, features = ["http2", "http1", "tcp", "runtime"] }
itertools.workspace = true
md5.workspace = true
metrics.workspace = true
@@ -64,6 +64,8 @@ webpki-roots.workspace = true
x509-parser.workspace = true
workspace_hack.workspace = true
tokio-util.workspace = true
percent-encoding.workspace = true
[dev-dependencies]
rcgen.workspace = true

View File

@@ -40,7 +40,7 @@ pub fn configure_tls(
let mut cert_resolver = CertResolver::new();
// add default certificate
cert_resolver.add_cert(key_path, cert_path)?;
cert_resolver.add_cert(key_path, cert_path, true)?;
// add extra certificates
if let Some(certs_dir) = certs_dir {
@@ -52,8 +52,11 @@ pub fn configure_tls(
let key_path = path.join("tls.key");
let cert_path = path.join("tls.crt");
if key_path.exists() && cert_path.exists() {
cert_resolver
.add_cert(&key_path.to_string_lossy(), &cert_path.to_string_lossy())?;
cert_resolver.add_cert(
&key_path.to_string_lossy(),
&cert_path.to_string_lossy(),
false,
)?;
}
}
}
@@ -78,16 +81,23 @@ pub fn configure_tls(
struct CertResolver {
certs: HashMap<String, Arc<rustls::sign::CertifiedKey>>,
default: Option<Arc<rustls::sign::CertifiedKey>>,
}
impl CertResolver {
fn new() -> Self {
Self {
certs: HashMap::new(),
default: None,
}
}
fn add_cert(&mut self, key_path: &str, cert_path: &str) -> anyhow::Result<()> {
fn add_cert(
&mut self,
key_path: &str,
cert_path: &str,
is_default: bool,
) -> anyhow::Result<()> {
let priv_key = {
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
@@ -136,10 +146,13 @@ impl CertResolver {
"Failed to parse common name from certificate at '{cert_path}'."
))?;
self.certs.insert(
common_name,
Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key)),
);
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
if is_default {
self.default = Some(cert.clone());
}
self.certs.insert(common_name, cert);
Ok(())
}
@@ -172,7 +185,17 @@ impl rustls::server::ResolvesServerCert for CertResolver {
}
}
} else {
None
// No SNI, use the default certificate, otherwise we can't get to
// options parameter which can be used to set endpoint name too.
// That means that non-SNI flow will not work for CNAME domains in
// verify-full mode.
//
// If that will be a problem we can:
//
// a) Instead of multi-cert approach use single cert with extra
// domains listed in Subject Alternative Name (SAN).
// b) Deploy separate proxy instances for extra domains.
self.default.as_ref().cloned()
}
}
}

View File

@@ -2,6 +2,7 @@
//! Other modules should use stuff from this module instead of
//! directly relying on deps like `reqwest` (think loose coupling).
pub mod pg_to_json;
pub mod server;
pub mod websocket;

View File

@@ -0,0 +1,154 @@
use anyhow::{anyhow, Context};
use chrono::{Utc, DateTime, NaiveDateTime, NaiveTime, NaiveDate};
use serde_json::Map;
use tokio_postgres::{
types::{FromSql, Type},
Column, Row,
};
// some type-aliases I use in my project
pub type JSONValue = serde_json::Value;
pub type RowData = Map<String, JSONValue>;
pub type Error = anyhow::Error; // from: https://github.com/dtolnay/anyhow
pub fn postgres_row_to_json_value(row: &Row) -> Result<JSONValue, Error> {
let row_data = postgres_row_to_row_data(row)?;
Ok(JSONValue::Object(row_data))
}
pub fn postgres_row_to_row_data(row: &Row) -> Result<RowData, Error> {
let mut result: Map<String, JSONValue> = Map::new();
for (i, column) in row.columns().iter().enumerate() {
let name = column.name();
let json_value = pg_cell_to_json_value(&row, column, i)?;
result.insert(name.to_string(), json_value);
}
Ok(result)
}
pub fn pg_cell_to_json_value(
row: &Row,
column: &Column,
column_i: usize,
) -> Result<JSONValue, Error> {
let f64_to_json_number = |raw_val: f64| -> Result<JSONValue, Error> {
let temp = serde_json::Number::from_f64(raw_val).ok_or(anyhow!("invalid json-float"))?;
Ok(JSONValue::Number(temp))
};
Ok(match *column.type_() {
// for rust-postgres <> postgres type-mappings: https://docs.rs/postgres/latest/postgres/types/trait.FromSql.html#types
// for postgres types: https://www.postgresql.org/docs/7.4/datatype.html#DATATYPE-TABLE
Type::TIMESTAMPTZ => get_basic(row, column, column_i, |a: DateTime<Utc>| {
Ok(JSONValue::String(a.to_rfc3339()))
})?,
Type::TIMESTAMP => get_basic(row, column, column_i, |a: NaiveDateTime| {
Ok(JSONValue::String(a.format("%Y-%m-%dT%H:%M:%S%.6f").to_string()))
})?,
Type::TIME => get_basic(row, column, column_i, |a: NaiveTime| {
Ok(JSONValue::String(a.format("%H:%M:%S%.6f").to_string()))
})?,
Type::DATE => get_basic(row, column, column_i, |a: NaiveDate| {
Ok(JSONValue::String(a.format("%Y-%m-%d").to_string()))
})?,
// no TIMETZ support?
// single types
Type::BOOL => get_basic(row, column, column_i, |a: bool| Ok(JSONValue::Bool(a)))?,
Type::INT2 => get_basic(row, column, column_i, |a: i16| {
Ok(JSONValue::Number(serde_json::Number::from(a)))
})?,
Type::INT4 => get_basic(row, column, column_i, |a: i32| {
Ok(JSONValue::Number(serde_json::Number::from(a)))
})?,
Type::INT8 => get_basic(row, column, column_i, |a: i64| {
Ok(JSONValue::Number(serde_json::Number::from(a)))
})?,
Type::TEXT | Type::VARCHAR => {
get_basic(row, column, column_i, |a: String| Ok(JSONValue::String(a)))?
}
// Type::JSON | Type::JSONB => get_basic(row, column, column_i, |a: JSONValue| Ok(a))?,
Type::FLOAT4 => get_basic(row, column, column_i, |a: f32| f64_to_json_number(a.into()))?,
Type::FLOAT8 => get_basic(row, column, column_i, f64_to_json_number)?,
// these types require a custom StringCollector struct as an intermediary (see struct at bottom)
Type::TS_VECTOR => get_basic(row, column, column_i, |a: StringCollector| {
Ok(JSONValue::String(a.0))
})?,
// array types
Type::BOOL_ARRAY => get_array(row, column, column_i, |a: bool| Ok(JSONValue::Bool(a)))?,
Type::INT2_ARRAY => get_array(row, column, column_i, |a: i16| {
Ok(JSONValue::Number(serde_json::Number::from(a)))
})?,
Type::INT4_ARRAY => get_array(row, column, column_i, |a: i32| {
Ok(JSONValue::Number(serde_json::Number::from(a)))
})?,
Type::INT8_ARRAY => get_array(row, column, column_i, |a: i64| {
Ok(JSONValue::Number(serde_json::Number::from(a)))
})?,
Type::TEXT_ARRAY | Type::VARCHAR_ARRAY => {
get_array(row, column, column_i, |a: String| Ok(JSONValue::String(a)))?
}
// Type::JSON_ARRAY | Type::JSONB_ARRAY => get_array(row, column, column_i, |a: JSONValue| Ok(a))?,
Type::FLOAT4_ARRAY => get_array(row, column, column_i, f64_to_json_number)?,
Type::FLOAT8_ARRAY => get_array(row, column, column_i, f64_to_json_number)?,
// these types require a custom StringCollector struct as an intermediary (see struct at bottom)
Type::TS_VECTOR_ARRAY => get_array(row, column, column_i, |a: StringCollector| {
Ok(JSONValue::String(a.0))
})?,
_ => anyhow::bail!(
"Cannot convert pg-cell \"{}\" of type \"{}\" to a JSONValue.",
column.name(),
column.type_().name()
),
})
}
fn get_basic<'a, T: FromSql<'a>>(
row: &'a Row,
column: &Column,
column_i: usize,
val_to_json_val: impl Fn(T) -> Result<JSONValue, Error>,
) -> Result<JSONValue, Error> {
let raw_val = row
.try_get::<_, Option<T>>(column_i)
.with_context(|| format!("column_name:{}", column.name()))?;
raw_val.map_or(Ok(JSONValue::Null), val_to_json_val)
}
fn get_array<'a, T: FromSql<'a>>(
row: &'a Row,
column: &Column,
column_i: usize,
val_to_json_val: impl Fn(T) -> Result<JSONValue, Error>,
) -> Result<JSONValue, Error> {
let raw_val_array = row
.try_get::<_, Option<Vec<T>>>(column_i)
.with_context(|| format!("column_name:{}", column.name()))?;
Ok(match raw_val_array {
Some(val_array) => {
let mut result = vec![];
for val in val_array {
result.push(val_to_json_val(val)?);
}
JSONValue::Array(result)
}
None => JSONValue::Null,
})
}
struct StringCollector(String);
impl FromSql<'_> for StringCollector {
fn from_sql(
_: &Type,
raw: &[u8],
) -> Result<StringCollector, Box<dyn std::error::Error + Sync + Send>> {
let result = std::str::from_utf8(raw)?;
Ok(StringCollector(result.to_owned()))
}
fn accepts(_ty: &Type) -> bool {
true
}
}

View File

@@ -1,15 +1,25 @@
use crate::http::pg_to_json::postgres_row_to_json_value;
use crate::{
cancellation::CancelMap, config::ProxyConfig, error::io_error, proxy::handle_ws_client,
auth, cancellation::CancelMap, config::ProxyConfig, console, error::io_error,
proxy::handle_ws_client,
};
use bytes::{Buf, Bytes};
use futures::{Sink, Stream, StreamExt};
use futures::{Sink, Stream, StreamExt, TryStreamExt};
use tokio_postgres::Row;
use std::collections::HashMap;
use hyper::{
server::{accept, conn::AddrIncoming},
upgrade::Upgraded,
Body, Request, Response, StatusCode,
Body, Method, Request, Response, StatusCode,
};
use hyper_tungstenite::{tungstenite::Message, HyperWebsocket, WebSocketStream};
use percent_encoding::percent_decode;
use pin_project_lite::pin_project;
use pq_proto::StartupMessageParams;
use serde_json::{Value, json};
use tokio::sync::Mutex;
use tokio_postgres::types::{ToSql};
use std::{
convert::Infallible,
future::ready,
@@ -22,7 +32,9 @@ use tokio::{
io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf},
net::TcpListener,
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument};
use url::{Url};
use utils::http::{error::ApiError, json::json_response};
// TODO: use `std::sync::Exclusive` once it's stabilized.
@@ -40,10 +52,10 @@ pin_project! {
}
impl WebSocketRw {
pub fn new(stream: WebSocketStream<Upgraded>) -> Self {
pub fn new(stream: WebSocketStream<Upgraded>, startup_data: Bytes) -> Self {
Self {
stream: stream.into(),
bytes: Bytes::new(),
bytes: startup_data,
}
}
}
@@ -80,6 +92,7 @@ impl AsyncRead for WebSocketRw {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
if buf.remaining() > 0 {
let bytes = ready!(self.as_mut().poll_fill_buf(cx))?;
let len = std::cmp::min(bytes.len(), buf.remaining());
@@ -140,25 +153,101 @@ async fn serve_websocket(
cancel_map: &CancelMap,
session_id: uuid::Uuid,
hostname: Option<String>,
startup_data: Vec<u8>,
) -> anyhow::Result<()> {
let websocket = websocket.await?;
handle_ws_client(
config,
cancel_map,
session_id,
WebSocketRw::new(websocket),
WebSocketRw::new(websocket, startup_data.into()),
hostname,
)
.await?;
Ok(())
}
struct MyObject {
data: Vec<u8>,
recv_data: Vec<u8>,
}
impl MyObject {
fn new(data: Vec<u8>) -> Self {
MyObject {
data,
recv_data: Vec::with_capacity(512),
}
}
}
impl AsyncRead for MyObject {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<(), io::Error>> {
let data = &self.get_mut().data;
// let len: usize = recv_data.len();
// info!("len: {}", len);
// let mut i: usize = 0;
// let mut zcount = 0;
// while len >= i + 5 {
// let cmd = recv_data[i];
// if cmd == 0x5a { zcount += 1; }
// let size = u32::from_be_bytes(recv_data[(i + 1)..(i + 5)].try_into().unwrap());
// info!("cmd: {} size: {} buf: {:?}", cmd, size, recv_data);
// i += usize::try_from(size).unwrap() + 1;
// }
// if zcount < 2 {
// Poll::Pending
// } else {
// let mut reader = &recv_data[..];
// Pin::new(&mut reader).poll_read(cx, buf)
// }
let mut reader = &data[..];
Pin::new(&mut reader).poll_read(cx, buf)
}
}
impl AsyncWrite for MyObject {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
// eprintln!("{:?}", buf);
let recv_data = &mut self.get_mut().recv_data;
recv_data.extend(buf);
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
// ...
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
// ...
Poll::Ready(Ok(()))
}
}
async fn ws_handler(
mut request: Request<Body>,
config: &'static ProxyConfig,
cancel_map: Arc<CancelMap>,
session_id: uuid::Uuid,
cache: Arc<Mutex<ConnectionCache>>,
) -> Result<Response<Body>, ApiError> {
let host = request
.headers()
.get("host")
@@ -168,26 +257,313 @@ async fn ws_handler(
// Check if the request is a websocket upgrade request.
if hyper_tungstenite::is_upgrade_request(&request) {
let startup_data = match request.uri().query() {
Some(b64_str) => match base64::decode_config(b64_str, base64::URL_SAFE) {
Ok(x) => x,
Err(_) => {
eprintln!("invalid WebSocket base64 startup data");
vec![]
}
},
None => vec![],
};
info!("{} bytes of startup data received via WebSocket URL query", startup_data.len());
let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)
.map_err(|e| ApiError::BadRequest(e.into()))?;
tokio::spawn(async move {
if let Err(e) = serve_websocket(websocket, config, &cancel_map, session_id, host).await
{
if let Err(e) = serve_websocket(websocket, config, &cancel_map, session_id, host, startup_data).await {
error!("error in websocket connection: {e:?}");
}
});
// Return the response so the spawned future can continue.
Ok(response)
} else if request.uri().path() == "/pg-protocol" && request.method() == Method::POST {
let mut body = request.into_body();
let mut data = Vec::with_capacity(512);
while let Some(chunk) = body.next().await {
data.extend(&chunk.map_err(|e| ApiError::InternalServerError(e.into()))?);
}
let mut my_object = MyObject::new(data);
let my_object = tokio::spawn(async move {
let result = handle_ws_client(
config,
&cancel_map,
session_id,
&mut my_object,
host,
).await;
my_object
})
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
let response = Response::builder()
.header("Content-Type", "application/octet-stream")
.header("Access-Control-Allow-Origin", "*")
.status(StatusCode::OK)
.body(Body::from(my_object.recv_data))
.map_err(|e| ApiError::InternalServerError(e.into()))?;
Ok(response)
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
let result = handle_sql(config, request).await;
let status_code = match result {
Ok(_) => StatusCode::OK,
Err(_) => StatusCode::BAD_REQUEST
};
let json = match result {
Ok(r) => serde_json::to_value(r).unwrap(),
Err(e) => {
let message = format!("{:?}", e);
let code = match e.downcast_ref::<tokio_postgres::Error>() {
Some(e) => match e.code() {
Some(e) => serde_json::to_value(e.code()).unwrap(),
None => Value::Null
},
None => Value::Null
};
json!({ "message": message, "code": code })
}
};
json_response(status_code, json).map(|mut r| {
r.headers_mut().insert(
"Access-Control-Allow-Origin",
hyper::http::HeaderValue::from_static("*"),
);
r
})
} else if request.uri().path() == "/sleep" {
// sleep 15ms
tokio::time::sleep(std::time::Duration::from_millis(15)).await;
json_response(StatusCode::OK, "done")
} else {
json_response(StatusCode::OK, "Connect with a websocket client")
json_response(StatusCode::BAD_REQUEST, "query is not supported")
}
}
fn boxed_from_json(json: Vec<Value>) -> Result<Vec<Box<dyn ToSql + Sync + Send>>, anyhow::Error> {
json.iter().map(|value| {
let boxed: Result<Box<dyn ToSql + Sync + Send>, anyhow::Error> = match value {
Value::Bool(b) => Ok(Box::new(b.clone())),
Value::Number(n) => Ok(Box::new(n.as_f64().unwrap())),
Value::String(s) => Ok(Box::new(s.clone())),
// TODO: support null (not like this: `Value::Null => Ok(Box::new(None::<bool>)),`)
// TODO: support arrays
x => Err(anyhow::anyhow!("unsupported param {:?}", x))
};
boxed
})
.collect::<Result<Vec<_>, anyhow::Error>>()
}
// XXX: return different error codes
async fn handle_sql(
config: &'static ProxyConfig,
request: Request<Body>
) -> anyhow::Result<Vec<Value>> {
let headers = request.headers();
let connection_string = headers
.get("X-Neon-ConnectionString")
.ok_or(anyhow::anyhow!("missing connection string"))?
.to_str()?;
let connection_url = Url::parse(connection_string)?;
let protocol = connection_url.scheme();
if protocol != "postgres" && protocol != "postgresql" {
return Err(anyhow::anyhow!("connection string must start with postgres: or postgresql:"))
}
let mut url_path = connection_url
.path_segments()
.ok_or(anyhow::anyhow!("missing database name"))?;
let dbname = url_path
.next()
.ok_or(anyhow::anyhow!("invalid database name"))?;
let username = connection_url.username();
let password = connection_url
.password()
.ok_or(anyhow::anyhow!("no password"))?;
let hostname = connection_url
.host_str()
.ok_or(anyhow::anyhow!("no host"))?;
let host_header = request
.headers()
.get("host")
.and_then(|h| h.to_str().ok())
.and_then(|h| h.split(':').next());
match host_header {
Some(h) if h == hostname => h,
Some(_) => return Err(anyhow::anyhow!("mismatched host header and hostname")),
None => return Err(anyhow::anyhow!("no host header"))
};
let mut body = request.into_body();
let mut data = Vec::with_capacity(512);
while let Some(chunk) = body.next().await {
data.extend(&chunk?);
}
#[derive(serde::Deserialize)]
struct QueryData {
query: String,
params: Vec<serde_json::Value>
}
let query_data: QueryData = serde_json::from_slice(&data)?;
let credential_params = StartupMessageParams::new([
("user", username),
("database", dbname),
("application_name", "proxy_http_sql"),
]);
let tls = config.tls_config.as_ref();
let common_names = tls.and_then(|tls| tls.common_names.clone());
let creds = config
.auth_backend
.as_ref()
.map(|_| auth::ClientCredentials::parse(&credential_params, Some(hostname), common_names))
.transpose()?;
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some("proxy_http_sql"),
};
let node = creds.wake_compute(&extra).await?.expect("msg");
let conf = node.value.config;
let host = match conf.get_hosts().first().expect("no host") {
tokio_postgres::config::Host::Tcp(host) => host,
tokio_postgres::config::Host::Unix(_) => {
return Err(anyhow::anyhow!("unix socket is not supported"));
}
};
let conn_string = &format!(
"host={} port={} user={} password={} dbname={}",
host,
conf.get_ports().first().expect("no port"),
username,
password,
dbname
);
let (client, connection) = tokio_postgres::connect(conn_string, tokio_postgres::NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
let query = &query_data.query;
let query_params = boxed_from_json(query_data.params)?;
// TODO: find a way to catch the panic and return an error if the number of params is wrong
let pg_rows: Vec<Row> = client
.query_raw(query, query_params)
.await?
.try_collect::<Vec<Row>>()
.await?;
let rows: Result<Vec<serde_json::Value>, anyhow::Error> = pg_rows
.iter()
.map(postgres_row_to_json_value)
.collect();
let rows = rows?;
Ok(rows)
}
pub struct ConnectionCache {
connections: HashMap<String, tokio_postgres::Client>,
}
impl ConnectionCache {
pub fn new() -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(Self {
connections: HashMap::new(),
}))
}
pub async fn execute(
cache: &Arc<Mutex<ConnectionCache>>,
conn_string: &str,
hostname: &str,
sql: &str,
) -> anyhow::Result<String> {
// TODO: let go mutex when establishing connection
let mut cache = cache.lock().await;
let cache_key = format!("connstr={}, hostname={}", conn_string, hostname);
let client = if let Some(client) = cache.connections.get(&cache_key) {
info!("using cached connection {}", conn_string);
client
} else {
info!("!!!! connecting to: {}", conn_string);
let (client, connection) =
tokio_postgres::connect(conn_string, tokio_postgres::NoTls).await?;
tokio::spawn(async move {
// TODO: remove connection from cache
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
cache.connections.insert(cache_key.clone(), client);
cache.connections.get(&cache_key).unwrap()
};
let sql = percent_decode(sql.as_bytes()).decode_utf8()?.to_string();
let rows: Vec<HashMap<_, _>> = client
.simple_query(&sql)
.await?
.into_iter()
.filter_map(|el| {
if let tokio_postgres::SimpleQueryMessage::Row(row) = el {
let mut serialized_row: HashMap<String, String> = HashMap::new();
for i in 0..row.len() {
let col = row.columns().get(i).map_or("?", |c| c.name());
let val = row.get(i).unwrap_or("?");
serialized_row.insert(col.into(), val.into());
}
Some(serialized_row)
} else {
None
}
})
.collect();
Ok(serde_json::to_string(&rows)?)
}
}
pub async fn task_main(
config: &'static ProxyConfig,
cache: &'static Arc<Mutex<ConnectionCache>>,
ws_listener: TcpListener,
cancellation_token: CancellationToken,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("websocket server has shut down");
@@ -219,7 +595,7 @@ pub async fn task_main(
move |req: Request<Body>| async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
ws_handler(req, config, cancel_map, session_id)
ws_handler(req, config, cancel_map, session_id, cache.clone())
.instrument(info_span!(
"ws-client",
session = format_args!("{session_id}")
@@ -231,6 +607,7 @@ pub async fn task_main(
hyper::Server::builder(accept::from_stream(tls_listener))
.serve(make_svc)
.with_graceful_shutdown(cancellation_token.cancelled())
.await?;
Ok(())

View File

@@ -28,9 +28,12 @@ use config::ProxyConfig;
use futures::FutureExt;
use std::{borrow::Cow, future::Future, net::SocketAddr};
use tokio::{net::TcpListener, task::JoinError};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use utils::{project_git_version, sentry_init::init_sentry};
use crate::http::websocket::ConnectionCache;
project_git_version!(GIT_VERSION);
/// Flattens `Result<Result<T>>` into `Result<T>`.
@@ -52,6 +55,8 @@ async fn main() -> anyhow::Result<()> {
let args = cli().get_matches();
let config = build_config(&args)?;
let wsconn_cache = Box::leak(Box::new(ConnectionCache::new()));
info!("Authentication backend: {}", config.auth_backend);
// Check that we can bind to address before further initialization
@@ -66,39 +71,49 @@ async fn main() -> anyhow::Result<()> {
let proxy_address: SocketAddr = args.get_one::<String>("proxy").unwrap().parse()?;
info!("Starting proxy on {proxy_address}");
let proxy_listener = TcpListener::bind(proxy_address).await?;
let cancellation_token = CancellationToken::new();
let mut tasks = vec![
tokio::spawn(handle_signals()),
tokio::spawn(http::server::task_main(http_listener)),
tokio::spawn(proxy::task_main(config, proxy_listener)),
tokio::spawn(console::mgmt::task_main(mgmt_listener)),
];
let mut client_tasks = vec![tokio::spawn(proxy::task_main(
config,
proxy_listener,
cancellation_token.clone(),
))];
if let Some(wss_address) = args.get_one::<String>("wss") {
let wss_address: SocketAddr = wss_address.parse()?;
info!("Starting wss on {wss_address}");
let wss_listener = TcpListener::bind(wss_address).await?;
tasks.push(tokio::spawn(http::websocket::task_main(
client_tasks.push(tokio::spawn(http::websocket::task_main(
config,
wsconn_cache,
wss_listener,
cancellation_token.clone(),
)));
}
let mut tasks = vec![
tokio::spawn(handle_signals(cancellation_token)),
tokio::spawn(http::server::task_main(http_listener)),
tokio::spawn(console::mgmt::task_main(mgmt_listener)),
];
if let Some(metrics_config) = &config.metric_collection {
tasks.push(tokio::spawn(metrics::task_main(metrics_config)));
}
// This combinator will block until either all tasks complete or
// one of them finishes with an error (others will be cancelled).
let tasks = tasks.into_iter().map(flatten_err);
let _: Vec<()> = futures::future::try_join_all(tasks).await?;
let tasks = futures::future::try_join_all(tasks.into_iter().map(flatten_err));
let client_tasks = futures::future::try_join_all(client_tasks.into_iter().map(flatten_err));
tokio::select! {
// We are only expecting an error from these forever tasks
res = tasks => { res?; },
res = client_tasks => { res?; },
}
Ok(())
}
/// Handle unix signals appropriately.
async fn handle_signals() -> anyhow::Result<()> {
async fn handle_signals(token: CancellationToken) -> anyhow::Result<()> {
use tokio::signal::unix::{signal, SignalKind};
let mut hangup = signal(SignalKind::hangup())?;
@@ -116,11 +131,9 @@ async fn handle_signals() -> anyhow::Result<()> {
warn!("received SIGINT, exiting immediately");
bail!("interrupted");
}
// TODO: Don't accept new proxy connections.
// TODO: Shut down once all exisiting connections have been closed.
_ = terminate.recv() => {
warn!("received SIGTERM, exiting immediately");
bail!("terminated");
warn!("received SIGTERM, shutting down once all existing connections have closed");
token.cancel();
}
}
}

View File

@@ -17,6 +17,7 @@ use once_cell::sync::Lazy;
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use utils::measured_stream::MeasuredStream;
@@ -63,6 +64,7 @@ static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
pub async fn task_main(
config: &'static ProxyConfig,
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("proxy has shut down");
@@ -72,29 +74,48 @@ pub async fn task_main(
// will be inherited by all accepted client sockets.
socket2::SockRef::from(&listener).set_keepalive(true)?;
let mut connections = tokio::task::JoinSet::new();
let cancel_map = Arc::new(CancelMap::default());
loop {
let (socket, peer_addr) = listener.accept().await?;
info!("accepted postgres client connection from {peer_addr}");
tokio::select! {
accept_result = listener.accept() => {
let (socket, peer_addr) = accept_result?;
info!("accepted postgres client connection from {peer_addr}");
let session_id = uuid::Uuid::new_v4();
let cancel_map = Arc::clone(&cancel_map);
tokio::spawn(
async move {
info!("spawned a task for {peer_addr}");
let session_id = uuid::Uuid::new_v4();
let cancel_map = Arc::clone(&cancel_map);
connections.spawn(
async move {
info!("spawned a task for {peer_addr}");
socket
.set_nodelay(true)
.context("failed to set socket option")?;
socket
.set_nodelay(true)
.context("failed to set socket option")?;
handle_client(config, &cancel_map, session_id, socket).await
handle_client(config, &cancel_map, session_id, socket).await
}
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
error!("per-client task finished with an error: {e:#}");
}),
);
}
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
error!("per-client task finished with an error: {e:#}");
}),
);
_ = cancellation_token.cancelled() => {
drop(listener);
break;
}
}
}
// Drain connections
while let Some(res) = connections.join_next().await {
if let Err(e) = res {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
}
Ok(())
}
// TODO(tech debt): unite this with its twin below.

View File

@@ -23,7 +23,6 @@ use std::convert::Infallible;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
@@ -374,7 +373,7 @@ impl BrokerService for Broker {
Ok(info) => yield info,
Err(RecvError::Lagged(skipped_msg)) => {
missed_msgs += skipped_msg;
if let Poll::Ready(_) = futures::poll!(Box::pin(warn_interval.tick())) {
if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
missed_msgs = 0;

View File

@@ -114,7 +114,7 @@ class NeonCompare(PgCompare):
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
# Start pg
self._pg = self.env.postgres.create_start(branch_name, "main", self.tenant)
self._pg = self.env.endpoints.create_start(branch_name, "main", self.tenant)
@property
def pg(self) -> PgProtocol:

View File

@@ -830,7 +830,7 @@ class NeonEnvBuilder:
# Stop all the nodes.
if self.env:
log.info("Cleaning up all storage and compute nodes")
self.env.postgres.stop_all()
self.env.endpoints.stop_all()
for sk in self.env.safekeepers:
sk.stop(immediate=True)
self.env.pageserver.stop(immediate=True)
@@ -894,7 +894,7 @@ class NeonEnv:
self.port_distributor = config.port_distributor
self.s3_mock_server = config.mock_s3_server
self.neon_cli = NeonCli(env=self)
self.postgres = PostgresFactory(self)
self.endpoints = EndpointFactory(self)
self.safekeepers: List[Safekeeper] = []
self.broker = config.broker
self.remote_storage = config.remote_storage
@@ -902,6 +902,7 @@ class NeonEnv:
self.pg_version = config.pg_version
self.neon_binpath = config.neon_binpath
self.pg_distrib_dir = config.pg_distrib_dir
self.endpoint_counter = 0
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
@@ -1015,6 +1016,13 @@ class NeonEnv:
priv = (Path(self.repo_dir) / "auth_private_key.pem").read_text()
return AuthKeys(pub=pub, priv=priv)
def generate_endpoint_id(self) -> str:
"""
Generate a unique endpoint ID
"""
self.endpoint_counter += 1
return "ep-" + str(self.endpoint_counter)
@pytest.fixture(scope=shareable_scope)
def _shared_simple_env(
@@ -1073,7 +1081,7 @@ def neon_simple_env(_shared_simple_env: NeonEnv) -> Iterator[NeonEnv]:
"""
yield _shared_simple_env
_shared_simple_env.postgres.stop_all()
_shared_simple_env.endpoints.stop_all()
@pytest.fixture(scope="function")
@@ -1097,7 +1105,7 @@ def neon_env_builder(
neon_env_builder.init_start().
After the initialization, you can launch compute nodes by calling
the functions in the 'env.postgres' factory object, stop/start the
the functions in the 'env.endpoints' factory object, stop/start the
nodes, etc.
"""
@@ -1438,16 +1446,16 @@ class NeonCli(AbstractNeonCli):
args.extend(["-m", "immediate"])
return self.raw_cli(args)
def pg_create(
def endpoint_create(
self,
branch_name: str,
node_name: Optional[str] = None,
endpoint_id: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
port: Optional[int] = None,
) -> "subprocess.CompletedProcess[str]":
args = [
"pg",
"endpoint",
"create",
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
@@ -1460,22 +1468,22 @@ class NeonCli(AbstractNeonCli):
args.extend(["--lsn", str(lsn)])
if port is not None:
args.extend(["--port", str(port)])
if node_name is not None:
args.append(node_name)
if endpoint_id is not None:
args.append(endpoint_id)
res = self.raw_cli(args)
res.check_returncode()
return res
def pg_start(
def endpoint_start(
self,
node_name: str,
endpoint_id: str,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
port: Optional[int] = None,
) -> "subprocess.CompletedProcess[str]":
args = [
"pg",
"endpoint",
"start",
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
@@ -1486,30 +1494,30 @@ class NeonCli(AbstractNeonCli):
args.append(f"--lsn={lsn}")
if port is not None:
args.append(f"--port={port}")
if node_name is not None:
args.append(node_name)
if endpoint_id is not None:
args.append(endpoint_id)
res = self.raw_cli(args)
res.check_returncode()
return res
def pg_stop(
def endpoint_stop(
self,
node_name: str,
endpoint_id: str,
tenant_id: Optional[TenantId] = None,
destroy=False,
check_return_code=True,
) -> "subprocess.CompletedProcess[str]":
args = [
"pg",
"endpoint",
"stop",
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
]
if destroy:
args.append("--destroy")
if node_name is not None:
args.append(node_name)
if endpoint_id is not None:
args.append(endpoint_id)
return self.raw_cli(args, check_return_code=check_return_code)
@@ -1905,15 +1913,26 @@ def remote_pg(
connstr = os.getenv("BENCHMARK_CONNSTR")
if connstr is None:
raise ValueError("no connstr provided, use BENCHMARK_CONNSTR environment variable")
host = parse_dsn(connstr).get("host", "")
is_neon = host.endswith(".neon.build")
start_ms = int(datetime.utcnow().timestamp() * 1000)
with RemotePostgres(pg_bin, connstr) as remote_pg:
if is_neon:
timeline_id = TimelineId(remote_pg.safe_psql("SHOW neon.timeline_id")[0][0])
yield remote_pg
end_ms = int(datetime.utcnow().timestamp() * 1000)
host = parse_dsn(connstr).get("host", "")
if host.endswith(".neon.build"):
if is_neon:
# Add 10s margin to the start and end times
allure_add_grafana_links(host, start_ms - 10_000, end_ms + 10_000)
allure_add_grafana_links(
host,
timeline_id,
start_ms - 10_000,
end_ms + 10_000,
)
class PSQL:
@@ -2033,6 +2052,17 @@ class NeonProxy(PgProtocol):
self._wait_until_ready()
return self
# Sends SIGTERM to the proxy if it has been started
def terminate(self):
if self._popen:
self._popen.terminate()
# Waits for proxy to exit if it has been opened with a default timeout of
# two seconds. Raises subprocess.TimeoutExpired if the proxy does not exit in time.
def wait_for_exit(self, timeout=2):
if self._popen:
self._popen.wait(timeout=2)
@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_time=10)
def _wait_until_ready(self):
requests.get(f"http://{self.host}:{self.http_port}/v1/status")
@@ -2167,8 +2197,8 @@ def static_proxy(
yield proxy
class Postgres(PgProtocol):
"""An object representing a running postgres daemon."""
class Endpoint(PgProtocol):
"""An object representing a Postgres compute endpoint managed by the control plane."""
def __init__(
self, env: NeonEnv, tenant_id: TenantId, port: int, check_stop_result: bool = True
@@ -2176,33 +2206,40 @@ class Postgres(PgProtocol):
super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres")
self.env = env
self.running = False
self.node_name: Optional[str] = None # dubious, see asserts below
self.endpoint_id: Optional[str] = None # dubious, see asserts below
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
self.tenant_id = tenant_id
self.port = port
self.check_stop_result = check_stop_result
# path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<node_name>/postgresql.conf
# path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
def create(
self,
branch_name: str,
node_name: Optional[str] = None,
endpoint_id: Optional[str] = None,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> "Postgres":
) -> "Endpoint":
"""
Create the pg data directory.
Create a new Postgres endpoint.
Returns self.
"""
if not config_lines:
config_lines = []
self.node_name = node_name or f"{branch_name}_pg_node"
self.env.neon_cli.pg_create(
branch_name, node_name=self.node_name, tenant_id=self.tenant_id, lsn=lsn, port=self.port
if endpoint_id is None:
endpoint_id = self.env.generate_endpoint_id()
self.endpoint_id = endpoint_id
self.env.neon_cli.endpoint_create(
branch_name,
endpoint_id=self.endpoint_id,
tenant_id=self.tenant_id,
lsn=lsn,
port=self.port,
)
path = Path("pgdatadirs") / "tenants" / str(self.tenant_id) / self.node_name
path = Path("endpoints") / self.endpoint_id / "pgdata"
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
if config_lines is None:
@@ -2215,26 +2252,30 @@ class Postgres(PgProtocol):
return self
def start(self) -> "Postgres":
def start(self) -> "Endpoint":
"""
Start the Postgres instance.
Returns self.
"""
assert self.node_name is not None
assert self.endpoint_id is not None
log.info(f"Starting postgres node {self.node_name}")
log.info(f"Starting postgres endpoint {self.endpoint_id}")
self.env.neon_cli.pg_start(self.node_name, tenant_id=self.tenant_id, port=self.port)
self.env.neon_cli.endpoint_start(self.endpoint_id, tenant_id=self.tenant_id, port=self.port)
self.running = True
return self
def endpoint_path(self) -> Path:
"""Path to endpoint directory"""
assert self.endpoint_id
path = Path("endpoints") / self.endpoint_id
return self.env.repo_dir / path
def pg_data_dir_path(self) -> str:
"""Path to data directory"""
assert self.node_name
path = Path("pgdatadirs") / "tenants" / str(self.tenant_id) / self.node_name
return os.path.join(self.env.repo_dir, path)
"""Path to Postgres data directory"""
return os.path.join(self.endpoint_path(), "pgdata")
def pg_xact_dir_path(self) -> str:
"""Path to pg_xact dir"""
@@ -2248,7 +2289,7 @@ class Postgres(PgProtocol):
"""Path to postgresql.conf"""
return os.path.join(self.pg_data_dir_path(), "postgresql.conf")
def adjust_for_safekeepers(self, safekeepers: str) -> "Postgres":
def adjust_for_safekeepers(self, safekeepers: str) -> "Endpoint":
"""
Adjust instance config for working with wal acceptors instead of
pageserver (pre-configured by CLI) directly.
@@ -2272,7 +2313,7 @@ class Postgres(PgProtocol):
f.write("neon.safekeepers = '{}'\n".format(safekeepers))
return self
def config(self, lines: List[str]) -> "Postgres":
def config(self, lines: List[str]) -> "Endpoint":
"""
Add lines to postgresql.conf.
Lines should be an array of valid postgresql.conf rows.
@@ -2286,32 +2327,32 @@ class Postgres(PgProtocol):
return self
def stop(self) -> "Postgres":
def stop(self) -> "Endpoint":
"""
Stop the Postgres instance if it's running.
Returns self.
"""
if self.running:
assert self.node_name is not None
self.env.neon_cli.pg_stop(
self.node_name, self.tenant_id, check_return_code=self.check_stop_result
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_stop(
self.endpoint_id, self.tenant_id, check_return_code=self.check_stop_result
)
self.running = False
return self
def stop_and_destroy(self) -> "Postgres":
def stop_and_destroy(self) -> "Endpoint":
"""
Stop the Postgres instance, then destroy it.
Stop the Postgres instance, then destroy the endpoint.
Returns self.
"""
assert self.node_name is not None
self.env.neon_cli.pg_stop(
self.node_name, self.tenant_id, True, check_return_code=self.check_stop_result
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_stop(
self.endpoint_id, self.tenant_id, True, check_return_code=self.check_stop_result
)
self.node_name = None
self.endpoint_id = None
self.running = False
return self
@@ -2319,13 +2360,12 @@ class Postgres(PgProtocol):
def create_start(
self,
branch_name: str,
node_name: Optional[str] = None,
endpoint_id: Optional[str] = None,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> "Postgres":
) -> "Endpoint":
"""
Create a Postgres instance, apply config
and then start it.
Create an endpoint, apply config, and start Postgres.
Returns self.
"""
@@ -2333,7 +2373,7 @@ class Postgres(PgProtocol):
self.create(
branch_name=branch_name,
node_name=node_name,
endpoint_id=endpoint_id,
config_lines=config_lines,
lsn=lsn,
).start()
@@ -2342,7 +2382,7 @@ class Postgres(PgProtocol):
return self
def __enter__(self) -> "Postgres":
def __enter__(self) -> "Endpoint":
return self
def __exit__(
@@ -2354,33 +2394,33 @@ class Postgres(PgProtocol):
self.stop()
class PostgresFactory:
"""An object representing multiple running postgres daemons."""
class EndpointFactory:
"""An object representing multiple compute endpoints."""
def __init__(self, env: NeonEnv):
self.env = env
self.num_instances: int = 0
self.instances: List[Postgres] = []
self.endpoints: List[Endpoint] = []
def create_start(
self,
branch_name: str,
node_name: Optional[str] = None,
endpoint_id: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> Postgres:
pg = Postgres(
) -> Endpoint:
ep = Endpoint(
self.env,
tenant_id=tenant_id or self.env.initial_tenant,
port=self.env.port_distributor.get_port(),
)
self.num_instances += 1
self.instances.append(pg)
self.endpoints.append(ep)
return pg.create_start(
return ep.create_start(
branch_name=branch_name,
node_name=node_name,
endpoint_id=endpoint_id,
config_lines=config_lines,
lsn=lsn,
)
@@ -2388,30 +2428,33 @@ class PostgresFactory:
def create(
self,
branch_name: str,
node_name: Optional[str] = None,
endpoint_id: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> Postgres:
pg = Postgres(
) -> Endpoint:
ep = Endpoint(
self.env,
tenant_id=tenant_id or self.env.initial_tenant,
port=self.env.port_distributor.get_port(),
)
self.num_instances += 1
self.instances.append(pg)
if endpoint_id is None:
endpoint_id = self.env.generate_endpoint_id()
return pg.create(
self.num_instances += 1
self.endpoints.append(ep)
return ep.create(
branch_name=branch_name,
node_name=node_name,
endpoint_id=endpoint_id,
lsn=lsn,
config_lines=config_lines,
)
def stop_all(self) -> "PostgresFactory":
for pg in self.instances:
pg.stop()
def stop_all(self) -> "EndpointFactory":
for ep in self.endpoints:
ep.stop()
return self
@@ -2786,16 +2829,16 @@ def list_files_to_compare(pgdata_dir: Path) -> List[str]:
def check_restored_datadir_content(
test_output_dir: Path,
env: NeonEnv,
pg: Postgres,
endpoint: Endpoint,
):
# Get the timeline ID. We need it for the 'basebackup' command
timeline = TimelineId(pg.safe_psql("SHOW neon.timeline_id")[0][0])
timeline = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])
# stop postgres to ensure that files won't change
pg.stop()
endpoint.stop()
# Take a basebackup from pageserver
restored_dir_path = env.repo_dir / f"{pg.node_name}_restored_datadir"
restored_dir_path = env.repo_dir / f"{endpoint.endpoint_id}_restored_datadir"
restored_dir_path.mkdir(exist_ok=True)
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
@@ -2805,7 +2848,7 @@ def check_restored_datadir_content(
{psql_path} \
--no-psqlrc \
postgres://localhost:{env.pageserver.service_port.pg} \
-c 'basebackup {pg.tenant_id} {timeline}' \
-c 'basebackup {endpoint.tenant_id} {timeline}' \
| tar -x -C {restored_dir_path}
"""
@@ -2822,8 +2865,8 @@ def check_restored_datadir_content(
assert result.returncode == 0
# list files we're going to compare
assert pg.pgdata_dir
pgdata_files = list_files_to_compare(Path(pg.pgdata_dir))
assert endpoint.pgdata_dir
pgdata_files = list_files_to_compare(Path(endpoint.pgdata_dir))
restored_files = list_files_to_compare(restored_dir_path)
# check that file sets are equal
@@ -2834,12 +2877,12 @@ def check_restored_datadir_content(
# We've already filtered all mismatching files in list_files_to_compare(),
# so here expect that the content is identical
(match, mismatch, error) = filecmp.cmpfiles(
pg.pgdata_dir, restored_dir_path, pgdata_files, shallow=False
endpoint.pgdata_dir, restored_dir_path, pgdata_files, shallow=False
)
log.info(f"filecmp result mismatch and error lists:\n\t mismatch={mismatch}\n\t error={error}")
for f in mismatch:
f1 = os.path.join(pg.pgdata_dir, f)
f1 = os.path.join(endpoint.pgdata_dir, f)
f2 = os.path.join(restored_dir_path, f)
stdout_filename = "{}.filediff".format(f2)
@@ -2854,24 +2897,24 @@ def check_restored_datadir_content(
def wait_for_last_flush_lsn(
env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
env: NeonEnv, endpoint: Endpoint, tenant: TenantId, timeline: TimelineId
) -> Lsn:
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
def wait_for_wal_insert_lsn(
env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
env: NeonEnv, endpoint: Endpoint, tenant: TenantId, timeline: TimelineId
) -> Lsn:
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
def fork_at_current_lsn(
env: NeonEnv,
pg: Postgres,
endpoint: Endpoint,
new_branch_name: str,
ancestor_branch_name: str,
tenant_id: Optional[TenantId] = None,
@@ -2881,7 +2924,7 @@ def fork_at_current_lsn(
The "last LSN" is taken from the given Postgres instance. The pageserver will wait for all the
the WAL up to that LSN to arrive in the pageserver before creating the branch.
"""
current_lsn = pg.safe_psql("SELECT pg_current_wal_lsn()")[0][0]
current_lsn = endpoint.safe_psql("SELECT pg_current_wal_lsn()")[0][0]
return env.neon_cli.create_branch(new_branch_name, ancestor_branch_name, tenant_id, current_lsn)

View File

@@ -519,6 +519,13 @@ class PageserverHttpClient(requests.Session):
assert res.status_code == 200
def download_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId):
info = self.layer_map_info(tenant_id, timeline_id)
for layer in info.historic_layers:
if not layer.remote:
continue
self.download_layer(tenant_id, timeline_id, layer.layer_file_name)
def evict_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str):
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}",

View File

@@ -1,16 +1,20 @@
import time
from typing import Optional
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.types import Lsn, TenantId, TimelineId
def assert_tenant_status(
pageserver_http: PageserverHttpClient, tenant: TenantId, expected_status: str
def assert_tenant_state(
pageserver_http: PageserverHttpClient,
tenant: TenantId,
expected_state: str,
message: Optional[str] = None,
):
tenant_status = pageserver_http.tenant_status(tenant)
log.info(f"tenant_status: {tenant_status}")
assert tenant_status["state"] == expected_status, tenant_status
assert tenant_status["state"]["slug"] == expected_state, message or tenant_status
def tenant_exists(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
@@ -68,6 +72,7 @@ def wait_until_tenant_state(
tenant_id: TenantId,
expected_state: str,
iterations: int,
period: float = 1.0,
) -> bool:
"""
Does not use `wait_until` for debugging purposes
@@ -76,21 +81,28 @@ def wait_until_tenant_state(
try:
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
log.debug(f"Tenant {tenant_id} data: {tenant}")
if tenant["state"] == expected_state:
if tenant["state"]["slug"] == expected_state:
return True
except Exception as e:
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
time.sleep(1)
time.sleep(period)
raise Exception(f"Tenant {tenant_id} did not become {expected_state} in {iterations} seconds")
def wait_until_tenant_active(
pageserver_http: PageserverHttpClient, tenant_id: TenantId, iterations: int = 30
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
iterations: int = 30,
period: float = 1.0,
):
wait_until_tenant_state(
pageserver_http, tenant_id, expected_state="Active", iterations=iterations
pageserver_http,
tenant_id,
expected_state="Active",
iterations=iterations,
period=period,
)

View File

@@ -13,6 +13,7 @@ import allure
from psycopg2.extensions import cursor
from fixtures.log_helper import log
from fixtures.types import TimelineId
Fn = TypeVar("Fn", bound=Callable[..., Any])
@@ -186,11 +187,15 @@ def allure_attach_from_dir(dir: Path):
allure.attach.file(source, name, attachment_type, extension)
DATASOURCE_ID = "xHHYY0dVz"
GRAFANA_URL = "https://neonprod.grafana.net"
GRAFANA_EXPLORE_URL = f"{GRAFANA_URL}/explore"
GRAFANA_TIMELINE_INSPECTOR_DASHBOARD_URL = f"{GRAFANA_URL}/d/8G011dlnk/timeline-inspector"
LOGS_STAGING_DATASOURCE_ID = "xHHYY0dVz"
def allure_add_grafana_links(host: str, start_ms: int, end_ms: int):
def allure_add_grafana_links(host: str, timeline_id: TimelineId, start_ms: int, end_ms: int):
"""Add links to server logs in Grafana to Allure report"""
links = {}
# We expect host to be in format like ep-divine-night-159320.us-east-2.aws.neon.build
endpoint_id, region_id, _ = host.split(".", 2)
@@ -202,12 +207,12 @@ def allure_add_grafana_links(host: str, start_ms: int, end_ms: int):
}
params: Dict[str, Any] = {
"datasource": DATASOURCE_ID,
"datasource": LOGS_STAGING_DATASOURCE_ID,
"queries": [
{
"expr": "<PUT AN EXPRESSION HERE>",
"refId": "A",
"datasource": {"type": "loki", "uid": DATASOURCE_ID},
"datasource": {"type": "loki", "uid": LOGS_STAGING_DATASOURCE_ID},
"editorMode": "code",
"queryType": "range",
}
@@ -220,8 +225,23 @@ def allure_add_grafana_links(host: str, start_ms: int, end_ms: int):
for name, expr in expressions.items():
params["queries"][0]["expr"] = expr
query_string = urlencode({"orgId": 1, "left": json.dumps(params)})
link = f"https://neonprod.grafana.net/explore?{query_string}"
links[name] = f"{GRAFANA_EXPLORE_URL}?{query_string}"
timeline_qs = urlencode(
{
"orgId": 1,
"var-environment": "victoria-metrics-aws-dev",
"var-timeline_id": timeline_id,
"var-endpoint_id": endpoint_id,
"var-log_datasource": "grafanacloud-neonstaging-logs",
"from": start_ms,
"to": end_ms,
}
)
link = f"{GRAFANA_TIMELINE_INSPECTOR_DASHBOARD_URL}?{timeline_qs}"
links["Timeline Inspector"] = link
for name, link in links.items():
allure.dynamic.link(link, name=name)
log.info(f"{name}: {link}")

View File

@@ -52,13 +52,13 @@ def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int)
def run_pgbench(branch: str):
log.info(f"Start a pgbench workload on branch {branch}")
pg = env.postgres.create_start(branch, tenant_id=tenant)
connstr = pg.connstr()
endpoint = env.endpoints.create_start(branch, tenant_id=tenant)
connstr = endpoint.connstr()
pg_bin.run_capture(["pgbench", "-i", connstr])
pg_bin.run_capture(["pgbench", "-c10", "-T10", connstr])
pg.stop()
endpoint.stop()
env.neon_cli.create_branch("b0", tenant_id=tenant)
@@ -96,8 +96,8 @@ def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int):
env.neon_cli.create_branch("b0")
pg = env.postgres.create_start("b0")
neon_compare.pg_bin.run_capture(["pgbench", "-i", "-s10", pg.connstr()])
endpoint = env.endpoints.create_start("b0")
neon_compare.pg_bin.run_capture(["pgbench", "-i", "-s10", endpoint.connstr()])
branch_creation_durations = []
@@ -124,15 +124,15 @@ def test_branch_creation_many_relations(neon_compare: NeonCompare):
timeline_id = env.neon_cli.create_branch("root")
pg = env.postgres.create_start("root")
with closing(pg.connect()) as conn:
endpoint = env.endpoints.create_start("root")
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
for i in range(10000):
cur.execute(f"CREATE TABLE t{i} as SELECT g FROM generate_series(1, 1000) g")
# Wait for the pageserver to finish processing all the pending WALs,
# as we don't want the LSN wait time to be included during the branch creation
flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
wait_for_last_record_lsn(
env.pageserver.http_client(), env.initial_tenant, timeline_id, flush_lsn
)
@@ -142,7 +142,7 @@ def test_branch_creation_many_relations(neon_compare: NeonCompare):
# run a concurrent insertion to make the ancestor "busy" during the branch creation
thread = threading.Thread(
target=pg.safe_psql, args=("INSERT INTO t0 VALUES (generate_series(1, 100000))",)
target=endpoint.safe_psql, args=("INSERT INTO t0 VALUES (generate_series(1, 100000))",)
)
thread.start()

View File

@@ -42,41 +42,41 @@ def test_compare_child_and_root_pgbench_perf(neon_compare: NeonCompare):
neon_compare.zenbenchmark.record_pg_bench_result(branch, res)
env.neon_cli.create_branch("root")
pg_root = env.postgres.create_start("root")
pg_bin.run_capture(["pgbench", "-i", pg_root.connstr(), "-s10"])
endpoint_root = env.endpoints.create_start("root")
pg_bin.run_capture(["pgbench", "-i", endpoint_root.connstr(), "-s10"])
fork_at_current_lsn(env, pg_root, "child", "root")
fork_at_current_lsn(env, endpoint_root, "child", "root")
pg_child = env.postgres.create_start("child")
endpoint_child = env.endpoints.create_start("child")
run_pgbench_on_branch("root", ["pgbench", "-c10", "-T10", pg_root.connstr()])
run_pgbench_on_branch("child", ["pgbench", "-c10", "-T10", pg_child.connstr()])
run_pgbench_on_branch("root", ["pgbench", "-c10", "-T10", endpoint_root.connstr()])
run_pgbench_on_branch("child", ["pgbench", "-c10", "-T10", endpoint_child.connstr()])
def test_compare_child_and_root_write_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.neon_cli.create_branch("root")
pg_root = env.postgres.create_start("root")
endpoint_root = env.endpoints.create_start("root")
pg_root.safe_psql(
endpoint_root.safe_psql(
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
)
env.neon_cli.create_branch("child", "root")
pg_child = env.postgres.create_start("child")
endpoint_child = env.endpoints.create_start("child")
with neon_compare.record_duration("root_run_duration"):
pg_root.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
endpoint_root.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
with neon_compare.record_duration("child_run_duration"):
pg_child.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
endpoint_child.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
def test_compare_child_and_root_read_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.neon_cli.create_branch("root")
pg_root = env.postgres.create_start("root")
endpoint_root = env.endpoints.create_start("root")
pg_root.safe_psql_many(
endpoint_root.safe_psql_many(
[
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
"INSERT INTO foo SELECT FROM generate_series(1,1000000)",
@@ -84,12 +84,12 @@ def test_compare_child_and_root_read_perf(neon_compare: NeonCompare):
)
env.neon_cli.create_branch("child", "root")
pg_child = env.postgres.create_start("child")
endpoint_child = env.endpoints.create_start("child")
with neon_compare.record_duration("root_run_duration"):
pg_root.safe_psql("SELECT count(*) from foo")
endpoint_root.safe_psql("SELECT count(*) from foo")
with neon_compare.record_duration("child_run_duration"):
pg_child.safe_psql("SELECT count(*) from foo")
endpoint_child.safe_psql("SELECT count(*) from foo")
# -----------------------------------------------------------------------

View File

@@ -35,14 +35,14 @@ def test_bulk_tenant_create(
# if use_safekeepers == 'with_sa':
# wa_factory.start_n_new(3)
pg_tenant = env.postgres.create_start(
endpoint_tenant = env.endpoints.create_start(
f"test_bulk_tenant_create_{tenants_count}_{i}", tenant_id=tenant
)
end = timeit.default_timer()
time_slices.append(end - start)
pg_tenant.stop()
endpoint_tenant.stop()
zenbenchmark.record(
"tenant_creation_time",

View File

@@ -18,8 +18,8 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)
timeline_id = env.neon_cli.create_branch("test_bulk_update")
tenant_id = env.initial_tenant
pg = env.postgres.create_start("test_bulk_update")
cur = pg.connect().cursor()
endpoint = env.endpoints.create_start("test_bulk_update")
cur = endpoint.connect().cursor()
cur.execute("set statement_timeout=0")
cur.execute(f"create table t(x integer) WITH (fillfactor={fillfactor})")
@@ -28,13 +28,13 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
cur.execute("vacuum t")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
with zenbenchmark.record_duration("update-no-prefetch"):
cur.execute("update t set x=x+1")
cur.execute("vacuum t")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
with zenbenchmark.record_duration("delete-no-prefetch"):
cur.execute("delete from t")
@@ -50,13 +50,13 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)
cur.execute(f"insert into t2 values (generate_series(1,{n_records}))")
cur.execute("vacuum t2")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
with zenbenchmark.record_duration("update-with-prefetch"):
cur.execute("update t2 set x=x+1")
cur.execute("vacuum t2")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
with zenbenchmark.record_duration("delete-with-prefetch"):
cur.execute("delete from t2")

View File

@@ -33,11 +33,11 @@ def test_compaction(neon_compare: NeonCompare):
# Create some tables, and run a bunch of INSERTs and UPDATes on them,
# to generate WAL and layers
pg = env.postgres.create_start(
endpoint = env.endpoints.create_start(
"main", tenant_id=tenant_id, config_lines=["shared_buffers=512MB"]
)
with closing(pg.connect()) as conn:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
for i in range(100):
cur.execute(f"create table tbl{i} (i int, j int);")
@@ -45,7 +45,7 @@ def test_compaction(neon_compare: NeonCompare):
for j in range(100):
cur.execute(f"update tbl{i} set j = {j};")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
# First compaction generates L1 layers
with neon_compare.zenbenchmark.record_duration("compaction"):

View File

@@ -2,13 +2,13 @@ import threading
import pytest
from fixtures.compare_fixtures import PgCompare
from fixtures.neon_fixtures import Postgres
from fixtures.neon_fixtures import PgProtocol
from performance.test_perf_pgbench import get_scales_matrix
from performance.test_wal_backpressure import record_read_latency
def start_write_workload(pg: Postgres, scale: int = 10):
def start_write_workload(pg: PgProtocol, scale: int = 10):
with pg.connect().cursor() as cur:
cur.execute(f"create table big as select generate_series(1,{scale*100_000})")

View File

@@ -25,8 +25,8 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
)
env.neon_cli.create_timeline("test_layer_map", tenant_id=tenant)
pg = env.postgres.create_start("test_layer_map", tenant_id=tenant)
cur = pg.connect().cursor()
endpoint = env.endpoints.create_start("test_layer_map", tenant_id=tenant)
cur = endpoint.connect().cursor()
cur.execute("create table t(x integer)")
for i in range(n_iters):
cur.execute(f"insert into t values (generate_series(1,{n_records}))")

View File

@@ -14,19 +14,19 @@ def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker
# Start
env.neon_cli.create_branch("test_startup")
with zenbenchmark.record_duration("startup_time"):
pg = env.postgres.create_start("test_startup")
pg.safe_psql("select 1;")
endpoint = env.endpoints.create_start("test_startup")
endpoint.safe_psql("select 1;")
# Restart
pg.stop_and_destroy()
endpoint.stop_and_destroy()
with zenbenchmark.record_duration("restart_time"):
pg.create_start("test_startup")
pg.safe_psql("select 1;")
endpoint.create_start("test_startup")
endpoint.safe_psql("select 1;")
# Fill up
num_rows = 1000000 # 30 MB
num_tables = 100
with closing(pg.connect()) as conn:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
for i in range(num_tables):
cur.execute(f"create table t_{i} (i integer);")
@@ -34,18 +34,18 @@ def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker
# Read
with zenbenchmark.record_duration("read_time"):
pg.safe_psql("select * from t_0;")
endpoint.safe_psql("select * from t_0;")
# Read again
with zenbenchmark.record_duration("second_read_time"):
pg.safe_psql("select * from t_0;")
endpoint.safe_psql("select * from t_0;")
# Restart
pg.stop_and_destroy()
endpoint.stop_and_destroy()
with zenbenchmark.record_duration("restart_with_data"):
pg.create_start("test_startup")
pg.safe_psql("select 1;")
endpoint.create_start("test_startup")
endpoint.safe_psql("select 1;")
# Read
with zenbenchmark.record_duration("read_after_restart"):
pg.safe_psql("select * from t_0;")
endpoint.safe_psql("select * from t_0;")

View File

@@ -22,8 +22,8 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
pageserver_http.configure_failpoints(("flush-frozen-before-sync", "sleep(10000)"))
pg_branch0 = env.postgres.create_start("main", tenant_id=tenant)
branch0_cur = pg_branch0.connect().cursor()
endpoint_branch0 = env.endpoints.create_start("main", tenant_id=tenant)
branch0_cur = endpoint_branch0.connect().cursor()
branch0_timeline = TimelineId(query_scalar(branch0_cur, "SHOW neon.timeline_id"))
log.info(f"b0 timeline {branch0_timeline}")
@@ -44,10 +44,10 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
# Create branch1.
env.neon_cli.create_branch("branch1", "main", tenant_id=tenant, ancestor_start_lsn=lsn_100)
pg_branch1 = env.postgres.create_start("branch1", tenant_id=tenant)
endpoint_branch1 = env.endpoints.create_start("branch1", tenant_id=tenant)
log.info("postgres is running on 'branch1' branch")
branch1_cur = pg_branch1.connect().cursor()
branch1_cur = endpoint_branch1.connect().cursor()
branch1_timeline = TimelineId(query_scalar(branch1_cur, "SHOW neon.timeline_id"))
log.info(f"b1 timeline {branch1_timeline}")
@@ -67,9 +67,9 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
# Create branch2.
env.neon_cli.create_branch("branch2", "branch1", tenant_id=tenant, ancestor_start_lsn=lsn_200)
pg_branch2 = env.postgres.create_start("branch2", tenant_id=tenant)
endpoint_branch2 = env.endpoints.create_start("branch2", tenant_id=tenant)
log.info("postgres is running on 'branch2' branch")
branch2_cur = pg_branch2.connect().cursor()
branch2_cur = endpoint_branch2.connect().cursor()
branch2_timeline = TimelineId(query_scalar(branch2_cur, "SHOW neon.timeline_id"))
log.info(f"b2 timeline {branch2_timeline}")

View File

@@ -64,9 +64,9 @@ def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder):
branch = "test_compute_auth_to_pageserver"
env.neon_cli.create_branch(branch)
pg = env.postgres.create_start(branch)
endpoint = env.endpoints.create_start(branch)
with closing(pg.connect()) as conn:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
@@ -83,7 +83,7 @@ def test_auth_failures(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
branch = f"test_auth_failures_auth_enabled_{auth_enabled}"
timeline_id = env.neon_cli.create_branch(branch)
env.postgres.create_start(branch)
env.endpoints.create_start(branch)
tenant_token = env.auth_keys.generate_tenant_token(env.initial_tenant)
invalid_tenant_token = env.auth_keys.generate_tenant_token(TenantId.generate())

View File

@@ -5,7 +5,7 @@ from contextlib import closing, contextmanager
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, Postgres
from fixtures.neon_fixtures import Endpoint, NeonEnvBuilder
pytest_plugins = "fixtures.neon_fixtures"
@@ -20,10 +20,10 @@ def pg_cur(pg):
# Periodically check that all backpressure lags are below the configured threshold,
# assert if they are not.
# If the check query fails, stop the thread. Main thread should notice that and stop the test.
def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interval=5):
def check_backpressure(endpoint: Endpoint, stop_event: threading.Event, polling_interval=5):
log.info("checks started")
with pg_cur(pg) as cur:
with pg_cur(endpoint) as cur:
cur.execute("CREATE EXTENSION neon") # TODO move it to neon_fixtures?
cur.execute("select pg_size_bytes(current_setting('max_replication_write_lag'))")
@@ -41,7 +41,7 @@ def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interv
max_replication_apply_lag_bytes = res[0]
log.info(f"max_replication_apply_lag: {max_replication_apply_lag_bytes} bytes")
with pg_cur(pg) as cur:
with pg_cur(endpoint) as cur:
while not stop_event.is_set():
try:
cur.execute(
@@ -102,14 +102,14 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
# Create a branch for us
env.neon_cli.create_branch("test_backpressure")
pg = env.postgres.create_start(
endpoint = env.endpoints.create_start(
"test_backpressure", config_lines=["max_replication_write_lag=30MB"]
)
log.info("postgres is running on 'test_backpressure' branch")
# setup check thread
check_stop_event = threading.Event()
check_thread = threading.Thread(target=check_backpressure, args=(pg, check_stop_event))
check_thread = threading.Thread(target=check_backpressure, args=(endpoint, check_stop_event))
check_thread.start()
# Configure failpoint to slow down walreceiver ingest
@@ -125,7 +125,7 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
# because of the lag and waiting for lsn to replay to arrive.
time.sleep(2)
with pg_cur(pg) as cur:
with pg_cur(endpoint) as cur:
# Create and initialize test table
cur.execute("CREATE TABLE foo(x bigint)")

View File

@@ -15,4 +15,4 @@ def test_basebackup_error(neon_simple_env: NeonEnv):
pageserver_http.configure_failpoints(("basebackup-before-control-file", "return"))
with pytest.raises(Exception, match="basebackup-before-control-file"):
env.postgres.create_start("test_basebackup_error")
env.endpoints.create_start("test_basebackup_error")

View File

@@ -67,9 +67,9 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
)
timeline_main = env.neon_cli.create_timeline("test_main", tenant_id=tenant)
pg_main = env.postgres.create_start("test_main", tenant_id=tenant)
endpoint_main = env.endpoints.create_start("test_main", tenant_id=tenant)
main_cur = pg_main.connect().cursor()
main_cur = endpoint_main.connect().cursor()
main_cur.execute(
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')"
@@ -90,9 +90,9 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
env.neon_cli.create_branch(
"test_branch", "test_main", tenant_id=tenant, ancestor_start_lsn=lsn1
)
pg_branch = env.postgres.create_start("test_branch", tenant_id=tenant)
endpoint_branch = env.endpoints.create_start("test_branch", tenant_id=tenant)
branch_cur = pg_branch.connect().cursor()
branch_cur = endpoint_branch.connect().cursor()
branch_cur.execute("INSERT INTO foo SELECT FROM generate_series(1, 100000)")
assert query_scalar(branch_cur, "SELECT count(*) FROM foo") == 200000
@@ -142,8 +142,8 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
)
b0 = env.neon_cli.create_branch("b0", tenant_id=tenant)
pg0 = env.postgres.create_start("b0", tenant_id=tenant)
res = pg0.safe_psql_many(
endpoint0 = env.endpoints.create_start("b0", tenant_id=tenant)
res = endpoint0.safe_psql_many(
queries=[
"CREATE TABLE t(key serial primary key)",
"INSERT INTO t SELECT FROM generate_series(1, 100000)",

View File

@@ -18,10 +18,10 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
# Branch at the point where only 100 rows were inserted
env.neon_cli.create_branch("test_branch_behind")
pgmain = env.postgres.create_start("test_branch_behind")
endpoint_main = env.endpoints.create_start("test_branch_behind")
log.info("postgres is running on 'test_branch_behind' branch")
main_cur = pgmain.connect().cursor()
main_cur = endpoint_main.connect().cursor()
timeline = TimelineId(query_scalar(main_cur, "SHOW neon.timeline_id"))
@@ -74,15 +74,15 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
"test_branch_behind_more", "test_branch_behind", ancestor_start_lsn=lsn_b
)
pg_hundred = env.postgres.create_start("test_branch_behind_hundred")
pg_more = env.postgres.create_start("test_branch_behind_more")
endpoint_hundred = env.endpoints.create_start("test_branch_behind_hundred")
endpoint_more = env.endpoints.create_start("test_branch_behind_more")
# On the 'hundred' branch, we should see only 100 rows
hundred_cur = pg_hundred.connect().cursor()
hundred_cur = endpoint_hundred.connect().cursor()
assert query_scalar(hundred_cur, "SELECT count(*) FROM foo") == 100
# On the 'more' branch, we should see 100200 rows
more_cur = pg_more.connect().cursor()
more_cur = endpoint_more.connect().cursor()
assert query_scalar(more_cur, "SELECT count(*) FROM foo") == 200100
# All the rows are visible on the main branch
@@ -94,8 +94,8 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
env.neon_cli.create_branch(
"test_branch_segment_boundary", "test_branch_behind", ancestor_start_lsn=Lsn("0/3000000")
)
pg = env.postgres.create_start("test_branch_segment_boundary")
assert pg.safe_psql("SELECT 1")[0][0] == 1
endpoint = env.endpoints.create_start("test_branch_segment_boundary")
assert endpoint.safe_psql("SELECT 1")[0][0] == 1
# branch at pre-initdb lsn
with pytest.raises(Exception, match="invalid branch start lsn: .*"):

View File

@@ -5,7 +5,7 @@ from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgBin
from fixtures.types import Lsn
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
@@ -40,20 +40,20 @@ def test_branching_with_pgbench(
}
)
def run_pgbench(pg: Postgres):
connstr = pg.connstr()
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
pg_bin.run_capture(["pgbench", "-T15", connstr])
env.neon_cli.create_branch("b0", tenant_id=tenant)
pgs: List[Postgres] = []
pgs.append(env.postgres.create_start("b0", tenant_id=tenant))
endpoints: List[Endpoint] = []
endpoints.append(env.endpoints.create_start("b0", tenant_id=tenant))
threads: List[threading.Thread] = []
threads.append(threading.Thread(target=run_pgbench, args=(pgs[0],), daemon=True))
threads.append(
threading.Thread(target=run_pgbench, args=(endpoints[0].connstr(),), daemon=True)
)
threads[-1].start()
thread_limit = 4
@@ -79,16 +79,18 @@ def test_branching_with_pgbench(
else:
env.neon_cli.create_branch("b{}".format(i + 1), "b0", tenant_id=tenant)
pgs.append(env.postgres.create_start("b{}".format(i + 1), tenant_id=tenant))
endpoints.append(env.endpoints.create_start("b{}".format(i + 1), tenant_id=tenant))
threads.append(threading.Thread(target=run_pgbench, args=(pgs[-1],), daemon=True))
threads.append(
threading.Thread(target=run_pgbench, args=(endpoints[-1].connstr(),), daemon=True)
)
threads[-1].start()
for thread in threads:
thread.join()
for pg in pgs:
res = pg.safe_psql("SELECT count(*) from pgbench_accounts")
for ep in endpoints:
res = ep.safe_psql("SELECT count(*) from pgbench_accounts")
assert res[0] == (100000 * scale,)
@@ -110,11 +112,11 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
env = neon_simple_env
env.neon_cli.create_branch("b0")
pg0 = env.postgres.create_start("b0")
endpoint0 = env.endpoints.create_start("b0")
pg_bin.run_capture(["pgbench", "-i", pg0.connstr()])
pg_bin.run_capture(["pgbench", "-i", endpoint0.connstr()])
with pg0.cursor() as cur:
with endpoint0.cursor() as cur:
curr_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# Specify the `start_lsn` as a number that is divided by `XLOG_BLCKSZ`
@@ -123,6 +125,6 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...")
env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn)
pg1 = env.postgres.create_start("b1")
endpoint1 = env.endpoints.create_start("b1")
pg_bin.run_capture(["pgbench", "-i", pg1.connstr()])
pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])

View File

@@ -4,7 +4,7 @@ from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder
from fixtures.types import TenantId, TimelineId
@@ -24,17 +24,17 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
]
)
tenant_timelines: List[Tuple[TenantId, TimelineId, Postgres]] = []
tenant_timelines: List[Tuple[TenantId, TimelineId, Endpoint]] = []
for n in range(4):
tenant_id, timeline_id = env.neon_cli.create_tenant()
pg = env.postgres.create_start("main", tenant_id=tenant_id)
with pg.cursor() as cur:
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with endpoint.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100), 'payload'")
pg.stop()
tenant_timelines.append((tenant_id, timeline_id, pg))
endpoint.stop()
tenant_timelines.append((tenant_id, timeline_id, endpoint))
# Stop the pageserver
env.pageserver.stop()

View File

@@ -24,14 +24,14 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
"autovacuum_freeze_max_age=100000",
]
pg = env.postgres.create_start("test_clog_truncate", config_lines=config)
endpoint = env.endpoints.create_start("test_clog_truncate", config_lines=config)
log.info("postgres is running on test_clog_truncate branch")
# Install extension containing function needed for test
pg.safe_psql("CREATE EXTENSION neon_test_utils")
endpoint.safe_psql("CREATE EXTENSION neon_test_utils")
# Consume many xids to advance clog
with pg.cursor() as cur:
with endpoint.cursor() as cur:
cur.execute("select test_consume_xids(1000*1000*10);")
log.info("xids consumed")
@@ -44,7 +44,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
# wait for autovacuum to truncate the pg_xact
# XXX Is it worth to add a timeout here?
pg_xact_0000_path = os.path.join(pg.pg_xact_dir_path(), "0000")
pg_xact_0000_path = os.path.join(endpoint.pg_xact_dir_path(), "0000")
log.info(f"pg_xact_0000_path = {pg_xact_0000_path}")
while os.path.isfile(pg_xact_0000_path):
@@ -52,7 +52,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
time.sleep(5)
# checkpoint to advance latest lsn
with pg.cursor() as cur:
with endpoint.cursor() as cur:
cur.execute("CHECKPOINT;")
lsn_after_truncation = query_scalar(cur, "select pg_current_wal_insert_lsn()")
@@ -61,10 +61,10 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
env.neon_cli.create_branch(
"test_clog_truncate_new", "test_clog_truncate", ancestor_start_lsn=lsn_after_truncation
)
pg2 = env.postgres.create_start("test_clog_truncate_new")
endpoint2 = env.endpoints.create_start("test_clog_truncate_new")
log.info("postgres is running on test_clog_truncate_new branch")
# check that new node doesn't contain truncated segment
pg_xact_0000_path_new = os.path.join(pg2.pg_xact_dir_path(), "0000")
pg_xact_0000_path_new = os.path.join(endpoint2.pg_xact_dir_path(), "0000")
log.info(f"pg_xact_0000_path_new = {pg_xact_0000_path_new}")
assert os.path.isfile(pg_xact_0000_path_new) is False

View File

@@ -24,8 +24,8 @@ def test_lsof_pageserver_pid(neon_simple_env: NeonEnv):
def start_workload():
env.neon_cli.create_branch("test_lsof_pageserver_pid")
pg = env.postgres.create_start("test_lsof_pageserver_pid")
with closing(pg.connect()) as conn:
endpoint = env.endpoints.create_start("test_lsof_pageserver_pid")
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE foo as SELECT x FROM generate_series(1,100000) x")
cur.execute("update foo set x=x+1")

View File

@@ -1,3 +1,4 @@
import copy
import os
import shutil
import subprocess
@@ -55,29 +56,31 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
neon_env_builder.preserve_database_files = True
env = neon_env_builder.init_start()
pg = env.postgres.create_start("main")
endpoint = env.endpoints.create_start("main")
# FIXME: Is this expected?
env.pageserver.allowed_errors.append(
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
)
pg_bin.run(["pgbench", "--initialize", "--scale=10", pg.connstr()])
pg_bin.run(["pgbench", "--time=60", "--progress=2", pg.connstr()])
pg_bin.run(["pg_dumpall", f"--dbname={pg.connstr()}", f"--file={test_output_dir / 'dump.sql'}"])
pg_bin.run(["pgbench", "--initialize", "--scale=10", endpoint.connstr()])
pg_bin.run(["pgbench", "--time=60", "--progress=2", endpoint.connstr()])
pg_bin.run(
["pg_dumpall", f"--dbname={endpoint.connstr()}", f"--file={test_output_dir / 'dump.sql'}"]
)
snapshot_config = toml.load(test_output_dir / "repo" / "config")
tenant_id = snapshot_config["default_tenant_id"]
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
pageserver_http = env.pageserver.http_client()
lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, lsn)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(pageserver_http, tenant_id, timeline_id, lsn)
env.postgres.stop_all()
env.endpoints.stop_all()
for sk in env.safekeepers:
sk.stop()
env.pageserver.stop()
@@ -98,6 +101,9 @@ def test_backward_compatibility(
pg_version: str,
request: FixtureRequest,
):
"""
Test that the new binaries can read old data
"""
compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR")
assert (
compatibility_snapshot_dir_env is not None
@@ -120,6 +126,7 @@ def test_backward_compatibility(
check_neon_works(
test_output_dir / "compatibility_snapshot" / "repo",
neon_binpath,
neon_binpath,
pg_distrib_dir,
pg_version,
port_distributor,
@@ -148,7 +155,11 @@ def test_forward_compatibility(
port_distributor: PortDistributor,
pg_version: str,
request: FixtureRequest,
neon_binpath: Path,
):
"""
Test that the old binaries can read new data
"""
compatibility_neon_bin_env = os.environ.get("COMPATIBILITY_NEON_BIN")
assert compatibility_neon_bin_env is not None, (
"COMPATIBILITY_NEON_BIN is not set. It should be set to a path with Neon binaries "
@@ -183,6 +194,7 @@ def test_forward_compatibility(
check_neon_works(
test_output_dir / "compatibility_snapshot" / "repo",
compatibility_neon_bin,
neon_binpath,
compatibility_postgres_distrib_dir,
pg_version,
port_distributor,
@@ -223,9 +235,13 @@ def prepare_snapshot(
for logfile in repo_dir.glob("**/*.log"):
logfile.unlink()
# Remove tenants data for compute
for tenant in (repo_dir / "pgdatadirs" / "tenants").glob("*"):
shutil.rmtree(tenant)
# Remove old computes in 'endpoints'. Old versions of the control plane used a directory
# called "pgdatadirs". Delete it, too.
if (repo_dir / "endpoints").exists():
shutil.rmtree(repo_dir / "endpoints")
if (repo_dir / "pgdatadirs").exists():
shutil.rmtree(repo_dir / "pgdatadirs")
os.mkdir(repo_dir / "endpoints")
# Remove wal-redo temp directory if it exists. Newer pageserver versions don't create
# them anymore, but old versions did.
@@ -326,7 +342,8 @@ def get_neon_version(neon_binpath: Path):
def check_neon_works(
repo_dir: Path,
neon_binpath: Path,
neon_target_binpath: Path,
neon_current_binpath: Path,
pg_distrib_dir: Path,
pg_version: str,
port_distributor: PortDistributor,
@@ -336,7 +353,7 @@ def check_neon_works(
):
snapshot_config_toml = repo_dir / "config"
snapshot_config = toml.load(snapshot_config_toml)
snapshot_config["neon_distrib_dir"] = str(neon_binpath)
snapshot_config["neon_distrib_dir"] = str(neon_target_binpath)
snapshot_config["postgres_distrib_dir"] = str(pg_distrib_dir)
with (snapshot_config_toml).open("w") as f:
toml.dump(snapshot_config, f)
@@ -347,17 +364,25 @@ def check_neon_works(
config.repo_dir = repo_dir
config.pg_version = pg_version
config.initial_tenant = snapshot_config["default_tenant_id"]
config.neon_binpath = neon_binpath
config.pg_distrib_dir = pg_distrib_dir
config.preserve_database_files = True
cli = NeonCli(config)
cli.raw_cli(["start"])
request.addfinalizer(lambda: cli.raw_cli(["stop"]))
# Use the "target" binaries to launch the storage nodes
config_target = config
config_target.neon_binpath = neon_target_binpath
cli_target = NeonCli(config_target)
# And the current binaries to launch computes
config_current = copy.copy(config)
config_current.neon_binpath = neon_current_binpath
cli_current = NeonCli(config_current)
cli_target.raw_cli(["start"])
request.addfinalizer(lambda: cli_target.raw_cli(["stop"]))
pg_port = port_distributor.get_port()
cli.pg_start("main", port=pg_port)
request.addfinalizer(lambda: cli.pg_stop("main"))
cli_current.endpoint_start("main", port=pg_port)
request.addfinalizer(lambda: cli_current.endpoint_stop("main"))
connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres"
pg_bin.run(["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump.sql'}"])

View File

@@ -13,10 +13,10 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
ctl = ComputeCtl(env)
env.neon_cli.create_branch("test_compute_ctl", "main")
pg = env.postgres.create_start("test_compute_ctl")
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
endpoint = env.endpoints.create_start("test_compute_ctl")
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
with open(pg.config_file_path(), "r") as f:
with open(endpoint.config_file_path(), "r") as f:
cfg_lines = f.readlines()
cfg_map = {}
for line in cfg_lines:
@@ -24,10 +24,13 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
k, v = line.split("=")
cfg_map[k] = v.strip("\n '\"")
log.info(f"postgres config: {cfg_map}")
pgdata = pg.pg_data_dir_path()
pgdata = endpoint.pg_data_dir_path()
pg_bin_path = os.path.join(pg_bin.pg_bin_path, "postgres")
pg.stop_and_destroy()
endpoint.stop_and_destroy()
# stop_and_destroy removes the whole endpoint directory. Recreate it.
Path(pgdata).mkdir(parents=True)
spec = (
"""

View File

@@ -12,10 +12,10 @@ def test_config(neon_simple_env: NeonEnv):
env.neon_cli.create_branch("test_config", "empty")
# change config
pg = env.postgres.create_start("test_config", config_lines=["log_min_messages=debug1"])
endpoint = env.endpoints.create_start("test_config", config_lines=["log_min_messages=debug1"])
log.info("postgres is running on test_config branch")
with closing(pg.connect()) as conn:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute(
"""

View File

@@ -21,11 +21,11 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_crafted_wal_end")
pg = env.postgres.create("test_crafted_wal_end")
endpoint = env.endpoints.create("test_crafted_wal_end")
wal_craft = WalCraft(env)
pg.config(wal_craft.postgres_config())
pg.start()
res = pg.safe_psql_many(
endpoint.config(wal_craft.postgres_config())
endpoint.start()
res = endpoint.safe_psql_many(
queries=[
"CREATE TABLE keys(key int primary key)",
"INSERT INTO keys SELECT generate_series(1, 100)",
@@ -34,7 +34,7 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
)
assert res[-1][0] == (5050,)
wal_craft.in_existing(wal_type, pg.connstr())
wal_craft.in_existing(wal_type, endpoint.connstr())
log.info("Restarting all safekeepers and pageservers")
env.pageserver.stop()
@@ -43,7 +43,7 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
env.pageserver.start()
log.info("Trying more queries")
res = pg.safe_psql_many(
res = endpoint.safe_psql_many(
queries=[
"SELECT SUM(key) FROM keys",
"INSERT INTO keys SELECT generate_series(101, 200)",
@@ -60,7 +60,7 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
env.pageserver.start()
log.info("Trying more queries (again)")
res = pg.safe_psql_many(
res = endpoint.safe_psql_many(
queries=[
"SELECT SUM(key) FROM keys",
"INSERT INTO keys SELECT generate_series(201, 300)",

View File

@@ -13,10 +13,10 @@ def test_createdb(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_createdb", "empty")
pg = env.postgres.create_start("test_createdb")
endpoint = env.endpoints.create_start("test_createdb")
log.info("postgres is running on 'test_createdb' branch")
with pg.cursor() as cur:
with endpoint.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute("VACUUM FULL pg_class")
@@ -26,10 +26,10 @@ def test_createdb(neon_simple_env: NeonEnv):
# Create a branch
env.neon_cli.create_branch("test_createdb2", "test_createdb", ancestor_start_lsn=lsn)
pg2 = env.postgres.create_start("test_createdb2")
endpoint2 = env.endpoints.create_start("test_createdb2")
# Test that you can connect to the new database on both branches
for db in (pg, pg2):
for db in (endpoint, endpoint2):
with db.cursor(dbname="foodb") as cur:
# Check database size in both branches
cur.execute(
@@ -55,17 +55,17 @@ def test_createdb(neon_simple_env: NeonEnv):
def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
env.neon_cli.create_branch("test_dropdb", "empty")
pg = env.postgres.create_start("test_dropdb")
endpoint = env.endpoints.create_start("test_dropdb")
log.info("postgres is running on 'test_dropdb' branch")
with pg.cursor() as cur:
with endpoint.cursor() as cur:
cur.execute("CREATE DATABASE foodb")
lsn_before_drop = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
dboid = query_scalar(cur, "SELECT oid FROM pg_database WHERE datname='foodb';")
with pg.cursor() as cur:
with endpoint.cursor() as cur:
cur.execute("DROP DATABASE foodb")
cur.execute("CHECKPOINT")
@@ -76,29 +76,29 @@ def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
env.neon_cli.create_branch(
"test_before_dropdb", "test_dropdb", ancestor_start_lsn=lsn_before_drop
)
pg_before = env.postgres.create_start("test_before_dropdb")
endpoint_before = env.endpoints.create_start("test_before_dropdb")
env.neon_cli.create_branch(
"test_after_dropdb", "test_dropdb", ancestor_start_lsn=lsn_after_drop
)
pg_after = env.postgres.create_start("test_after_dropdb")
endpoint_after = env.endpoints.create_start("test_after_dropdb")
# Test that database exists on the branch before drop
pg_before.connect(dbname="foodb").close()
endpoint_before.connect(dbname="foodb").close()
# Test that database subdir exists on the branch before drop
assert pg_before.pgdata_dir
dbpath = pathlib.Path(pg_before.pgdata_dir) / "base" / str(dboid)
assert endpoint_before.pgdata_dir
dbpath = pathlib.Path(endpoint_before.pgdata_dir) / "base" / str(dboid)
log.info(dbpath)
assert os.path.isdir(dbpath) is True
# Test that database subdir doesn't exist on the branch after drop
assert pg_after.pgdata_dir
dbpath = pathlib.Path(pg_after.pgdata_dir) / "base" / str(dboid)
assert endpoint_after.pgdata_dir
dbpath = pathlib.Path(endpoint_after.pgdata_dir) / "base" / str(dboid)
log.info(dbpath)
assert os.path.isdir(dbpath) is False
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, pg)
check_restored_datadir_content(test_output_dir, env, endpoint)

Some files were not shown because too many files have changed in this diff Show More