Compare commits

27 Commits

Author SHA1 Message Date
Dmitry Ivanov
2e047b64fd [WIP] Implement proper severity levels in pq_proto's ErrorResponse 2022-12-15 15:38:22 +03:00
Christian Schwarz
397b60feab common abstraction for waiting for SK commit_lsn to reach PS 2022-12-15 11:50:39 +01:00
Christian Schwarz
10cd64cf8d make TaskHandle::next_task_event cancellation-safe
If we get cancelled before jh.await returns we've take()n the join handle but
drop the result on the floor.

Fix it by setting self.join_handle = None after the .await

fixes https://github.com/neondatabase/neon/issues/3104
2022-12-15 10:26:17 +01:00
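
The cancellation hazard described in this commit can be illustrated with a minimal std-only sketch. `FakeJoinHandle`, `TaskHandle`, the two `next_event_*` methods and `poll_once_then_cancel` are hypothetical stand-ins written for this note, not the actual pageserver types; the real code awaits a task join handle.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a join handle: pending on the first poll, ready afterwards.
struct FakeJoinHandle {
    polled_once: bool,
    value: u32,
}

impl Future for FakeJoinHandle {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.polled_once {
            Poll::Ready(self.value)
        } else {
            self.polled_once = true;
            Poll::Pending
        }
    }
}

/// Hypothetical task handle that owns an optional join handle.
struct TaskHandle {
    join_handle: Option<FakeJoinHandle>,
}

impl TaskHandle {
    /// Cancellation-unsafe: the handle is take()n before the await, so dropping
    /// this future while it is pending also drops the handle and its result.
    async fn next_event_unsafe(&mut self) -> Option<u32> {
        let jh = self.join_handle.take()?;
        Some(jh.await)
    }

    /// Cancellation-safe: await through a mutable reference and clear the
    /// field only after the await has completed.
    async fn next_event_safe(&mut self) -> Option<u32> {
        let jh = self.join_handle.as_mut()?;
        let result = jh.await;
        self.join_handle = None;
        Some(result)
    }
}

/// Waker that does nothing; enough for polling by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the future exactly once and then drops it, which simulates
/// cancellation at the await point.
fn poll_once_then_cancel<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    fut.as_mut().poll(&mut cx)
}
```

Cancelling `next_event_unsafe` after one poll leaves `join_handle` empty, while cancelling `next_event_safe` keeps the handle in place so a later call can still obtain the result.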
Christian Schwarz
bf3ac2be2d add remote_physical_size metric
We do the accounting exclusively after updating remote IndexPart successfully.
This is cleaner & more robust than doing it upon completion of
individual layer file uploads / deletions since we can use .set()
instead of add()/sub().

NB: Originally, this work was intended to be part of #3013 but it
turns out that it's completely orthogonal.
So, spin it out into this PR for easier review.
Since this change is additive, it won't break anything.
2022-12-15 09:48:35 +01:00
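
A rough sketch of the accounting idea in this commit, assuming the remote index can be summarized as a map from layer file name to size. `Gauge`, `IndexPart::layer_sizes` and `on_index_uploaded` are hypothetical names for illustration only; the real metric type and index structure may differ.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for a gauge metric.
struct Gauge(AtomicU64);

impl Gauge {
    fn set(&self, value: u64) {
        self.0.store(value, Ordering::Relaxed);
    }

    fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// Hypothetical stand-in for the remote index: layer file name -> size in bytes.
struct IndexPart {
    layer_sizes: HashMap<String, u64>,
}

/// Called only after the index has been uploaded successfully.
/// The gauge is recomputed from the index and overwritten with set(), so it
/// cannot drift the way paired add()/sub() calls can when one of them is missed.
fn on_index_uploaded(index: &IndexPart, remote_physical_size: &Gauge) {
    let total: u64 = index.layer_sizes.values().sum();
    remote_physical_size.set(total);
}
```

Because the value is derived from the index as a whole, uploads and deletions need no separate bookkeeping.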
Sergey Melnikov
c04c201520 Push proxy metrics to Victoria Metrics (#3106) 2022-12-14 21:28:14 +01:00
Christian Schwarz
4132ae9dfe always remove RemoteTimelineClient's metrics when dropping it 2022-12-14 19:25:29 +01:00
Alexander Bayandin
8fcba150db test_seqscans: temporarily disable remote test (#3101)
Temporarily disable `test_seqscans` for remote projects; they require
too much space and time. We can try to re-enable it after switching
to per-test projects.
2022-12-14 18:05:05 +00:00
Dmitry Rodionov
df09d0375b ignore metadata_backup files in index_part 2022-12-14 19:00:19 +03:00
Vadim Kharitonov
62f6e969e7 Fix helm value for proxy 2022-12-14 16:41:26 +01:00
Kirill Bulatov
4d201619ed Remove large database files after every test suite (#3090)
Closes https://github.com/neondatabase/neon/issues/1984 
Closes https://github.com/neondatabase/neon/pull/2830

A follow-up to https://github.com/neondatabase/neon/pull/2830: I
noticed that benchmarks failed again due to out-of-space issues.

Removes most of the pageserver and safekeeper files from disk after
every pytest suite run.

```
$ poetry run pytest -vvsk "test_tenant_redownloads_truncated_file_on_startup[local_fs]" 
# ...
$ du -h test_output/test_tenant_redownloads_truncated_file_on_startup\[local_fs\]  
# ...
104K    test_output/test_tenant_redownloads_truncated_file_on_startup[local_fs]

$ poetry run pytest -vvsk "test_tenant_redownloads_truncated_file_on_startup[local_fs]" --preserve-database-files
# ...
$ du -h test_output/test_tenant_redownloads_truncated_file_on_startup\[local_fs\]  
# ...
123M    test_output/test_tenant_redownloads_truncated_file_on_startup[local_fs]
```

Co-authored-by: Bojan Serafimov <bojan.serafimov7@gmail.com>
2022-12-14 13:09:08 +00:00
Alexander Bayandin
d3787f9b47 neon-project-create/delete: print project id to stdout (#3073)
Print project_id to GitHub Actions stdout
2022-12-14 13:04:04 +00:00
Shany Pozin
ada5b7158f Fix Issue #3014 (#3059)
* TenantConfigRequest now supports tenant_id as a hex string input instead
of a byte array

* The config file is truncated on each creation/update
2022-12-14 14:09:16 +02:00
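
For illustration, accepting an ID as a hex string rather than a byte array could look roughly like the sketch below. It assumes a 16-byte ID, and `parse_hex_id` is a hypothetical helper, not the function used in the actual change.

```rust
/// Parses a 32-character hex string into a 16-byte ID.
/// The 16-byte length is an assumption made for this sketch.
fn parse_hex_id(s: &str) -> Result<[u8; 16], String> {
    if s.len() != 32 {
        return Err(format!("expected 32 hex characters, got {}", s.len()));
    }
    // Reject anything that is not a hex digit up front; this also keeps the
    // byte-offset slicing below safe.
    if !s.bytes().all(|b| b.is_ascii_hexdigit()) {
        return Err("id contains a non-hex character".to_string());
    }
    let mut out = [0u8; 16];
    for (i, byte) in out.iter_mut().enumerate() {
        // Each output byte is encoded by two consecutive hex characters.
        *byte = u8::from_str_radix(&s[2 * i..2 * i + 2], 16).map_err(|e| e.to_string())?;
    }
    Ok(out)
}
```

A request handler can then take the ID as a plain string field and convert it once at the boundary.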
Arseny Sher
f8ab5ef3b5 Update broker endpoint for prod-us-west-2. (#3095) 2022-12-14 12:58:12 +01:00
Sergey Melnikov
827ee10b5a Disable neon-stress deploy (#3093) 2022-12-14 01:51:42 +01:00
Alexander Bayandin
c819b699be Nightly Benchmark: run neon-captest-reuse from staging (#3086)
The project has been migrated (now it is `restless-king-632302`), and
now we should run tests from staging runners.

Test run:
https://github.com/neondatabase/neon/actions/runs/3686865543/jobs/6241367161

Ref https://github.com/neondatabase/cloud/issues/2836
2022-12-13 23:02:45 +00:00
Sergey Melnikov
228f9e4322 Use default folder for ansible collections (#3092) 2022-12-13 23:59:49 +01:00
Sergey Melnikov
826214ae56 Force ansible-galaxy to also use local ansible.cfg (#3091) 2022-12-13 21:06:18 +01:00
Sergey Melnikov
b39d6126bb Force ansible to use local ansible.cfg (#3089) 2022-12-13 21:57:39 +03:00
Vadim Kharitonov
0bc488b723 Add sentry environment for pageserver and safekeepers in new region
(us-west-2)
2022-12-13 16:26:28 +01:00
Christian Schwarz
0c915dcb1d Timeline::download_missing: fix handling of mismatched layer size
Before this patch, when we decided to rename a layer file to backup
because of a layer file size mismatch, we would remove the on-disk file
but not remove the layer from the layer map.

Because we re-download the file immediately after, we simply ended up with
two layer objects in the layer map that reference the same file.
So, GetPage() would work fine until one of the layers gets
delete()'d. The other layer's delete() would then fail.

Future work: prevent insertion of the same layer at LayerMap level
so that we notice such bugs sooner.
2022-12-13 15:53:08 +01:00
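
The "future work" item could take a shape like the following sketch, which rejects a second insertion of the same layer. This `LayerMap` is a deliberately simplified, hypothetical structure keyed by file name; the real layer map is more involved.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified layer map that tracks layers by file name only.
#[derive(Default)]
struct LayerMap {
    file_names: HashSet<String>,
}

impl LayerMap {
    /// Inserts a layer and reports an error if it is already present, so a
    /// double insertion is noticed immediately instead of at delete() time.
    fn insert(&mut self, file_name: &str) -> Result<(), String> {
        if self.file_names.insert(file_name.to_string()) {
            Ok(())
        } else {
            Err(format!("layer {file_name} is already in the layer map"))
        }
    }

    /// Removes a layer; returns false if it was not present.
    fn remove(&mut self, file_name: &str) -> bool {
        self.file_names.remove(file_name)
    }
}
```

With such a check, re-downloading a file without first removing its old entry would fail at insertion time.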
Alexander Bayandin
feb07ed510 deploy (old): replace actions/setup-python@v4 with ansible image (#3081)
Replace actions/setup-python@v4 with the ansible image to fix
```
Version 3.10 was not found in the local cache
Error: The version '3.10' with architecture 'x64' was not found for this operating system.
```
2022-12-13 14:01:29 +00:00
Vadim Kharitonov
4603a4cbb5 Pass the SENTRY_ENVIRONMENT variable so that panics in Sentry can be filtered
by environment.
2022-12-13 14:52:04 +01:00
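
As a rough illustration of how a service might pick up this variable, the sketch below reads `SENTRY_ENVIRONMENT` from the process environment and treats a blank value as unset. Both function names are hypothetical, and how the value is actually handed to the Sentry client is not shown in this change.

```rust
use std::env;

/// Normalizes a raw value: trims whitespace and treats an empty string as unset.
fn sentry_environment_from(raw: Option<String>) -> Option<String> {
    raw.map(|value| value.trim().to_string())
        .filter(|value| !value.is_empty())
}

/// Reads SENTRY_ENVIRONMENT from the process environment, if it is set.
fn sentry_environment() -> Option<String> {
    sentry_environment_from(env::var("SENTRY_ENVIRONMENT").ok())
}
```

Keeping the normalization separate from the environment lookup makes it easy to check without touching process-wide state.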
Kirill Bulatov
02c1c351dc Create initial timeline without remote storage (#3077)
Removes the race during pageserver initial timeline creation that led to partial layer uploads.
This race is only reproducible in test code, since we do not create initial timelines in cloud (yet, at least), but it is still nice to remove the non-deterministic behavior.
2022-12-13 15:42:59 +02:00
Dmitry Ivanov
607c0facfc [proxy] Propagate more console API errors to the user
This patch aims to fix some of the inconsistencies in error reporting,
for example "Internal error" or "Console request failed" instead of
"password authentication failed for user '<NAME>'".
2022-12-13 16:16:31 +03:00
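
One way to picture the kind of error propagation described here is a mapping from an internal error type to the message shown to the client. `ConsoleError` and its variants are hypothetical and only reuse the message strings quoted in the commit; the proxy's real error types are not part of this listing.

```rust
/// Hypothetical classification of failures returned by the console API.
enum ConsoleError {
    /// The console rejected the credentials for this user.
    AuthFailed { user: String },
    /// The console returned an error the user can act on.
    BadRequest { reason: String },
    /// Transport failure or unexpected server-side error.
    Internal { details: String },
}

impl ConsoleError {
    /// Message that is safe and useful to show to the end user.
    fn to_user_message(&self) -> String {
        match self {
            ConsoleError::AuthFailed { user } => {
                format!("password authentication failed for user '{user}'")
            }
            ConsoleError::BadRequest { reason } => format!("Console request failed: {reason}"),
            // Internal details are kept out of user-visible output.
            ConsoleError::Internal { .. } => "Internal error".to_string(),
        }
    }

    /// Message for server-side logs, which may include internal details.
    fn to_log_message(&self) -> String {
        match self {
            ConsoleError::Internal { details } => format!("Internal error: {details}"),
            other => other.to_user_message(),
        }
    }
}
```

The point of the split is that user-actionable errors reach the user verbatim, while only genuinely internal failures collapse into a generic message.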
Sergey Melnikov
e5d523c86a Add new us-west-2 region (#3071) 2022-12-13 14:11:40 +01:00
Kirill Bulatov
7a16cde737 Remove useless pub trait method (#3076) 2022-12-13 12:06:20 +00:00
Arseny Sher
d6325aa79d Disable body size limit in ingress broker deploy.
We have infinite streams.
2022-12-13 13:06:30 +03:00
87 changed files with 1520 additions and 705 deletions

View File

@@ -55,6 +55,8 @@ runs:
project_id=$(echo $project | jq --raw-output '.project.id')
echo "project_id=${project_id}" >> $GITHUB_OUTPUT
echo "Project ${project_id} has been created"
env:
API_HOST: ${{ inputs.api_host }}
API_KEY: ${{ inputs.api_key }}

View File

@@ -27,6 +27,8 @@ runs:
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${API_KEY}"
echo "Project ${PROJECT_ID} has been deleted"
env:
API_HOST: ${{ inputs.api_host }}
API_KEY: ${{ inputs.api_key }}

View File

@@ -3,7 +3,6 @@
localhost_warning = False
host_key_checking = False
timeout = 30
collections_paths = ./collections
[ssh_connection]
ssh_args = -F ./ansible.ssh.cfg

View File

@@ -14,6 +14,7 @@ storage:
safekeeper_s3_prefix: neon-stress/wal
hostname_suffix: ".local"
remote_user: admin
sentry_environment: development
children:
pageservers:
hosts:

View File

@@ -16,6 +16,7 @@ storage:
ansible_aws_ssm_region: ap-southeast-1
ansible_aws_ssm_bucket_name: neon-prod-storage-ap-southeast-1
console_region_id: aws-ap-southeast-1
sentry_environment: production
children:
pageservers:

View File

@@ -16,6 +16,7 @@ storage:
ansible_aws_ssm_region: eu-central-1
ansible_aws_ssm_bucket_name: neon-prod-storage-eu-central-1
console_region_id: aws-eu-central-1
sentry_environment: production
children:
pageservers:

View File

@@ -16,6 +16,7 @@ storage:
ansible_aws_ssm_region: us-east-2
ansible_aws_ssm_bucket_name: neon-prod-storage-us-east-2
console_region_id: aws-us-east-2
sentry_environment: production
children:
pageservers:

View File

@@ -0,0 +1,37 @@
storage:
vars:
bucket_name: neon-prod-storage-us-west-2
bucket_region: us-west-2
console_mgmt_base_url: http://console-release.local
broker_endpoint: https://storage-broker.eta.us-west-2.internal.aws.neon.tech:443
pageserver_config_stub:
pg_distrib_dir: /usr/local
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "pageserver/v1"
safekeeper_s3_prefix: safekeeper/v1/wal
hostname_suffix: ""
remote_user: ssm-user
ansible_aws_ssm_region: us-west-2
ansible_aws_ssm_bucket_name: neon-prod-storage-us-west-2
console_region_id: aws-us-west-2-new
sentry_environment: production
children:
pageservers:
hosts:
pageserver-0.us-west-2.aws.neon.tech:
ansible_host: i-0d9f6dfae0e1c780d
pageserver-1.us-west-2.aws.neon.tech:
ansible_host: i-0c834be1dddba8b3f
safekeepers:
hosts:
safekeeper-0.us-west-2.aws.neon.tech:
ansible_host: i-00719d8a74986fda6
safekeeper-1.us-west-2.aws.neon.tech:
ansible_host: i-074682f9d3c712e7c
safekeeper-2.us-west-2.aws.neon.tech:
ansible_host: i-042b7efb1729d7966

View File

@@ -14,6 +14,7 @@ storage:
safekeeper_s3_prefix: prod-1/wal
hostname_suffix: ".local"
remote_user: admin
sentry_environment: production
children:
pageservers:

View File

@@ -16,6 +16,7 @@ storage:
ansible_aws_ssm_region: eu-west-1
ansible_aws_ssm_bucket_name: neon-dev-storage-eu-west-1
console_region_id: aws-eu-west-1
sentry_environment: development
children:
pageservers:

View File

@@ -13,6 +13,7 @@ storage:
safekeeper_s3_prefix: us-stage/wal
hostname_suffix: ".local"
remote_user: admin
sentry_environment: development
children:
pageservers:

View File

@@ -16,6 +16,7 @@ storage:
ansible_aws_ssm_region: us-east-2
ansible_aws_ssm_bucket_name: neon-staging-storage-us-east-2
console_region_id: aws-us-east-2
sentry_environment: development
children:
pageservers:

View File

@@ -5,7 +5,7 @@ After=network.target auditd.service
[Service]
Type=simple
User=pageserver
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/v14/lib SENTRY_DSN={{ SENTRY_URL_PAGESERVER }}
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/v14/lib SENTRY_DSN={{ SENTRY_URL_PAGESERVER }} SENTRY_ENVIRONMENT={{ sentry_environment }}
ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoint='{{ broker_endpoint }}'" -D /storage/pageserver/data
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed

View File

@@ -5,7 +5,7 @@ After=network.target auditd.service
[Service]
Type=simple
User=safekeeper
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/v14/lib SENTRY_DSN={{ SENTRY_URL_SAFEKEEPER }}
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/v14/lib SENTRY_DSN={{ SENTRY_URL_SAFEKEEPER }} SENTRY_ENVIRONMENT={{ sentry_environment }}
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoint={{ broker_endpoint }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ safekeeper_s3_prefix }}"}'
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed

View File

@@ -8,6 +8,7 @@ settings:
authBackend: "console"
authEndpoint: "http://console-staging.local/management/api/v2"
domain: "*.eu-west-1.aws.neon.build"
sentryEnvironment: "development"
# -- Additional labels for neon-proxy pods
podLabels:
@@ -29,3 +30,28 @@ exposedService:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -10,6 +10,8 @@ ingress:
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
# we have basically infinite streams, disable body size limit
nginx.ingress.kubernetes.io/proxy-body-size: "0"
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
hosts:
@@ -51,3 +53,5 @@ extraManifests:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "development"

View File

@@ -8,6 +8,7 @@ settings:
authBackend: "link"
authEndpoint: "https://console.stage.neon.tech/authenticate_proxy_request/"
uri: "https://console.stage.neon.tech/psql_session/"
sentryEnvironment: "development"
# -- Additional labels for neon-proxy-link pods
podLabels:
@@ -37,3 +38,28 @@ exposedService:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -8,6 +8,7 @@ settings:
authBackend: "console"
authEndpoint: "http://console-staging.local/management/api/v2"
domain: "*.cloud.stage.neon.tech"
sentryEnvironment: "development"
# -- Additional labels for neon-proxy pods
podLabels:
@@ -29,3 +30,28 @@ exposedService:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -8,6 +8,7 @@ settings:
authBackend: "console"
authEndpoint: "http://console-staging.local/management/api/v2"
domain: "*.us-east-2.aws.neon.build"
sentryEnvironment: "development"
# -- Additional labels for neon-proxy pods
podLabels:
@@ -29,3 +30,28 @@ exposedService:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -10,6 +10,8 @@ ingress:
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
# we have basically infinite streams, disable body size limit
nginx.ingress.kubernetes.io/proxy-body-size: "0"
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
hosts:
@@ -51,3 +53,5 @@ extraManifests:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "development"

View File

@@ -52,3 +52,5 @@ extraManifests:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "development"

View File

@@ -4,6 +4,7 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-stress-console.local/management/api/v2"
domain: "*.stress.neon.tech"
sentryEnvironment: "development"
podLabels:
zenith_service: proxy-scram
@@ -24,3 +25,28 @@ metrics:
enabled: true
selector:
release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -4,6 +4,7 @@ settings:
authBackend: "link"
authEndpoint: "https://console.dev.neon.tech/authenticate_proxy_request/"
uri: "https://console.dev.neon.tech/psql_session/"
sentryEnvironment: "development"
# -- Additional labels for zenith-proxy pods
podLabels:
@@ -33,3 +34,28 @@ metrics:
enabled: true
selector:
release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -8,6 +8,7 @@ settings:
authBackend: "console"
authEndpoint: "http://console-release.local/management/api/v2"
domain: "*.ap-southeast-1.aws.neon.tech"
sentryEnvironment: "production"
# -- Additional labels for neon-proxy pods
podLabels:
@@ -29,3 +30,28 @@ exposedService:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -10,6 +10,8 @@ ingress:
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
# we have basically infinite streams, disable body size limit
nginx.ingress.kubernetes.io/proxy-body-size: "0"
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
hosts:
@@ -51,3 +53,5 @@ extraManifests:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "production"

View File

@@ -8,6 +8,7 @@ settings:
authBackend: "console"
authEndpoint: "http://console-release.local/management/api/v2"
domain: "*.eu-central-1.aws.neon.tech"
sentryEnvironment: "production"
# -- Additional labels for neon-proxy pods
podLabels:
@@ -29,3 +30,28 @@ exposedService:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -10,6 +10,8 @@ ingress:
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
# we have basically infinite streams, disable body size limit
nginx.ingress.kubernetes.io/proxy-body-size: "0"
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
hosts:
@@ -51,3 +53,5 @@ extraManifests:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "production"

View File

@@ -8,6 +8,7 @@ settings:
authBackend: "console"
authEndpoint: "http://console-release.local/management/api/v2"
domain: "*.us-east-2.aws.neon.tech"
sentryEnvironment: "production"
# -- Additional labels for neon-proxy pods
podLabels:
@@ -29,3 +30,28 @@ exposedService:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -10,6 +10,8 @@ ingress:
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
# we have basically infinite streams, disable body size limit
nginx.ingress.kubernetes.io/proxy-body-size: "0"
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
hosts:
@@ -51,3 +53,5 @@ extraManifests:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "production"

View File

@@ -8,6 +8,7 @@ settings:
authBackend: "console"
authEndpoint: "http://console-release.local/management/api/v2"
domain: "*.us-west-2.aws.neon.tech"
sentryEnvironment: "production"
# -- Additional labels for neon-proxy pods
podLabels:
@@ -29,3 +30,28 @@ exposedService:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -10,6 +10,8 @@ ingress:
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
# we have basically infinite streams, disable body size limit
nginx.ingress.kubernetes.io/proxy-body-size: "0"
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
hosts:
@@ -51,3 +53,5 @@ extraManifests:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "production"

View File

@@ -52,3 +52,5 @@ extraManifests:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "production"

View File

@@ -2,6 +2,7 @@ settings:
authBackend: "console"
authEndpoint: "http://console-release.local/management/api/v2"
domain: "*.cloud.neon.tech"
sentryEnvironment: "production"
podLabels:
zenith_service: proxy-scram
@@ -22,3 +23,28 @@ metrics:
enabled: true
selector:
release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -2,6 +2,7 @@ settings:
authBackend: "link"
authEndpoint: "https://console.neon.tech/authenticate_proxy_request/"
uri: "https://console.neon.tech/psql_session/"
sentryEnvironment: "production"
# -- Additional labels for zenith-proxy pods
podLabels:
@@ -31,3 +32,28 @@ metrics:
enabled: true
selector:
release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -52,3 +52,5 @@ extraManifests:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "development"

View File

@@ -8,6 +8,7 @@ settings:
authBackend: "console"
authEndpoint: "http://console-staging.local/management/api/v2"
domain: "*.cloud.stage.neon.tech"
sentryEnvironment: "development"
# -- Additional labels for zenith-proxy pods
podLabels:
@@ -29,3 +30,28 @@ metrics:
enabled: true
selector:
release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -8,6 +8,7 @@ settings:
authBackend: "link"
authEndpoint: "https://console.stage.neon.tech/authenticate_proxy_request/"
uri: "https://console.stage.neon.tech/psql_session/"
sentryEnvironment: "development"
# -- Additional labels for zenith-proxy pods
podLabels:
@@ -29,3 +30,28 @@ metrics:
enabled: true
selector:
release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -115,13 +115,10 @@ jobs:
# neon-captest-prefetch: Same, with prefetching enabled (new project)
# rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
# rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
platform: [ neon-captest-new, neon-captest-prefetch, rds-postgres ]
platform: [ neon-captest-new, neon-captest-reuse, neon-captest-prefetch, rds-postgres ]
db_size: [ 10gb ]
runner: [ us-east-2 ]
include:
- platform: neon-captest-reuse
db_size: 10gb
runner: dev # TODO: Switch to us-east-2 after dry-bonus-223539 migration to staging
- platform: neon-captest-new
db_size: 50gb
runner: us-east-2

View File

@@ -740,8 +740,7 @@ jobs:
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
STAGING='{"env_name": "staging", "proxy_job": "neon-proxy", "proxy_config": "staging.proxy", "storage_broker_ns": "neon-storage-broker", "storage_broker_config": "staging.neon-storage-broker", "kubeconfig_secret": "STAGING_KUBECONFIG_DATA", "console_api_key_secret": "NEON_STAGING_API_KEY"}'
NEON_STRESS='{"env_name": "neon-stress", "proxy_job": "neon-stress-proxy", "proxy_config": "neon-stress.proxy", "storage_broker_ns": "neon-stress-storage-broker", "storage_broker_config": "neon-stress.neon-storage-broker", "kubeconfig_secret": "NEON_STRESS_KUBECONFIG_DATA", "console_api_key_secret": "NEON_CAPTEST_API_KEY"}'
echo "include=[$STAGING, $NEON_STRESS]" >> $GITHUB_OUTPUT
echo "include=[$STAGING]" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
PRODUCTION='{"env_name": "production", "proxy_job": "neon-proxy", "proxy_config": "production.proxy", "storage_broker_ns": "neon-storage-broker", "storage_broker_config": "production.neon-storage-broker", "kubeconfig_secret": "PRODUCTION_KUBECONFIG_DATA", "console_api_key_secret": "NEON_PRODUCTION_API_KEY"}'
echo "include=[$PRODUCTION]" >> $GITHUB_OUTPUT
@@ -752,7 +751,7 @@ jobs:
deploy:
runs-on: [ self-hosted, dev, x64 ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
# We need both storage **and** compute images for deploy, because control plane picks the compute version based on the storage version.
# If it notices a fresh storage it may bump the compute version. And if compute image failed to build it may break things badly
needs: [ push-docker-hub, calculate-deploy-targets, tag, regress-tests ]
@@ -772,16 +771,6 @@ jobs:
submodules: true
fetch-depth: 0
- name: Setup python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Setup ansible
run: |
export PATH="/root/.local/bin:$PATH"
pip install --progress-bar off --user ansible boto3 toml
- name: Redeploy
run: |
export DOCKER_TAG=${{needs.tag.outputs.build-tag}}
@@ -802,8 +791,8 @@ jobs:
chmod 0600 ssh-key
ssh-add ssh-key
rm -f ssh-key ssh-key-cert.pub
ansible-galaxy collection install sivel.toiletwater
ansible-playbook deploy.yaml -i ${{ matrix.env_name }}.hosts.yaml -e CONSOLE_API_TOKEN=${{ secrets[matrix.console_api_key_secret] }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
ANSIBLE_CONFIG=./ansible.cfg ansible-galaxy collection install sivel.toiletwater
ANSIBLE_CONFIG=./ansible.cfg ansible-playbook deploy.yaml -i ${{ matrix.env_name }}.hosts.yaml -e CONSOLE_API_TOKEN=${{ secrets[matrix.console_api_key_secret] }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
deploy-new:
@@ -891,7 +880,7 @@ jobs:
shell: bash
strategy:
matrix:
target_region: [ us-east-2, eu-central-1, ap-southeast-1 ]
target_region: [ us-east-2, us-west-2, eu-central-1, ap-southeast-1 ]
steps:
- name: Checkout
uses: actions/checkout@v3

View File

@@ -51,21 +51,21 @@ pub enum InitialPidFile<'t> {
}
/// Start a background child process using the parameters given.
pub fn start_process<
F,
S: AsRef<OsStr>,
EI: IntoIterator<Item = (String, String)>, // Not generic AsRef<OsStr>, otherwise empty `envs` prevents type inference
>(
pub fn start_process<F, AI, A, EI>(
process_name: &str,
datadir: &Path,
command: &Path,
args: &[S],
args: AI,
envs: EI,
initial_pid_file: InitialPidFile,
process_status_check: F,
) -> anyhow::Result<Child>
where
F: Fn() -> anyhow::Result<bool>,
AI: IntoIterator<Item = A>,
A: AsRef<OsStr>,
// Not generic AsRef<OsStr>, otherwise empty `envs` prevents type inference
EI: IntoIterator<Item = (String, String)>,
{
let log_path = datadir.join(format!("{process_name}.log"));
let process_log_file = fs::OpenOptions::new()

View File

@@ -341,7 +341,7 @@ fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> {
.get_many::<String>("pageserver-config-override")
.into_iter()
.flatten()
.map(|s| s.as_str())
.map(String::as_str)
.collect()
}

View File

@@ -1,9 +1,10 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, Write};
use std::num::NonZeroU64;
use std::path::{Path, PathBuf};
use std::process::Child;
use std::path::PathBuf;
use std::process::{Child, Command};
use std::{io, result};
use anyhow::{bail, ensure, Context};
@@ -129,6 +130,8 @@ impl PageServerNode {
overrides
}
/// Initializes a pageserver node by creating its config with the overrides provided,
/// and creating an initial tenant and timeline afterwards.
pub fn initialize(
&self,
create_tenant: Option<TenantId>,
@@ -136,11 +139,28 @@ impl PageServerNode {
config_overrides: &[&str],
pg_version: u32,
) -> anyhow::Result<TimelineId> {
// First, run `pageserver --init` and wait for it to write a config into FS and exit.
self.pageserver_init(config_overrides).with_context(|| {
format!(
"Failed to run init for pageserver node {}",
self.env.pageserver.id,
)
})?;
// Then, briefly start it fully to run HTTP commands on it,
// to create initial tenant and timeline.
// We disable the remote storage, since we stop pageserver right after the timeline creation,
// hence most of the uploads would either be aborted or never started: there is no point in starting them at all.
let disabled_remote_storage_override = "remote_storage={}";
let mut pageserver_process = self
.start_node(config_overrides, &self.env.base_data_dir, true)
.start_node(
&[disabled_remote_storage_override],
// Previous overrides will be taken from the config created before, don't overwrite them.
false,
)
.with_context(|| {
format!(
"Failed to start a process for pageserver {}",
"Failed to start a process for pageserver node {}",
self.env.pageserver.id,
)
})?;
@@ -201,55 +221,73 @@ impl PageServerNode {
}
pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<Child> {
self.start_node(config_overrides, &self.repo_path(), false)
self.start_node(config_overrides, false)
}
fn start_node(
&self,
config_overrides: &[&str],
datadir: &Path,
update_config: bool,
) -> anyhow::Result<Child> {
let mut overrides = self.neon_local_overrides();
overrides.extend(config_overrides.iter().map(|&c| c.to_owned()));
print!(
"Starting pageserver at '{}' in '{}'",
fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
let datadir = self.repo_path();
let node_id = self.env.pageserver.id;
println!(
"Initializing pageserver node {} at '{}' in {:?}",
node_id,
self.pg_connection_config.raw_address(),
datadir.display()
datadir
);
io::stdout().flush()?;
let mut args = vec![
"-D",
datadir.to_str().with_context(|| {
format!("Datadir path {datadir:?} cannot be represented as a unicode string")
})?,
];
let datadir_path_str = datadir.to_str().with_context(|| {
format!("Cannot start pageserver node {node_id} in path that has no string representation: {datadir:?}")
})?;
let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
args.push(Cow::Borrowed("--init"));
let init_output = Command::new(&self.env.pageserver_bin())
.args(args.iter().map(Cow::as_ref))
.envs(self.pageserver_env_variables()?)
.output()
.with_context(|| format!("Failed to run pageserver init for node {node_id}"))?;
anyhow::ensure!(
init_output.status.success(),
"Pageserver init for node {} did not finish successfully, stdout: {}, stderr: {}",
node_id,
String::from_utf8_lossy(&init_output.stdout),
String::from_utf8_lossy(&init_output.stderr),
);
Ok(())
}
fn start_node(&self, config_overrides: &[&str], update_config: bool) -> anyhow::Result<Child> {
let mut overrides = self.neon_local_overrides();
overrides.extend(config_overrides.iter().map(|&c| c.to_owned()));
let datadir = self.repo_path();
print!(
"Starting pageserver node {} at '{}' in {:?}",
self.env.pageserver.id,
self.pg_connection_config.raw_address(),
datadir
);
io::stdout().flush()?;
let datadir_path_str = datadir.to_str().with_context(|| {
format!(
"Cannot start pageserver node {} in path that has no string representation: {:?}",
self.env.pageserver.id, datadir,
)
})?;
let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
if update_config {
args.push("--update-config");
args.push(Cow::Borrowed("--update-config"));
}
for config_override in &overrides {
args.extend(["-c", config_override]);
}
let envs = if self.env.pageserver.auth_type != AuthType::Trust {
// Generate a token to connect from the pageserver to a safekeeper
let token = self
.env
.generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?;
vec![("ZENITH_AUTH_TOKEN".to_owned(), token)]
} else {
vec![]
};
background_process::start_process(
"pageserver",
datadir,
&datadir,
&self.env.pageserver_bin(),
&args,
envs,
args.iter().map(Cow::as_ref),
self.pageserver_env_variables()?,
background_process::InitialPidFile::Expect(&self.pid_file()),
|| match self.check_status() {
Ok(()) => Ok(true),
@@ -259,6 +297,35 @@ impl PageServerNode {
)
}
fn pageserver_basic_args<'a>(
&self,
config_overrides: &'a [&'a str],
datadir_path_str: &'a str,
) -> Vec<Cow<'a, str>> {
let mut args = vec![Cow::Borrowed("-D"), Cow::Borrowed(datadir_path_str)];
let mut overrides = self.neon_local_overrides();
overrides.extend(config_overrides.iter().map(|&c| c.to_owned()));
for config_override in overrides {
args.push(Cow::Borrowed("-c"));
args.push(Cow::Owned(config_override));
}
args
}
fn pageserver_env_variables(&self) -> anyhow::Result<Vec<(String, String)>> {
Ok(if self.env.pageserver.auth_type != AuthType::Trust {
// Generate a token to connect from the pageserver to a safekeeper
let token = self
.env
.generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?;
vec![("ZENITH_AUTH_TOKEN".to_owned(), token)]
} else {
Vec::new()
})
}
///
/// Stop the server.
///

View File

@@ -1,201 +0,0 @@
# Context
In Neon, one host runs a lot of compute nodes. Most of the compute
nodes are fairly small, 1-2 vCPUs and 1-2 GB of memory. But some
compute nodes can be larger, and we also want to autoscale from small
to large, and back again, without shutting down the compute.
+----------------------------+
| Host |
| +------------------------+ |
| | Container VM | | +------------+
| | +--------------------+ | | | |
| | | Postgres | | | | Pageserver |
| | | | | | | |
| | | ################ | | | | |
| | | # shared_ # | | | +------------+
| | | # buffers # | | |
| | | ################ | | |
| | | | | |
| | +--------------------+ | |
| +------------------------+ |
+----------------------------+
Each Postgres runs in a qemu VM, which in turn runs in a kubernetes
pod. Kubernetes manages the placement of these Postgres compute nodes
to hosts.
# Problem
PostgreSQL normally relies heavily on the kernel page cache for
performance. PostgreSQL has its own buffer cache, configured by the
shared_buffers setting, but the usual advice is to set shared_buffers
to around 10-20% of the overall system RAM available, leaving the rest
for the kernel page cache. However with Neon, the I/O operations don't
go through the kernel filesystem layer, so we bypass the kernel page
cache and rely solely on the Postgres shared buffer cache for caching
in the compute node.
Because we don't make use of the kernel page cache, we either have to
set shared_buffers larger than one would with normal PostgreSQL, or
send a lot more I/O requests to the pageserver than we otherwise
would. However, in PostgreSQL the shared_buffers setting cannot be
changed while the server is running.
Furthermore, we have fast local SSDs available in the compute hosts
that we could also utilize for caching.
# Solution 1: Scale shared buffers
This solution consists of:
- Core PostgreSQL changes to allow changing shared_buffers on the fly
- New code to orchestrate changing the memory size of the VM, and tell
PostgreSQL to change the shared_buffers setting accordingly.
- New code to Postgres that measures current shared buffer cache usage
to determine what the "cache pressure" is, i.e. how useful it would
be to have a larger shared buffer cache. This could be in an
extension.
- A new governor in the host that decides how much memory to
allocate to each VM.
Picture:
+----------------------------+
| Host |
| +------------------------+ |
| | Container VM | | +------------+
| | +--------------------+ | | | |
| | | Postgres | | | | Pageserver |
| | | | | | | |
| | | ################ | | | | |
| | | # shared_ # | | | +------------+
| | | # buffers # | | |
| | | # # | | |
| | | . | . | | |
| | | . | . | | |
| | | . V . | | |
| | | ################ | | |
| | | | | |
| | +--------------------+ | |
| +------------------------+ |
+----------------------------+
Pros:
- best possible performance for the cached data
Cons:
- Scales only memory, cannot take advantage of local SSDs in host machine
- Needs explicit operations to scale. Won't dynamically share resources
between tenants, we'll need to start a resizing process to change
the allocations.
- Needs patches to core PostgreSQL
# Alternative 2: Local filesystem cache
Add code to Postgres Neon extension to use a local file on disk for
caching. When a page is evicted from Postgres buffer cache, write it
to the local file, and read it back if it's requested again. Rely on
kernel page cache to keep the most hot part of that file in memory.
Like in solution 1, need a governor in the host to allocate the local
disk for each VM, and orchestration to scale it up and down.
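
A minimal sketch of the evict-then-reread idea, in Rust for illustration only. Everything here is an assumption rather than part of the proposal: the `LocalFileCache` type, the `u64` page key (a real key would also identify the relation and fork), the fixed slot count, and the round-robin replacement policy. It shows only the core mechanism: a page evicted from the buffer cache is written into a slot of a local file, and a later request reads it back if the slot has not been reused.

```rust
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::Path;

/// Page size in bytes; 8192 is the default PostgreSQL block size.
const PAGE_SIZE: usize = 8192;

/// Hypothetical page identifier, simplified for this sketch.
type PageKey = u64;

/// A cache file made of page-sized slots, plus an in-memory slot index.
struct LocalFileCache {
    file: File,
    capacity_slots: u64,
    next_slot: u64,
    slot_of_page: HashMap<PageKey, u64>,
    page_in_slot: HashMap<u64, PageKey>,
}

impl LocalFileCache {
    fn open(path: &Path, capacity_slots: u64) -> io::Result<Self> {
        assert!(capacity_slots > 0, "cache needs at least one slot");
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(path)?;
        Ok(Self {
            file,
            capacity_slots,
            next_slot: 0,
            slot_of_page: HashMap::new(),
            page_in_slot: HashMap::new(),
        })
    }

    /// Called when a page is evicted from the shared buffer cache.
    fn put(&mut self, key: PageKey, page: &[u8]) -> io::Result<()> {
        assert_eq!(page.len(), PAGE_SIZE);
        // Reuse the page's own slot if it is cached, else take the next slot round-robin.
        let slot = match self.slot_of_page.remove(&key) {
            Some(slot) => slot,
            None => {
                let slot = self.next_slot;
                self.next_slot = (slot + 1) % self.capacity_slots;
                slot
            }
        };
        // Forget whatever the slot held; it is re-registered only after a successful write.
        if let Some(old_key) = self.page_in_slot.remove(&slot) {
            self.slot_of_page.remove(&old_key);
        }
        self.file.seek(SeekFrom::Start(slot * PAGE_SIZE as u64))?;
        self.file.write_all(page)?;
        self.slot_of_page.insert(key, slot);
        self.page_in_slot.insert(slot, key);
        Ok(())
    }

    /// Called on a buffer cache miss, before asking the pageserver.
    fn get(&mut self, key: PageKey) -> io::Result<Option<Vec<u8>>> {
        let slot = match self.slot_of_page.get(&key) {
            Some(&slot) => slot,
            None => return Ok(None),
        };
        let mut page = vec![0u8; PAGE_SIZE];
        self.file.seek(SeekFrom::Start(slot * PAGE_SIZE as u64))?;
        self.file.read_exact(&mut page)?;
        Ok(Some(page))
    }
}
```

The sketch never asks for the file contents to be held in memory itself; as the text says, keeping the hot part of the file in RAM is left to the kernel page cache.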
+----------------------------+
| Host |
| +------------------------+ |
| | Container VM | | +------------+
| | +--------------------+ | | | |
| | | Postgres | | | | Pageserver |
| | | | | | | |
| | | ################ | | | | |
| | | # shared_ # | | | +------------+
| | | # buffers # | | |
| | | ################ | | |
| | | | | |
| | | ################ | | |
| | | # Local FS # | | |
| | | # cache # | | |
| | | # # | | |
| | | ################ | | |
| | | | | |
| | +--------------------+ | |
| +------------------------+ |
+----------------------------+
Pros:
- No PostgreSQL core changes required
- Automatically takes advantage of local SSDs, not just memory
Cons:
- Needs explicit operations to change the size of the cache file
- Needs support for live migration of the filesystem
Question:
How is the page cache shared between the host kernel and the VMs? Does
each VM maintain their own page cache? I think that depends on the
filesystem and qemu driver we choose. If we use a raw block device and
let the VM manage it, I believe the VM will maintain the page
cache. But if we use a driver to access the host filesystem directly,
or use something like NFS, I'm not sure.
# Alternative 3: Host cache
Implement a new host cache process that intercepts all the I/O
requests from all VMs running on the host, and manages a local cache.
Postgres can communicate with the host cache using TCP, or via custom
virtio driver.
+----------------------------+
| Host |
| |
| ###################### |
| # # |
| # shared host cache # |
| # # |
| # # |
| # # |
| # # |
| ###################### |
| |
| +------------------------+ |
| | Container VM | | +------------+
| | +--------------------+ | | | |
| | | Postgres | | | | Pageserver |
| | | | | | | |
| | | ################ | | | | |
| | | # shared_ # | | | +------------+
| | | # buffers # | | |
| | | ################ | | |
| | | | | |
| | +--------------------+ | |
| +------------------------+ |
+----------------------------+
Pros:
- dynamic sharing between all tenants (one cache and eviction policy for all)
- No PostgreSQL core changes required
- Takes advantage of local SSDs, not just memory
Cons:
- Whole new component to write
One way to achieve this would be to collocate the pageserver on the
host itself. That would eliminate the network roundtrip between
Postgres and the pageserver, effectively making the pageserver itself
be the host shared cache.

View File

@@ -117,6 +117,7 @@ impl TenantCreateRequest {
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TenantConfigRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]

View File

@@ -444,7 +444,7 @@ impl FeCloseMessage {
pub enum BeMessage<'a> {
AuthenticationOk,
AuthenticationMD5Password([u8; 4]),
AuthenticationSasl(BeAuthenticationSaslMessage<'a>),
AuthenticationSasl(SaslMessage<'a>),
AuthenticationCleartextPassword,
BackendKeyData(CancelKeyData),
BindComplete,
@@ -463,7 +463,7 @@ pub enum BeMessage<'a> {
EncryptionResponse(bool),
NoData,
ParameterDescription,
ParameterStatus(BeParameterStatusMessage<'a>),
ParameterStatus(ParameterStatusMessage<'a>),
ParseComplete,
ReadyForQuery,
RowDescription(&'a [RowDescriptor<'a>]),
@@ -473,19 +473,19 @@ pub enum BeMessage<'a> {
}
#[derive(Debug)]
pub enum BeAuthenticationSaslMessage<'a> {
pub enum SaslMessage<'a> {
Methods(&'a [&'a str]),
Continue(&'a [u8]),
Final(&'a [u8]),
}
#[derive(Debug)]
pub enum BeParameterStatusMessage<'a> {
pub enum ParameterStatusMessage<'a> {
Encoding(&'a str),
ServerVersion(&'a str),
}
impl BeParameterStatusMessage<'static> {
impl ParameterStatusMessage<'static> {
pub fn encoding() -> BeMessage<'static> {
BeMessage::ParameterStatus(Self::Encoding("UTF8"))
}
@@ -639,7 +639,7 @@ impl<'a> BeMessage<'a> {
BeMessage::AuthenticationSasl(msg) => {
buf.put_u8(b'R');
write_body(buf, |buf| {
use BeAuthenticationSaslMessage::*;
use SaslMessage::*;
match msg {
Methods(methods) => {
buf.put_i32(10); // Specifies that SASL auth method is used.
@@ -801,7 +801,7 @@ impl<'a> BeMessage<'a> {
BeMessage::ParameterStatus(param) => {
use std::io::{IoSlice, Write};
use BeParameterStatusMessage::*;
use ParameterStatusMessage::*;
let [name, value] = match param {
Encoding(name) => [b"client_encoding", name.as_bytes()],

View File

@@ -104,11 +104,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
) -> Result<Download, DownloadError>;
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>;
/// Downcast to LocalFs implementation. For tests.
fn as_local(&self) -> Option<&LocalFs> {
None
}
}
pub struct Download {
@@ -277,7 +272,7 @@ impl Debug for S3Config {
}
impl RemoteStorageConfig {
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<RemoteStorageConfig> {
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
let local_path = toml.get("local_path");
let bucket_name = toml.get("bucket_name");
let bucket_region = toml.get("bucket_region");
@@ -301,7 +296,8 @@ impl RemoteStorageConfig {
.context("Failed to parse 'concurrency_limit' as a positive integer")?;
let storage = match (local_path, bucket_name, bucket_region) {
(None, None, None) => bail!("no 'local_path' nor 'bucket_name' option"),
// no 'local_path' nor 'bucket_name' options are provided, consider this remote storage disabled
(None, None, None) => return Ok(None),
(_, Some(_), None) => {
bail!("'bucket_region' option is mandatory if 'bucket_name' is given ")
}
@@ -327,11 +323,11 @@ impl RemoteStorageConfig {
(Some(_), Some(_), _) => bail!("local_path and bucket_name are mutually exclusive"),
};
Ok(RemoteStorageConfig {
Ok(Some(RemoteStorageConfig {
max_concurrent_syncs,
max_sync_errors,
storage,
})
}))
}
}
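
The change above turns "no storage options given" from an error into a third outcome: storage disabled. A simplified, standalone sketch of that three-way result follows. It is not the real `RemoteStorageConfig::from_toml`: the `StorageKind` enum, the `parse_remote_storage` name, the `HashMap` standing in for the TOML item, the `String` error type, and the `bucket_region`-without-`bucket_name` arm are all assumptions made for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified storage description.
#[derive(Debug, PartialEq)]
enum StorageKind {
    LocalFs { path: String },
    S3 { bucket_name: String, bucket_region: String },
}

/// Returns Ok(None) when remote storage is disabled, Ok(Some(..)) when it is
/// configured, and Err(..) when the options contradict each other.
fn parse_remote_storage(table: &HashMap<&str, &str>) -> Result<Option<StorageKind>, String> {
    let local_path = table.get("local_path");
    let bucket_name = table.get("bucket_name");
    let bucket_region = table.get("bucket_region");

    match (local_path, bucket_name, bucket_region) {
        // Neither 'local_path' nor 'bucket_name' is given: treat the storage as disabled.
        (None, None, None) => Ok(None),
        (_, Some(_), None) => {
            Err("'bucket_region' option is mandatory if 'bucket_name' is given".to_owned())
        }
        // Assumed arm, added so that this sketch's match is exhaustive.
        (_, None, Some(_)) => {
            Err("'bucket_name' option is mandatory if 'bucket_region' is given".to_owned())
        }
        (None, Some(name), Some(region)) => Ok(Some(StorageKind::S3 {
            bucket_name: (*name).to_owned(),
            bucket_region: (*region).to_owned(),
        })),
        (Some(path), None, None) => Ok(Some(StorageKind::LocalFs {
            path: (*path).to_owned(),
        })),
        (Some(_), Some(_), _) => Err("local_path and bucket_name are mutually exclusive".to_owned()),
    }
}
```

With this shape, an empty table such as the `remote_storage={}` override used by `neon_local` yields `Ok(None)`, so the caller can pass the result straight through instead of wrapping it in `Some`.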

View File

@@ -283,10 +283,6 @@ impl RemoteStorage for LocalFs {
bail!("File {file_path:?} either does not exist or is not a file")
}
}
fn as_local(&self) -> Option<&LocalFs> {
Some(self)
}
}
fn storage_metadata_path(original_path: &Path) -> PathBuf {

View File

@@ -6,7 +6,7 @@
use crate::sock_split::{BidiStream, ReadStream, WriteStream};
use anyhow::{bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket};
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, ParameterStatusMessage};
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::fmt;
@@ -361,10 +361,10 @@ impl PostgresBackend {
match self.auth_type {
AuthType::Trust => {
self.write_message_noflush(&BeMessage::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?
.write_message_noflush(&ParameterStatusMessage::encoding())?
// The async python driver requires a valid server_version
.write_message_noflush(&BeMessage::ParameterStatus(
BeParameterStatusMessage::ServerVersion("14.1"),
ParameterStatusMessage::ServerVersion("14.1"),
))?
.write_message(&BeMessage::ReadyForQuery)?;
self.state = ProtoState::Established;
@@ -413,7 +413,7 @@ impl PostgresBackend {
}
}
self.write_message_noflush(&BeMessage::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?
.write_message_noflush(&ParameterStatusMessage::encoding())?
.write_message(&BeMessage::ReadyForQuery)?;
self.state = ProtoState::Established;
}

View File

@@ -6,7 +6,7 @@
use crate::postgres_backend::AuthType;
use anyhow::{bail, Context, Result};
use bytes::{Bytes, BytesMut};
use pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket};
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, ParameterStatusMessage};
use rand::Rng;
use std::future::Future;
use std::net::SocketAddr;
@@ -331,10 +331,10 @@ impl PostgresBackend {
match self.auth_type {
AuthType::Trust => {
self.write_message(&BeMessage::AuthenticationOk)?
.write_message(&BeParameterStatusMessage::encoding())?
.write_message(&ParameterStatusMessage::encoding())?
// The async python driver requires a valid server_version
.write_message(&BeMessage::ParameterStatus(
BeParameterStatusMessage::ServerVersion("14.1"),
ParameterStatusMessage::ServerVersion("14.1"),
))?
.write_message(&BeMessage::ReadyForQuery)?;
self.state = ProtoState::Established;
@@ -384,7 +384,7 @@ impl PostgresBackend {
}
}
self.write_message(&BeMessage::AuthenticationOk)?
.write_message(&BeParameterStatusMessage::encoding())?
.write_message(&ParameterStatusMessage::encoding())?
.write_message(&BeMessage::ReadyForQuery)?;
self.state = ProtoState::Established;
}

View File

@@ -10,11 +10,13 @@ pub fn init_sentry(
extra_options: &[(&str, &str)],
) -> Option<ClientInitGuard> {
let dsn = env::var("SENTRY_DSN").ok()?;
let environment = env::var("SENTRY_ENVIRONMENT").unwrap_or_else(|_| "development".into());
let guard = sentry::init((
dsn,
sentry::ClientOptions {
release: release_name,
environment: Some(environment.into()),
..Default::default()
},
));

View File

@@ -524,7 +524,7 @@ impl PageServerConf {
)),
"auth_type" => builder.auth_type(parse_toml_from_str(key, item)?),
"remote_storage" => {
builder.remote_storage_config(Some(RemoteStorageConfig::from_toml(item)?))
builder.remote_storage_config(RemoteStorageConfig::from_toml(item)?)
}
"tenant_config" => {
t_conf = Self::parse_toml_tenant_conf(item)?;

View File

@@ -96,6 +96,16 @@ static CURRENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
static REMOTE_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_remote_physical_size",
"The size of the layer files present in the remote storage that are listed in the remote index_part.json.",
// Corollary: If any files are missing from the index part, they won't be included here.
&["tenant_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static CURRENT_LOGICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_current_logical_size",
@@ -201,7 +211,7 @@ pub static NUM_ONDISK_LAYERS: Lazy<IntGauge> = Lazy::new(|| {
// remote storage metrics
pub static REMOTE_UPLOAD_QUEUE_UNFINISHED_TASKS: Lazy<IntGaugeVec> = Lazy::new(|| {
static REMOTE_UPLOAD_QUEUE_UNFINISHED_TASKS: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_remote_upload_queue_unfinished_tasks",
"Number of tasks in the upload queue that are not finished yet.",
@@ -210,14 +220,14 @@ pub static REMOTE_UPLOAD_QUEUE_UNFINISHED_TASKS: Lazy<IntGaugeVec> = Lazy::new(|
.expect("failed to define a metric")
});
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RemoteOpKind {
Upload,
Download,
Delete,
}
impl RemoteOpKind {
pub fn as_str(&self) -> &str {
pub fn as_str(&self) -> &'static str {
match self {
Self::Upload => "upload",
Self::Download => "download",
@@ -226,13 +236,13 @@ impl RemoteOpKind {
}
}
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum RemoteOpFileKind {
Layer,
Index,
}
impl RemoteOpFileKind {
pub fn as_str(&self) -> &str {
pub fn as_str(&self) -> &'static str {
match self {
Self::Layer => "layer",
Self::Index => "index",
@@ -491,10 +501,114 @@ pub fn remove_tenant_metrics(tenant_id: &TenantId) {
use futures::Future;
use pin_project_lite::pin_project;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::Instant;
pub struct RemoteTimelineClientMetrics {
tenant_id: String,
timeline_id: String,
remote_physical_size_gauge: Mutex<Option<UIntGauge>>,
remote_operation_time: Mutex<HashMap<(&'static str, &'static str, &'static str), Histogram>>,
unfinished_tasks: Mutex<HashMap<(&'static str, &'static str), IntGauge>>,
}
impl RemoteTimelineClientMetrics {
pub fn new(tenant_id: &TenantId, timeline_id: &TimelineId) -> Self {
RemoteTimelineClientMetrics {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
remote_operation_time: Mutex::new(HashMap::default()),
unfinished_tasks: Mutex::new(HashMap::default()),
remote_physical_size_gauge: Mutex::new(None),
}
}
pub fn remote_physical_size_gauge(&self) -> UIntGauge {
let mut guard = self.remote_physical_size_gauge.lock().unwrap();
guard
.get_or_insert_with(|| {
REMOTE_PHYSICAL_SIZE
.get_metric_with_label_values(&[
&self.tenant_id.to_string(),
&self.timeline_id.to_string(),
])
.unwrap()
})
.clone()
}
pub fn remote_operation_time(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
status: &'static str,
) -> Histogram {
// XXX would be nice to have an upgradable RwLock
let mut guard = self.remote_operation_time.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str(), status);
let metric = guard.entry(key).or_insert_with(move || {
REMOTE_OPERATION_TIME
.get_metric_with_label_values(&[
&self.tenant_id.to_string(),
&self.timeline_id.to_string(),
key.0,
key.1,
key.2,
])
.unwrap()
});
metric.clone()
}
pub fn unfinished_tasks(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> IntGauge {
// XXX would be nice to have an upgradable RwLock
let mut guard = self.unfinished_tasks.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
let metric = guard.entry(key).or_insert_with(move || {
REMOTE_UPLOAD_QUEUE_UNFINISHED_TASKS
.get_metric_with_label_values(&[
&self.tenant_id.to_string(),
&self.timeline_id.to_string(),
key.0,
key.1,
])
.unwrap()
});
metric.clone()
}
}
impl Drop for RemoteTimelineClientMetrics {
fn drop(&mut self) {
let RemoteTimelineClientMetrics {
tenant_id,
timeline_id,
remote_physical_size_gauge,
remote_operation_time,
unfinished_tasks,
} = self;
for ((a, b, c), _) in remote_operation_time.get_mut().unwrap().drain() {
let _ = REMOTE_OPERATION_TIME.remove_label_values(&[tenant_id, timeline_id, a, b, c]);
}
for ((a, b), _) in unfinished_tasks.get_mut().unwrap().drain() {
let _ = REMOTE_UPLOAD_QUEUE_UNFINISHED_TASKS.remove_label_values(&[
tenant_id,
timeline_id,
a,
b,
]);
}
{
let _ = remote_physical_size_gauge; // use to avoid 'unused' warning in destructuring above
let _ = REMOTE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
}
}
}
/// Wrapper future that measures the time spent by a remote storage operation,
/// and records the time and success/failure as a prometheus metric.
pub trait MeasureRemoteOp: Sized {
@@ -504,6 +618,7 @@ pub trait MeasureRemoteOp: Sized {
timeline_id: TimelineId,
file_kind: RemoteOpFileKind,
op: RemoteOpKind,
metrics: Arc<RemoteTimelineClientMetrics>,
) -> MeasuredRemoteOp<Self> {
let start = Instant::now();
MeasuredRemoteOp {
@@ -513,6 +628,7 @@ pub trait MeasureRemoteOp: Sized {
file_kind,
op,
start,
metrics,
}
}
}
@@ -529,6 +645,7 @@ pin_project! {
file_kind: RemoteOpFileKind,
op: RemoteOpKind,
start: Instant,
metrics: Arc<RemoteTimelineClientMetrics>,
}
}
@@ -541,15 +658,8 @@ impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
if let Poll::Ready(ref res) = poll_result {
let duration = this.start.elapsed();
let status = if res.is_ok() { &"success" } else { &"failure" };
REMOTE_OPERATION_TIME
.get_metric_with_label_values(&[
&this.tenant_id.to_string(),
&this.timeline_id.to_string(),
this.file_kind.as_str(),
this.op.as_str(),
status,
])
.unwrap()
this.metrics
.remote_operation_time(this.file_kind, this.op, status)
.observe(duration.as_secs_f64());
}
poll_result
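
The pattern introduced in this file is to track which label sets a timeline has touched, then remove exactly those series when the per-timeline object is dropped. A standalone sketch of it follows. It deliberately avoids the prometheus crate: `GaugeVec` here is a hypothetical stand-in backed by a `HashMap`, and `TimelineMetrics` and `add_unfinished` are illustrative names, not the types from the diff.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a process-wide labelled gauge vector.
#[derive(Default)]
struct GaugeVec {
    series: Mutex<HashMap<Vec<String>, i64>>,
}

impl GaugeVec {
    fn key(labels: &[&str]) -> Vec<String> {
        labels.iter().map(|label| label.to_string()).collect()
    }

    fn add(&self, labels: &[&str], delta: i64) {
        *self.series.lock().unwrap().entry(Self::key(labels)).or_insert(0) += delta;
    }

    fn remove(&self, labels: &[&str]) -> bool {
        self.series.lock().unwrap().remove(&Self::key(labels)).is_some()
    }

    fn len(&self) -> usize {
        self.series.lock().unwrap().len()
    }
}

/// Per-timeline handle that remembers which label sets it has created.
struct TimelineMetrics {
    registry: Arc<GaugeVec>,
    tenant_id: String,
    timeline_id: String,
    used: Mutex<HashSet<(&'static str, &'static str)>>,
}

impl TimelineMetrics {
    fn new(registry: Arc<GaugeVec>, tenant_id: &str, timeline_id: &str) -> Self {
        Self {
            registry,
            tenant_id: tenant_id.to_owned(),
            timeline_id: timeline_id.to_owned(),
            used: Mutex::new(HashSet::new()),
        }
    }

    fn add_unfinished(&self, file_kind: &'static str, op_kind: &'static str, delta: i64) {
        // Record the label pair so that Drop knows what to clean up.
        self.used.lock().unwrap().insert((file_kind, op_kind));
        self.registry.add(
            &[self.tenant_id.as_str(), self.timeline_id.as_str(), file_kind, op_kind],
            delta,
        );
    }
}

impl Drop for TimelineMetrics {
    fn drop(&mut self) {
        // Remove every series this timeline created, so none outlives the timeline.
        for (file_kind, op_kind) in self.used.get_mut().unwrap().drain() {
            self.registry.remove(&[
                self.tenant_id.as_str(),
                self.timeline_id.as_str(),
                file_kind,
                op_kind,
            ]);
        }
    }
}
```

Keeping the bookkeeping inside the handle means callers cannot forget the cleanup step; the cost is one extra lock per metric access, which the `XXX` comments in the diff also note.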

View File

@@ -210,10 +210,9 @@ use utils::lsn::Lsn;
use self::index::IndexPart;
use crate::metrics::MeasureRemoteOp;
use crate::metrics::RemoteOpFileKind;
use crate::metrics::RemoteOpKind;
use crate::metrics::REMOTE_UPLOAD_QUEUE_UNFINISHED_TASKS;
use crate::metrics::{MeasureRemoteOp, RemoteTimelineClientMetrics};
use crate::tenant::filename::LayerFileName;
use crate::{
config::PageServerConf,
@@ -256,6 +255,8 @@ pub struct RemoteTimelineClient {
upload_queue: Mutex<UploadQueue>,
metrics: Arc<RemoteTimelineClientMetrics>,
storage_impl: GenericRemoteStorage,
}
@@ -459,6 +460,7 @@ impl RemoteTimelineClient {
pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part)?;
self.update_remote_physical_size_gauge(Some(index_part));
Ok(())
}
@@ -470,6 +472,7 @@ impl RemoteTimelineClient {
) -> anyhow::Result<()> {
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_empty_remote(local_metadata)?;
self.update_remote_physical_size_gauge(None);
Ok(())
}
@@ -481,6 +484,20 @@ impl RemoteTimelineClient {
}
}
fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
current_remote_index_part
.layer_metadata
.iter()
// If we don't have the file size for the layer, don't account for it in the metric.
.map(|(_, ilmd)| ilmd.file_size.unwrap_or(0))
.sum()
} else {
0
};
self.metrics.remote_physical_size_gauge().set(size);
}
//
// Download operations.
//
@@ -501,6 +518,7 @@ impl RemoteTimelineClient {
self.timeline_id,
RemoteOpFileKind::Index,
RemoteOpKind::Download,
Arc::clone(&self.metrics),
)
.await
}
@@ -528,6 +546,7 @@ impl RemoteTimelineClient {
self.timeline_id,
RemoteOpFileKind::Layer,
RemoteOpKind::Download,
Arc::clone(&self.metrics),
)
.await?;
@@ -540,6 +559,14 @@ impl RemoteTimelineClient {
let upload_queue = guard.initialized_mut()?;
if let Some(upgraded) = upload_queue.latest_files.get_mut(layer_file_name) {
upgraded.merge(&new_metadata);
// If we don't do an index file upload in between here and restart,
// the value will go back down after pageserver restart, since we will
// have lost this data point.
// But, we upload index part fairly frequently, and restart pageserver rarely.
// So, by accounting eagerly, we present a most-of-the-time-more-accurate value sooner.
self.metrics
.remote_physical_size_gauge()
.add(downloaded_size);
} else {
// The file should exist, since we just downloaded it.
warn!(
@@ -847,11 +874,12 @@ impl RemoteTimelineClient {
self.timeline_id,
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
Arc::clone(&self.metrics),
)
.await
}
UploadOp::UploadMetadata(ref index_part, _lsn) => {
upload::upload_index_part(
let res = upload::upload_index_part(
self.conf,
&self.storage_impl,
self.tenant_id,
@@ -863,8 +891,13 @@ impl RemoteTimelineClient {
self.timeline_id,
RemoteOpFileKind::Index,
RemoteOpKind::Upload,
Arc::clone(&self.metrics),
)
.await
.await;
if res.is_ok() {
self.update_remote_physical_size_gauge(Some(index_part));
}
res
}
UploadOp::Delete(metric_file_kind, ref layer_file_name) => {
let path = &self
@@ -877,6 +910,7 @@ impl RemoteTimelineClient {
self.timeline_id,
*metric_file_kind,
RemoteOpKind::Delete,
Arc::clone(&self.metrics),
)
.await
}
@@ -977,14 +1011,8 @@ impl RemoteTimelineClient {
return;
}
};
REMOTE_UPLOAD_QUEUE_UNFINISHED_TASKS
.get_metric_with_label_values(&[
&self.tenant_id.to_string(),
&self.timeline_id.to_string(),
file_kind.as_str(),
op_kind.as_str(),
])
.unwrap()
self.metrics
.unfinished_tasks(&file_kind, &op_kind)
.add(delta)
}
@@ -1068,6 +1096,7 @@ pub fn create_remote_timeline_client(
timeline_id,
storage_impl: remote_storage,
upload_queue: Mutex::new(UploadQueue::Uninitialized),
metrics: Arc::new(RemoteTimelineClientMetrics::new(&tenant_id, &timeline_id)),
})
}
@@ -1180,6 +1209,10 @@ mod tests {
timeline_id: TIMELINE_ID,
storage_impl,
upload_queue: Mutex::new(UploadQueue::Uninitialized),
metrics: Arc::new(RemoteTimelineClientMetrics::new(
&harness.tenant_id,
&TIMELINE_ID,
)),
});
let remote_timeline_dir =

View File

@@ -135,7 +135,7 @@ impl<'de> serde::de::Visitor<'de> for UncleanLayerFileNameVisitor {
match maybe_clean {
Ok(clean) => Ok(UncleanLayerFileName::Clean(clean)),
Err(e) => {
if v.ends_with(".old") {
if v.ends_with(".old") || v == "metadata_backup" {
Ok(UncleanLayerFileName::BackupFile(v.to_owned()))
} else {
Err(E::custom(e))
@@ -232,7 +232,7 @@ impl IndexPart {
/// Serialized form of [`LayerFileMetadata`].
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
pub struct IndexLayerMetadata {
file_size: Option<u64>,
pub(super) file_size: Option<u64>,
}
impl From<&'_ LayerFileMetadata> for IndexLayerMetadata {

View File

@@ -1690,7 +1690,7 @@ impl Tenant {
let _enter = info_span!("saving tenantconf").entered();
info!("persisting tenantconf to {}", target_config_path.display());
// TODO this will prepend comments endlessly
// TODO this will prepend comments endlessly ?
let mut conf_content = r#"# This file contains a specific per-tenant's config.
# It is read in case of pageserver restart.
@@ -1703,7 +1703,10 @@ impl Tenant {
let mut target_config_file = VirtualFile::open_with_options(
target_config_path,
OpenOptions::new().write(true).create_new(first_save),
OpenOptions::new()
.truncate(true) // This is needed when overwriting with a smaller config file
.write(true)
.create_new(first_save),
)?;
target_config_file
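
Why `.truncate(true)` matters here can be shown with plain `std::fs`. The sketch below is not the pageserver's `VirtualFile` code; `overwrite` is a hypothetical helper that opens an existing file for writing, with or without truncation, writes the new content from offset 0, and returns what the file then contains.

```rust
use std::fs::{self, OpenOptions};
use std::io::{self, Write};
use std::path::Path;

/// Overwrites an existing file with `content`, optionally truncating it first,
/// and returns the resulting file contents.
fn overwrite(path: &Path, content: &str, truncate: bool) -> io::Result<String> {
    let mut file = OpenOptions::new()
        .write(true)
        .truncate(truncate)
        .open(path)?;
    // Writing starts at offset 0; bytes past the end of `content` survive
    // unless the file was truncated on open.
    file.write_all(content.as_bytes())?;
    drop(file);
    fs::read_to_string(path)
}
```

Writing `ab` over a file containing `0123456789` leaves `ab23456789` without truncation and `ab` with it, which is why a shorter tenant config would otherwise keep the tail of the previous, longer one.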

View File

@@ -1013,9 +1013,8 @@ impl Timeline {
// 1) if there was another pageserver that came and generated new files
// 2) during attach of a timeline with big history which we currently do not do
let mut local_only_layers = local_layers;
let timeline_dir = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
for remote_layer_name in &index_part.timeline_layers {
local_only_layers.remove(remote_layer_name);
let local_layer = local_only_layers.remove(remote_layer_name);
let remote_layer_metadata = index_part
.layer_metadata
@@ -1023,41 +1022,57 @@ impl Timeline {
.map(LayerFileMetadata::from)
.unwrap_or(LayerFileMetadata::MISSING);
let local_layer_path = timeline_dir.join(remote_layer_name.file_name());
// Is the local layer's size different from the size stored in the
// remote index file? If so, rename_to_backup those files & remove
// local_layer from the layer map.
// We'll download a fresh copy of the layer file below.
if let Some(local_layer) = local_layer {
let local_layer_path = local_layer.local_path();
ensure!(
local_layer_path.exists(),
"every layer from local_layers must exist on disk: {}",
local_layer_path.display()
);
if local_layer_path.exists() {
let mut already_downloaded = true;
// Are there any local files that exist, with a size that doesn't match
// with the size stored in the remote index file?
// If so, rename_to_backup those files so that we re-download them later.
if let Some(remote_size) = remote_layer_metadata.file_size() {
match local_layer_path.metadata() {
Ok(metadata) => {
let local_size = metadata.len();
if local_size != remote_size {
warn!("removing local file {local_layer_path:?} because it has unexpected length {local_size}; length in remote index is {remote_size}");
if let Err(err) = rename_to_backup(&local_layer_path) {
error!("could not rename file {local_layer_path:?}: {err:?}");
} else {
self.metrics.current_physical_size_gauge.sub(local_size);
already_downloaded = false;
}
}
}
Err(err) => {
error!("could not get size of local file {local_layer_path:?}: {err:?}")
let metadata = local_layer_path.metadata().with_context(|| {
format!(
"get file size of local layer {}",
local_layer_path.display()
)
})?;
let local_size = metadata.len();
if local_size != remote_size {
warn!("removing local file {local_layer_path:?} because it has unexpected length {local_size}; length in remote index is {remote_size}");
if let Err(err) = rename_to_backup(&local_layer_path) {
assert!(local_layer_path.exists(), "we would leave the local_layer without a file if this does not hold: {}", local_layer_path.display());
anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
} else {
self.metrics.current_physical_size_gauge.sub(local_size);
self.layers.write().unwrap().remove_historic(local_layer);
// fall-through to adding the remote layer
}
} else {
debug!(
"layer is present locally and file size matches remote, using it: {}",
local_layer_path.display()
);
continue;
}
}
if already_downloaded {
} else {
debug!(
"layer is present locally and remote does not have file size, using it: {}",
local_layer_path.display()
);
continue;
}
} else {
info!("remote layer {remote_layer_name:?} does not exist locally");
}
info!(
"remote layer does not exist locally, downloading it now: {}",
remote_layer_name.file_name()
);
match remote_layer_name {
LayerFileName::Image(imgfilename) => {
if imgfilename.lsn > up_to_date_disk_consistent_lsn {
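
The per-layer decision made by the rewritten loop above can be summarized as a small pure function. This is only a sketch of the control flow as it reads in this hunk: `Action` and `reconcile` are hypothetical names, `local_size` is `None` when the layer is absent locally, and `remote_size` is `None` when the remote index carries no file size.

```rust
/// What to do with one layer listed in the remote index (hypothetical type).
#[derive(Debug, PartialEq)]
enum Action {
    /// Local file is usable; skip the download (`continue` in the loop).
    KeepLocal,
    /// Sizes differ: rename the local file to a backup, drop it from the
    /// layer map, then fall through to the download path.
    BackupAndDownload,
    /// No local layer at all: download it.
    Download,
}

fn reconcile(local_size: Option<u64>, remote_size: Option<u64>) -> Action {
    match (local_size, remote_size) {
        (None, _) => Action::Download,
        // The remote index has no size to compare against, so trust the local file.
        (Some(_), None) => Action::KeepLocal,
        (Some(l), Some(r)) if l == r => Action::KeepLocal,
        (Some(_), Some(_)) => Action::BackupAndDownload,
    }
}

fn main() {
    println!("{:?}", reconcile(Some(10), Some(20)));
    println!("{:?}", reconcile(None, Some(20)));
}
```

The real code additionally fails the whole operation when the metadata lookup or the rename fails, which this sketch leaves out.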


@@ -126,15 +126,21 @@ impl<E: Clone> TaskHandle<E> {
match self.events_receiver.changed().await {
Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),
Err(_task_channel_part_dropped) => {
TaskEvent::End(match self.join_handle.take() {
TaskEvent::End(match self.join_handle.as_mut() {
Some(jh) => {
if !jh.is_finished() {
warn!("sender is dropped while join handle is still alive");
}
jh.await
let res = jh
.await
.map_err(|e| anyhow::anyhow!("Failed to join task: {e}"))
.and_then(|x| x)
.and_then(|x| x);
// For cancellation-safety, drop join_handle only after successful .await.
self.join_handle = None;
res
}
None => {
// Another option is to have an enum, join handle or result and give away the reference to it


@@ -49,6 +49,9 @@ pub enum AuthErrorImpl {
)]
MissingProjectName,
#[error("password authentication failed for user '{0}'")]
AuthFailed(Box<str>),
/// Errors produced by e.g. [`crate::stream::PqStream`].
#[error(transparent)]
Io(#[from] io::Error),
@@ -62,6 +65,10 @@ impl AuthError {
pub fn bad_auth_method(name: impl Into<Box<str>>) -> Self {
AuthErrorImpl::BadAuthMethod(name.into()).into()
}
pub fn auth_failed(user: impl Into<Box<str>>) -> Self {
AuthErrorImpl::AuthFailed(user.into()).into()
}
}
impl<E: Into<AuthErrorImpl>> From<E> for AuthError {
@@ -78,10 +85,11 @@ impl UserFacingError for AuthError {
GetAuthInfo(e) => e.to_string_client(),
WakeCompute(e) => e.to_string_client(),
Sasl(e) => e.to_string_client(),
AuthFailed(_) => self.to_string(),
BadAuthMethod(_) => self.to_string(),
MalformedPassword(_) => self.to_string(),
MissingProjectName => self.to_string(),
_ => "Internal error".to_string(),
Io(_) => "Internal error".to_string(),
}
}
}


@@ -5,26 +5,74 @@ use crate::{
auth::{self, AuthFlow, ClientCredentials},
compute,
error::{io_error, UserFacingError},
http, scram,
http, sasl, scram,
stream::PqStream,
};
use futures::TryFutureExt;
use serde::{Deserialize, Serialize};
use reqwest::StatusCode as HttpStatusCode;
use serde::Deserialize;
use std::future::Future;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{error, info, info_span};
use tracing::{error, info, info_span, warn, Instrument};
/// A go-to error message which doesn't leak any detail.
const REQUEST_FAILED: &str = "Console request failed";
/// Common console API error.
#[derive(Debug, Error)]
#[error("{}", REQUEST_FAILED)]
pub struct TransportError(#[from] std::io::Error);
pub enum ApiError {
/// Error returned by the console itself.
#[error("{REQUEST_FAILED} with {}: {}", .status, .text)]
Console {
status: HttpStatusCode,
text: Box<str>,
},
impl UserFacingError for TransportError {}
/// Various IO errors like broken pipe or malformed payload.
#[error("{REQUEST_FAILED}: {0}")]
Transport(#[from] std::io::Error),
}
impl ApiError {
/// Returns HTTP status code if it's the reason for failure.
fn http_status_code(&self) -> Option<HttpStatusCode> {
use ApiError::*;
match self {
Console { status, .. } => Some(*status),
_ => None,
}
}
}
impl UserFacingError for ApiError {
fn to_string_client(&self) -> String {
use ApiError::*;
match self {
// To minimize risks, only select errors are forwarded to users.
// Ask @neondatabase/control-plane for review before adding more.
Console { status, .. } => match *status {
HttpStatusCode::NOT_FOUND => {
// Status 404: failed to get a project-related resource.
format!("{REQUEST_FAILED}: endpoint cannot be found")
}
HttpStatusCode::NOT_ACCEPTABLE => {
// Status 406: endpoint is disabled (we don't allow connections).
format!("{REQUEST_FAILED}: endpoint is disabled")
}
HttpStatusCode::LOCKED => {
// Status 423: project might be in maintenance mode (or bad state).
format!("{REQUEST_FAILED}: endpoint is temporarily unavailable")
}
_ => REQUEST_FAILED.to_owned(),
},
_ => REQUEST_FAILED.to_owned(),
}
}
}
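
The allow-list idea behind `ApiError::to_string_client` can be shown in isolation. This is a simplified sketch that assumes a bare `u16` status instead of `reqwest::StatusCode`; the free function `to_string_client` is a hypothetical stand-in and the message texts are paraphrased from the hunk above.

```rust
/// A go-to error message which doesn't leak any detail.
const REQUEST_FAILED: &str = "Console request failed";

/// Map an HTTP status to a client-visible message (hypothetical helper).
/// Only a few selected statuses get a specific text; everything else
/// collapses into the generic message so no internal detail leaks.
fn to_string_client(status: u16) -> String {
    match status {
        404 => format!("{REQUEST_FAILED}: endpoint cannot be found"),
        406 => format!("{REQUEST_FAILED}: endpoint is disabled"),
        423 => format!("{REQUEST_FAILED}: endpoint is temporarily unavailable"),
        _ => REQUEST_FAILED.to_owned(),
    }
}

fn main() {
    for status in [404, 406, 423, 500] {
        println!("{status}: {}", to_string_client(status));
    }
}
```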
// Helps eliminate graceless `.map_err` calls without introducing another ctor.
impl From<reqwest::Error> for TransportError {
impl From<reqwest::Error> for ApiError {
fn from(e: reqwest::Error) -> Self {
io_error(e).into()
}
@@ -37,61 +85,73 @@ pub enum GetAuthInfoError {
BadSecret,
#[error(transparent)]
Transport(TransportError),
ApiError(ApiError),
}
// This allows more useful interactions than `#[from]`.
impl<E: Into<ApiError>> From<E> for GetAuthInfoError {
fn from(e: E) -> Self {
Self::ApiError(e.into())
}
}
impl UserFacingError for GetAuthInfoError {
fn to_string_client(&self) -> String {
use GetAuthInfoError::*;
match self {
// We absolutely should not leak any secrets!
BadSecret => REQUEST_FAILED.to_owned(),
Transport(e) => e.to_string_client(),
// However, API might return a meaningful error.
ApiError(e) => e.to_string_client(),
}
}
}
impl<E: Into<TransportError>> From<E> for GetAuthInfoError {
fn from(e: E) -> Self {
Self::Transport(e.into())
}
}
#[derive(Debug, Error)]
pub enum WakeComputeError {
// We shouldn't show users the address even if it's broken.
#[error("Console responded with a malformed compute address: {0}")]
BadComputeAddress(String),
BadComputeAddress(Box<str>),
#[error(transparent)]
Transport(TransportError),
ApiError(ApiError),
}
// This allows more useful interactions than `#[from]`.
impl<E: Into<ApiError>> From<E> for WakeComputeError {
fn from(e: E) -> Self {
Self::ApiError(e.into())
}
}
impl UserFacingError for WakeComputeError {
fn to_string_client(&self) -> String {
use WakeComputeError::*;
match self {
// We shouldn't show user the address even if it's broken.
// Besides, user is unlikely to care about this detail.
BadComputeAddress(_) => REQUEST_FAILED.to_owned(),
Transport(e) => e.to_string_client(),
// However, API might return a meaningful error.
ApiError(e) => e.to_string_client(),
}
}
}
impl<E: Into<TransportError>> From<E> for WakeComputeError {
fn from(e: E) -> Self {
Self::Transport(e.into())
}
/// Console's response which holds client's auth secret.
#[derive(Deserialize, Debug)]
struct GetRoleSecret {
role_secret: Box<str>,
}
// TODO: convert into an enum with "error"
#[derive(Serialize, Deserialize, Debug)]
struct GetRoleSecretResponse {
role_secret: String,
/// Console's response which holds compute node's `host:port` pair.
#[derive(Deserialize, Debug)]
struct WakeCompute {
address: Box<str>,
}
// TODO: convert into an enum with "error"
#[derive(Serialize, Deserialize, Debug)]
struct GetWakeComputeResponse {
address: String,
/// Console's error response with human-readable description.
#[derive(Deserialize, Debug)]
struct ConsoleError {
error: Box<str>,
}
/// Auth secret which is managed by the cloud.
@@ -110,6 +170,12 @@ pub(super) struct Api<'a> {
creds: &'a ClientCredentials<'a>,
}
impl<'a> AsRef<ClientCredentials<'a>> for Api<'a> {
fn as_ref(&self) -> &ClientCredentials<'a> {
self.creds
}
}
impl<'a> Api<'a> {
/// Construct an API object containing the auth parameters.
pub(super) fn new(
@@ -126,83 +192,88 @@ impl<'a> Api<'a> {
/// Authenticate the existing user or throw an error.
pub(super) async fn handle_user(
self,
&'a self,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin + Send>,
) -> auth::Result<AuthSuccess<compute::ConnCfg>> {
handle_user(client, &self, Self::get_auth_info, Self::wake_compute).await
handle_user(client, self, Self::get_auth_info, Self::wake_compute).await
}
}
async fn get_auth_info(&self) -> Result<AuthInfo, GetAuthInfoError> {
impl Api<'_> {
async fn get_auth_info(&self) -> Result<Option<AuthInfo>, GetAuthInfoError> {
let request_id = uuid::Uuid::new_v4().to_string();
let req = self
.endpoint
.get("proxy_get_role_secret")
.header("X-Request-ID", &request_id)
.query(&[("session_id", self.extra.session_id)])
.query(&[
("application_name", self.extra.application_name),
("project", Some(self.creds.project().expect("impossible"))),
("role", Some(self.creds.user)),
])
.build()?;
async {
let request = self
.endpoint
.get("proxy_get_role_secret")
.header("X-Request-ID", &request_id)
.query(&[("session_id", self.extra.session_id)])
.query(&[
("application_name", self.extra.application_name),
("project", Some(self.creds.project().expect("impossible"))),
("role", Some(self.creds.user)),
])
.build()?;
let span = info_span!("http", id = request_id, url = req.url().as_str());
info!(parent: &span, "request auth info");
let msg = self
.endpoint
.checked_execute(req)
.and_then(|r| r.json::<GetRoleSecretResponse>())
.await
.map_err(|e| {
error!(parent: &span, "{e}");
e
})?;
info!(url = request.url().as_str(), "sending http request");
let response = self.endpoint.execute(request).await?;
let body = match parse_body::<GetRoleSecret>(response).await {
Ok(body) => body,
// Error 404 is special: it's ok not to have a secret.
Err(e) => match e.http_status_code() {
Some(HttpStatusCode::NOT_FOUND) => return Ok(None),
_otherwise => return Err(e.into()),
},
};
scram::ServerSecret::parse(&msg.role_secret)
.map(AuthInfo::Scram)
.ok_or(GetAuthInfoError::BadSecret)
let secret = scram::ServerSecret::parse(&body.role_secret)
.map(AuthInfo::Scram)
.ok_or(GetAuthInfoError::BadSecret)?;
Ok(Some(secret))
}
.map_err(crate::error::log_error)
.instrument(info_span!("get_auth_info", id = request_id))
.await
}
/// Wake up the compute node and return the corresponding connection info.
pub(super) async fn wake_compute(&self) -> Result<compute::ConnCfg, WakeComputeError> {
pub async fn wake_compute(&self) -> Result<compute::ConnCfg, WakeComputeError> {
let request_id = uuid::Uuid::new_v4().to_string();
let req = self
.endpoint
.get("proxy_wake_compute")
.header("X-Request-ID", &request_id)
.query(&[("session_id", self.extra.session_id)])
.query(&[
("application_name", self.extra.application_name),
("project", Some(self.creds.project().expect("impossible"))),
])
.build()?;
async {
let request = self
.endpoint
.get("proxy_wake_compute")
.header("X-Request-ID", &request_id)
.query(&[("session_id", self.extra.session_id)])
.query(&[
("application_name", self.extra.application_name),
("project", Some(self.creds.project().expect("impossible"))),
])
.build()?;
let span = info_span!("http", id = request_id, url = req.url().as_str());
info!(parent: &span, "request wake-up");
let msg = self
.endpoint
.checked_execute(req)
.and_then(|r| r.json::<GetWakeComputeResponse>())
.await
.map_err(|e| {
error!(parent: &span, "{e}");
e
})?;
info!(url = request.url().as_str(), "sending http request");
let response = self.endpoint.execute(request).await?;
let body = parse_body::<WakeCompute>(response).await?;
// Unfortunately, ownership won't let us use `Option::ok_or` here.
let (host, port) = match parse_host_port(&msg.address) {
None => return Err(WakeComputeError::BadComputeAddress(msg.address)),
Some(x) => x,
};
// Unfortunately, ownership won't let us use `Option::ok_or` here.
let (host, port) = match parse_host_port(&body.address) {
None => return Err(WakeComputeError::BadComputeAddress(body.address)),
Some(x) => x,
};
let mut config = compute::ConnCfg::new();
config
.host(host)
.port(port)
.dbname(self.creds.dbname)
.user(self.creds.user);
let mut config = compute::ConnCfg::new();
config
.host(host)
.port(port)
.dbname(self.creds.dbname)
.user(self.creds.user);
Ok(config)
Ok(config)
}
.map_err(crate::error::log_error)
.instrument(info_span!("wake_compute", id = request_id))
.await
}
}
@@ -215,24 +286,40 @@ pub(super) async fn handle_user<'a, Endpoint, GetAuthInfo, WakeCompute>(
wake_compute: impl FnOnce(&'a Endpoint) -> WakeCompute,
) -> auth::Result<AuthSuccess<compute::ConnCfg>>
where
GetAuthInfo: Future<Output = Result<AuthInfo, GetAuthInfoError>>,
Endpoint: AsRef<ClientCredentials<'a>>,
GetAuthInfo: Future<Output = Result<Option<AuthInfo>, GetAuthInfoError>>,
WakeCompute: Future<Output = Result<compute::ConnCfg, WakeComputeError>>,
{
let creds = endpoint.as_ref();
info!("fetching user's authentication info");
let auth_info = get_auth_info(endpoint).await?;
let info = get_auth_info(endpoint).await?.unwrap_or_else(|| {
// If we don't have an authentication secret, we mock one to
// prevent malicious probing (possible due to missing protocol steps).
// This mocked secret will never lead to successful authentication.
info!("authentication info not found, mocking it");
AuthInfo::Scram(scram::ServerSecret::mock(creds.user, rand::random()))
});
let flow = AuthFlow::new(client);
let scram_keys = match auth_info {
let scram_keys = match info {
AuthInfo::Md5(_) => {
// TODO: decide if we should support MD5 in api v2
info!("auth endpoint chooses MD5");
return Err(auth::AuthError::bad_auth_method("MD5"));
}
AuthInfo::Scram(secret) => {
info!("auth endpoint chooses SCRAM");
let scram = auth::Scram(&secret);
let client_key = match flow.begin(scram).await?.authenticate().await? {
sasl::Outcome::Success(key) => key,
sasl::Outcome::Failure(reason) => {
info!("auth backend failed with an error: {reason}");
return Err(auth::AuthError::auth_failed(creds.user));
}
};
Some(compute::ScramKeys {
client_key: flow.begin(scram).await?.authenticate().await?.as_bytes(),
client_key: client_key.as_bytes(),
server_key: secret.server_key.as_bytes(),
})
}
@@ -249,6 +336,31 @@ where
})
}
/// Parse http response body, taking status code into account.
async fn parse_body<T: for<'a> Deserialize<'a>>(
response: reqwest::Response,
) -> Result<T, ApiError> {
let status = response.status();
if status.is_success() {
// We shouldn't log raw body because it may contain secrets.
info!("request succeeded, processing the body");
return Ok(response.json().await?);
}
// Don't throw an error here because it's not as important
// as the fact that the request itself has failed.
let body = response.json().await.unwrap_or_else(|e| {
warn!("failed to parse error body: {e}");
ConsoleError {
error: "reason unclear (malformed error message)".into(),
}
});
let text = body.error;
error!("console responded with an error ({status}): {text}");
Err(ApiError::Console { status, text })
}
fn parse_host_port(input: &str) -> Option<(&str, u16)> {
let (host, port) = input.split_once(':')?;
Some((host, port.parse().ok()?))
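
For reference, `parse_host_port` as shown in the context lines above can be exercised on its own. The body is copied from the diff; the inputs in `main` are made-up examples.

```rust
/// Split a `host:port` pair; `None` if there is no colon or the port
/// does not fit into a `u16`.
fn parse_host_port(input: &str) -> Option<(&str, u16)> {
    let (host, port) = input.split_once(':')?;
    Some((host, port.parse().ok()?))
}

fn main() {
    // Illustrative inputs only.
    for input in ["127.0.0.1:5432", "localhost", "host:70000", "[::1]:5432"] {
        println!("{input:?} -> {:?}", parse_host_port(input));
    }
}
```

Because `split_once` cuts at the first colon, a bracketed IPv6 literal such as `[::1]:5432` is rejected by this function rather than parsed.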


@@ -1,6 +1,6 @@
use super::{AuthSuccess, NodeInfo};
use crate::{auth, compute, error::UserFacingError, stream::PqStream, waiters};
use pq_proto::{BeMessage as Be, BeParameterStatusMessage};
use pq_proto::{BeMessage as Be, ParameterStatusMessage};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, info_span};
@@ -60,7 +60,7 @@ pub async fn handle_user(
info!(parent: &span, "sending the auth URL to the user");
client
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?
.write_message_noflush(&ParameterStatusMessage::encoding())?
.write_message(&Be::NoticeResponse(&greeting))
.await?;


@@ -1,7 +1,7 @@
//! Local mock of Cloud API V2.
use super::{
console::{self, AuthInfo, GetAuthInfoError, TransportError, WakeComputeError},
console::{self, AuthInfo, GetAuthInfoError, WakeComputeError},
AuthSuccess,
};
use crate::{
@@ -12,7 +12,28 @@ use crate::{
stream::PqStream,
url::ApiUrl,
};
use futures::TryFutureExt;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, info_span, warn, Instrument};
#[derive(Debug, Error)]
enum MockApiError {
#[error("Failed to read password: {0}")]
PasswordNotSet(tokio_postgres::Error),
}
impl From<MockApiError> for console::ApiError {
fn from(e: MockApiError) -> Self {
io_error(e).into()
}
}
impl From<tokio_postgres::Error> for console::ApiError {
fn from(e: tokio_postgres::Error) -> Self {
io_error(e).into()
}
}
#[must_use]
pub(super) struct Api<'a> {
@@ -20,10 +41,9 @@ pub(super) struct Api<'a> {
creds: &'a ClientCredentials<'a>,
}
// Helps eliminate graceless `.map_err` calls without introducing another ctor.
impl From<tokio_postgres::Error> for TransportError {
fn from(e: tokio_postgres::Error) -> Self {
io_error(e).into()
impl<'a> AsRef<ClientCredentials<'a>> for Api<'a> {
fn as_ref(&self) -> &ClientCredentials<'a> {
self.creds
}
}
@@ -35,54 +55,55 @@ impl<'a> Api<'a> {
/// Authenticate the existing user or throw an error.
pub(super) async fn handle_user(
self,
&'a self,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin + Send>,
) -> auth::Result<AuthSuccess<compute::ConnCfg>> {
// We reuse user handling logic from a production module.
console::handle_user(client, &self, Self::get_auth_info, Self::wake_compute).await
console::handle_user(client, self, Self::get_auth_info, Self::wake_compute).await
}
}
impl Api<'_> {
/// This implementation fetches the auth info from a local postgres instance.
async fn get_auth_info(&self) -> Result<AuthInfo, GetAuthInfoError> {
// Perhaps we could persist this connection, but then we'd have to
// write more code for reopening it if it got closed, which doesn't
// seem worth it.
let (client, connection) =
tokio_postgres::connect(self.endpoint.as_str(), tokio_postgres::NoTls).await?;
async fn get_auth_info(&self) -> Result<Option<AuthInfo>, GetAuthInfoError> {
async {
// Perhaps we could persist this connection, but then we'd have to
// write more code for reopening it if it got closed, which doesn't
// seem worth it.
let (client, connection) =
tokio_postgres::connect(self.endpoint.as_str(), tokio_postgres::NoTls).await?;
tokio::spawn(connection);
let query = "select rolpassword from pg_catalog.pg_authid where rolname = $1";
let rows = client.query(query, &[&self.creds.user]).await?;
tokio::spawn(connection);
let query = "select rolpassword from pg_catalog.pg_authid where rolname = $1";
let rows = client.query(query, &[&self.creds.user]).await?;
match &rows[..] {
// We can't get a secret if there's no such user.
[] => Err(io_error(format!("unknown user '{}'", self.creds.user)).into()),
// We can get at most one row, because `rolname` is unique.
let row = match rows.get(0) {
Some(row) => row,
// This means that the user doesn't exist, so there can be no secret.
// However, this is still a *valid* outcome which is very similar
// to getting `404 Not found` from the Neon console.
None => {
warn!("user '{}' does not exist", self.creds.user);
return Ok(None);
}
};
// We shouldn't get more than one row anyway.
[row, ..] => {
let entry = row
.try_get("rolpassword")
.map_err(|e| io_error(format!("failed to read user's password: {e}")))?;
let entry = row
.try_get("rolpassword")
.map_err(MockApiError::PasswordNotSet)?;
scram::ServerSecret::parse(entry)
.map(AuthInfo::Scram)
.or_else(|| {
// It could be an md5 hash if it's not a SCRAM secret.
let text = entry.strip_prefix("md5")?;
Some(AuthInfo::Md5({
let mut bytes = [0u8; 16];
hex::decode_to_slice(text, &mut bytes).ok()?;
bytes
}))
})
// Putting the secret into this message is a security hazard!
.ok_or(GetAuthInfoError::BadSecret)
}
info!("got a secret: {entry}"); // safe since it's not a prod scenario
let secret = scram::ServerSecret::parse(entry).map(AuthInfo::Scram);
Ok(secret.or_else(|| parse_md5(entry).map(AuthInfo::Md5)))
}
.map_err(crate::error::log_error)
.instrument(info_span!("get_auth_info", mock = self.endpoint.as_str()))
.await
}
/// We don't need to wake anything locally, so we just return the connection info.
pub(super) async fn wake_compute(&self) -> Result<compute::ConnCfg, WakeComputeError> {
pub async fn wake_compute(&self) -> Result<compute::ConnCfg, WakeComputeError> {
let mut config = compute::ConnCfg::new();
config
.host(self.endpoint.host_str().unwrap_or("localhost"))
@@ -93,3 +114,12 @@ impl<'a> Api<'a> {
Ok(config)
}
}
fn parse_md5(input: &str) -> Option<[u8; 16]> {
let text = input.strip_prefix("md5")?;
let mut bytes = [0u8; 16];
hex::decode_to_slice(text, &mut bytes).ok()?;
Some(bytes)
}
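
The extracted `parse_md5` helper relies on the `hex` crate. A dependency-free sketch of the same idea is given below for experimenting outside the workspace; `hex_val` is a hypothetical helper and the explicit length check is an assumption standing in for the length validation done by `hex::decode_to_slice`.

```rust
/// Decode one ASCII hex digit (hypothetical helper).
fn hex_val(c: u8) -> Option<u8> {
    match c {
        b'0'..=b'9' => Some(c - b'0'),
        b'a'..=b'f' => Some(c - b'a' + 10),
        b'A'..=b'F' => Some(c - b'A' + 10),
        _ => None,
    }
}

/// Parse a `pg_authid.rolpassword` entry of the form `md5<32 hex digits>`.
fn parse_md5(input: &str) -> Option<[u8; 16]> {
    let text = input.strip_prefix("md5")?.as_bytes();
    if text.len() != 32 {
        return None;
    }
    let mut bytes = [0u8; 16];
    for (i, pair) in text.chunks_exact(2).enumerate() {
        bytes[i] = (hex_val(pair[0])? << 4) | hex_val(pair[1])?;
    }
    Some(bytes)
}

fn main() {
    println!("{:?}", parse_md5("md5000102030405060708090a0b0c0d0e0f"));
    println!("{:?}", parse_md5("SCRAM-SHA-256$4096:salt"));
}
```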


@@ -2,7 +2,7 @@
use super::{AuthErrorImpl, PasswordHackPayload};
use crate::{sasl, scram, stream::PqStream};
use pq_proto::{BeAuthenticationSaslMessage, BeMessage, BeMessage as Be};
use pq_proto::{BeMessage, BeMessage as Be, SaslMessage};
use std::io;
use tokio::io::{AsyncRead, AsyncWrite};
@@ -22,7 +22,7 @@ pub struct Scram<'a>(pub &'a scram::ServerSecret);
impl AuthMethod for Scram<'_> {
#[inline(always)]
fn first_message(&self) -> BeMessage<'_> {
Be::AuthenticationSasl(BeAuthenticationSaslMessage::Methods(scram::METHODS))
Be::AuthenticationSasl(SaslMessage::Methods(scram::METHODS))
}
}
@@ -89,7 +89,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, PasswordHack> {
/// Stream wrapper for handling [SCRAM](crate::scram) auth.
impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, Scram<'_>> {
/// Perform user authentication. Raise an error in case authentication failed.
pub async fn authenticate(self) -> super::Result<scram::ScramKey> {
pub async fn authenticate(self) -> super::Result<sasl::Outcome<scram::ScramKey>> {
// Initial client message contains the chosen auth method's name.
let msg = self.stream.read_password_message().await?;
let sasl = sasl::FirstMessage::parse(&msg)
@@ -101,10 +101,10 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, Scram<'_>> {
}
let secret = self.state.0;
let key = sasl::SaslStream::new(self.stream, sasl.message)
let outcome = sasl::SaslStream::new(self.stream, sasl.message)
.authenticate(scram::Exchange::new(secret, rand::random, None))
.await?;
Ok(key)
Ok(outcome)
}
}


@@ -1,4 +1,15 @@
use std::io;
use std::{error::Error as StdError, fmt, io};
/// Upcast (almost) any error into an opaque [`io::Error`].
pub fn io_error(e: impl Into<Box<dyn StdError + Send + Sync>>) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
/// A small combinator for pluggable error logging.
pub fn log_error<E: fmt::Display>(e: E) -> E {
tracing::error!("{e}");
e
}
/// Marks errors that may be safely shown to a client.
/// This trait can be seen as a specialized version of [`ToString`].
@@ -6,7 +17,7 @@ use std::io;
/// NOTE: This trait should not be implemented for [`anyhow::Error`], since it
/// is way too convenient and tends to proliferate all across the codebase,
/// ultimately leading to accidental leaks of sensitive data.
pub trait UserFacingError: ToString {
pub trait UserFacingError: fmt::Display {
/// Format the error for client, stripping all sensitive info.
///
/// Although this might be a no-op for many types, it's highly
@@ -17,8 +28,3 @@ pub trait UserFacingError: ToString {
self.to_string()
}
}
/// Upcast (almost) any error into an opaque [`io::Error`].
pub fn io_error(e: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
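
The new `log_error` combinator in this file takes and returns the error by value, so it can be plugged straight into `map_err`. A minimal sketch of that usage follows, with `eprintln!` standing in for `tracing::error!` and a hypothetical `parse_port` function as the caller.

```rust
use std::fmt;

/// Log the error and hand it back unchanged.
/// `eprintln!` is used here instead of `tracing::error!` to stay dependency-free.
fn log_error<E: fmt::Display>(e: E) -> E {
    eprintln!("error: {e}");
    e
}

/// Hypothetical caller: the error is logged once, on its way out.
fn parse_port(s: &str) -> Result<u16, std::num::ParseIntError> {
    s.parse::<u16>().map_err(log_error)
}

fn main() {
    println!("{:?}", parse_port("5432"));
    println!("{:?}", parse_port("not-a-port").is_err());
}
```

Since the function is generic over `Display`, the same combinator works for both `GetAuthInfoError` and `WakeComputeError` without wrapping the future, unlike the removed async version.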


@@ -37,16 +37,6 @@ impl Endpoint {
) -> Result<reqwest::Response, reqwest::Error> {
self.client.execute(request).await
}
/// Execute a [request](reqwest::Request) and raise an error if status != 200.
pub async fn checked_execute(
&self,
request: reqwest::Request,
) -> Result<reqwest::Response, reqwest::Error> {
self.execute(request)
.await
.and_then(|r| r.error_for_status())
}
}
#[cfg(test)]


@@ -49,17 +49,6 @@ static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});
/// A small combinator for pluggable error logging.
async fn log_error<R, F>(future: F) -> F::Output
where
F: std::future::Future<Output = anyhow::Result<R>>,
{
future.await.map_err(|err| {
error!("{err}");
err
})
}
pub async fn task_main(
config: &'static ProxyConfig,
listener: tokio::net::TcpListener,
@@ -80,7 +69,7 @@ pub async fn task_main(
let session_id = uuid::Uuid::new_v4();
let cancel_map = Arc::clone(&cancel_map);
tokio::spawn(
log_error(async move {
async move {
info!("spawned a task for {peer_addr}");
socket
@@ -88,6 +77,10 @@ pub async fn task_main(
.context("failed to set socket option")?;
handle_client(config, &cancel_map, session_id, socket).await
}
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
error!("per-client task finished with an error: {e:#}");
})
.instrument(info_span!("client", session = format_args!("{session_id}"))),
);
@@ -264,12 +257,12 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
if !auth_result.reported_auth_ok {
stream
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?;
.write_message_noflush(&ParameterStatusMessage::encoding())?;
}
stream
.write_message_noflush(&BeMessage::ParameterStatus(
BeParameterStatusMessage::ServerVersion(&db.version),
ParameterStatusMessage::ServerVersion(&db.version),
))?
.write_message_noflush(&Be::BackendKeyData(cancel_key_data))?
.write_message(&BeMessage::ReadyForQuery)


@@ -1,6 +1,6 @@
///! A group of high-level tests for connection establishing logic and auth.
use super::*;
use crate::{auth, scram};
use crate::{auth, sasl, scram};
use async_trait::async_trait;
use rstest::rstest;
use tokio_postgres::config::SslMode;
@@ -100,8 +100,7 @@ impl Scram {
}
fn mock(user: &str) -> Self {
let salt = rand::random::<[u8; 32]>();
Scram(scram::ServerSecret::mock(user, &salt))
Scram(scram::ServerSecret::mock(user, rand::random()))
}
}
@@ -111,13 +110,17 @@ impl TestAuth for Scram {
self,
stream: &mut PqStream<Stream<S>>,
) -> anyhow::Result<()> {
auth::AuthFlow::new(stream)
let outcome = auth::AuthFlow::new(stream)
.begin(auth::Scram(&self.0))
.await?
.authenticate()
.await?;
Ok(())
use sasl::Outcome::*;
match outcome {
Success(_) => Ok(()),
Failure(reason) => bail!("authentication failed with an error: {reason}"),
}
}
}
@@ -136,7 +139,7 @@ async fn dummy_proxy(
stream
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?
.write_message_noflush(&ParameterStatusMessage::encoding())?
.write_message(&BeMessage::ReadyForQuery)
.await?;


@@ -16,22 +16,19 @@ use thiserror::Error;
pub use channel_binding::ChannelBinding;
pub use messages::FirstMessage;
pub use stream::SaslStream;
pub use stream::{Outcome, SaslStream};
/// Fine-grained auth errors help in writing tests.
#[derive(Error, Debug)]
pub enum Error {
#[error("Failed to authenticate client: {0}")]
AuthenticationFailed(&'static str),
#[error("Channel binding failed: {0}")]
ChannelBindingFailed(&'static str),
#[error("Unsupported channel binding method: {0}")]
ChannelBindingBadMethod(Box<str>),
#[error("Bad client message")]
BadClientMessage,
#[error("Bad client message: {0}")]
BadClientMessage(&'static str),
#[error(transparent)]
Io(#[from] io::Error),
@@ -41,8 +38,6 @@ impl UserFacingError for Error {
fn to_string_client(&self) -> String {
use Error::*;
match self {
// This constructor contains the reason why auth has failed.
AuthenticationFailed(s) => s.to_string(),
// TODO: add support for channel binding
ChannelBindingFailed(_) => "channel binding is not supported yet".to_string(),
ChannelBindingBadMethod(m) => format!("unsupported channel binding method {m}"),
@@ -55,11 +50,14 @@ impl UserFacingError for Error {
pub type Result<T> = std::result::Result<T, Error>;
/// A result of one SASL exchange.
#[must_use]
pub enum Step<T, R> {
/// We should continue exchanging messages.
Continue(T),
Continue(T, String),
/// The client has been authenticated successfully.
Authenticated(R),
Success(R, String),
/// Authentication failed (reason attached).
Failure(&'static str),
}
/// Every SASL mechanism (e.g. [SCRAM](crate::scram)) is expected to implement this trait.
@@ -69,5 +67,5 @@ pub trait Mechanism: Sized {
/// Produce a server challenge to be sent to the client.
/// This is how this method is called in PostgreSQL (`libpq/sasl.h`).
fn exchange(self, input: &str) -> Result<(Step<Self, Self::Output>, String)>;
fn exchange(self, input: &str) -> Result<Step<Self, Self::Output>>;
}


@@ -1,7 +1,7 @@
//! Definitions for SASL messages.
use crate::parse::{split_at_const, split_cstr};
use pq_proto::{BeAuthenticationSaslMessage, BeMessage};
use pq_proto::{BeMessage, SaslMessage};
/// SASL-specific payload of [`PasswordMessage`](pq_proto::FeMessage::PasswordMessage).
#[derive(Debug)]
@@ -42,10 +42,9 @@ pub(super) enum ServerMessage<T> {
impl<'a> ServerMessage<&'a str> {
pub(super) fn to_reply(&self) -> BeMessage<'a> {
use BeAuthenticationSaslMessage::*;
BeMessage::AuthenticationSasl(match self {
ServerMessage::Continue(s) => Continue(s.as_bytes()),
ServerMessage::Final(s) => Final(s.as_bytes()),
Self::Continue(s) => SaslMessage::Continue(s.as_bytes()),
Self::Final(s) => SaslMessage::Final(s.as_bytes()),
})
}
}


@@ -48,28 +48,41 @@ impl<S: AsyncWrite + Unpin> SaslStream<'_, S> {
}
}
/// SASL authentication outcome.
/// It's much easier to match on those two variants
/// than to peek into a noisy protocol error type.
#[must_use = "caller must explicitly check for success"]
pub enum Outcome<R> {
/// Authentication succeeded and produced some value.
Success(R),
/// Authentication failed (reason attached).
Failure(&'static str),
}
impl<S: AsyncRead + AsyncWrite + Unpin> SaslStream<'_, S> {
/// Perform SASL message exchange according to the underlying algorithm
/// until user is either authenticated or denied access.
pub async fn authenticate<M: Mechanism>(
mut self,
mut mechanism: M,
) -> super::Result<M::Output> {
) -> super::Result<Outcome<M::Output>> {
loop {
let input = self.recv().await?;
let (moved, reply) = mechanism.exchange(input)?;
let step = mechanism.exchange(input)?;
use super::Step::*;
match moved {
Continue(moved) => {
use super::Step;
return Ok(match step {
Step::Continue(moved_mechanism, reply) => {
self.send(&ServerMessage::Continue(&reply)).await?;
mechanism = moved;
mechanism = moved_mechanism;
continue;
}
Authenticated(result) => {
Step::Success(result, reply) => {
self.send(&ServerMessage::Final(&reply)).await?;
return Ok(result);
Outcome::Success(result)
}
}
Step::Failure(reason) => Outcome::Failure(reason),
});
}
}
}

View File

@@ -64,12 +64,12 @@ impl<'a> Exchange<'a> {
impl sasl::Mechanism for Exchange<'_> {
type Output = super::ScramKey;
fn exchange(mut self, input: &str) -> sasl::Result<(sasl::Step<Self, Self::Output>, String)> {
fn exchange(mut self, input: &str) -> sasl::Result<sasl::Step<Self, Self::Output>> {
use {sasl::Step::*, ExchangeState::*};
match &self.state {
Initial => {
let client_first_message =
ClientFirstMessage::parse(input).ok_or(SaslError::BadClientMessage)?;
let client_first_message = ClientFirstMessage::parse(input)
.ok_or(SaslError::BadClientMessage("invalid client-first-message"))?;
let server_first_message = client_first_message.build_server_first_message(
&(self.nonce)(),
@@ -84,15 +84,15 @@ impl sasl::Mechanism for Exchange<'_> {
server_first_message,
};
Ok((Continue(self), msg))
Ok(Continue(self, msg))
}
SaltSent {
cbind_flag,
client_first_message_bare,
server_first_message,
} => {
let client_final_message =
ClientFinalMessage::parse(input).ok_or(SaslError::BadClientMessage)?;
let client_final_message = ClientFinalMessage::parse(input)
.ok_or(SaslError::BadClientMessage("invalid client-final-message"))?;
let channel_binding = cbind_flag.encode(|_| {
self.cert_digest
@@ -106,9 +106,7 @@ impl sasl::Mechanism for Exchange<'_> {
}
if client_final_message.nonce != server_first_message.nonce() {
return Err(SaslError::AuthenticationFailed(
"combined nonce doesn't match",
));
return Err(SaslError::BadClientMessage("combined nonce doesn't match"));
}
let signature_builder = SignatureBuilder {
@@ -121,14 +119,15 @@ impl sasl::Mechanism for Exchange<'_> {
.build(&self.secret.stored_key)
.derive_client_key(&client_final_message.proof);
if client_key.sha256() != self.secret.stored_key {
return Err(SaslError::AuthenticationFailed("password doesn't match"));
// Auth fails either if keys don't match or it's pre-determined to fail.
if client_key.sha256() != self.secret.stored_key || self.secret.doomed {
return Ok(Failure("password doesn't match"));
}
let msg = client_final_message
.build_server_final_message(signature_builder, &self.secret.server_key);
Ok((Authenticated(client_key), msg))
Ok(Success(client_key, msg))
}
}
}

View File

@@ -14,6 +14,9 @@ pub struct ServerSecret {
pub stored_key: ScramKey,
/// Used by client to verify server's signature.
pub server_key: ScramKey,
/// Should auth fail no matter what?
/// This is exactly the case for mocked secrets.
pub doomed: bool,
}
impl ServerSecret {
@@ -30,6 +33,7 @@ impl ServerSecret {
salt_base64: salt.to_owned(),
stored_key: base64_decode_array(stored_key)?.into(),
server_key: base64_decode_array(server_key)?.into(),
doomed: false,
};
Some(secret)
@@ -38,16 +42,16 @@ impl ServerSecret {
/// To avoid revealing information to an attacker, we use a
/// mocked server secret even if the user doesn't exist.
/// See `auth-scram.c : mock_scram_secret` for details.
#[allow(dead_code)]
pub fn mock(user: &str, nonce: &[u8; 32]) -> Self {
pub fn mock(user: &str, nonce: [u8; 32]) -> Self {
// Refer to `auth-scram.c : scram_mock_salt`.
let mocked_salt = super::sha256([user.as_bytes(), nonce]);
let mocked_salt = super::sha256([user.as_bytes(), &nonce]);
Self {
iterations: 4096,
salt_base64: base64::encode(&mocked_salt),
stored_key: ScramKey::default(),
server_key: ScramKey::default(),
doomed: true,
}
}
@@ -67,6 +71,7 @@ impl ServerSecret {
salt_base64: base64::encode(&salt),
stored_key: password.client_key().sha256(),
server_key: password.server_key(),
doomed: false,
})
}
}

View File

@@ -109,8 +109,9 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
/// Write the error message using [`Self::write_message`], then re-throw it.
/// Allowing string literals is safe under the assumption that they do not contain any runtime info.
/// This method exists due to `&str` not implementing `Into<anyhow::Error>`.
pub async fn throw_error_str<T>(&mut self, error: &'static str) -> anyhow::Result<T> {
// This method exists due to `&str` not implementing `Into<anyhow::Error>`
tracing::info!("forwarding error to user: {error}");
self.write_message(&BeMessage::ErrorResponse(error)).await?;
bail!(error)
}
@@ -122,6 +123,7 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
E: UserFacingError + Into<anyhow::Error>,
{
let msg = error.to_string_client();
tracing::info!("forwarding error to user: {msg}");
self.write_message(&BeMessage::ErrorResponse(&msg)).await?;
bail!(error)
}

View File

@@ -318,12 +318,15 @@ fn set_id(workdir: &Path, given_id: Option<NodeId>) -> Result<NodeId> {
}
// Parse RemoteStorage from TOML table.
fn parse_remote_storage(storage_conf: &str) -> Result<RemoteStorageConfig> {
fn parse_remote_storage(storage_conf: &str) -> anyhow::Result<RemoteStorageConfig> {
// funny toml doesn't consider plain inline table as valid document, so wrap in a key to parse
let storage_conf_toml = format!("remote_storage = {}", storage_conf);
let storage_conf_toml = format!("remote_storage = {storage_conf}");
let parsed_toml = storage_conf_toml.parse::<Document>()?; // parse
let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again
RemoteStorageConfig::from_toml(storage_conf_parsed_toml)
RemoteStorageConfig::from_toml(storage_conf_parsed_toml).and_then(|parsed_config| {
// XXX: Don't print the original toml here, there might be some sensitive data
parsed_config.context("Incorrectly parsed remote storage toml as no remote storage config")
})
}
#[test]

View File

@@ -76,9 +76,15 @@ Format is 2-digit major version number, i.e. `DEFAULT_PG_VERSION="14"`
should go.
`TEST_SHARED_FIXTURES`: Try to re-use a single pageserver for all the tests.
`NEON_PAGESERVER_OVERRIDES`: add a `;`-separated set of configs that will be passed as
`--pageserver-config-override=${value}` parameter values when neon_local cli is invoked
`RUST_LOG`: logging configuration to pass into Neon CLI
Useful parameters and commands:
`--pageserver-config-override=${value}` `-c` values to pass into pageserver through neon_local cli
`--preserve-database-files` to preserve pageserver (layer) and safekeeper (segment) timeline files on disk
after running a test suite. Such files might be large, so they are removed by default, but they can be useful for debugging or for creating svg images with layer file contents.
Let stdout, stderr and `INFO` log messages go to the terminal instead of capturing them:
`./scripts/pytest -s --log-cli-level=INFO ...`
(Note many tests capture subprocess outputs separately, so this may not

View File

@@ -39,6 +39,14 @@ def parse_metrics(text: str, name: str = "") -> Metrics:
return metrics
PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = (
"pageserver_remote_upload_queue_unfinished_tasks",
"pageserver_remote_operation_seconds_bucket",
"pageserver_remote_operation_seconds_count",
"pageserver_remote_operation_seconds_sum",
"pageserver_remote_physical_size",
)
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_current_logical_size",
"pageserver_current_physical_size",
@@ -62,4 +70,5 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_wait_lsn_seconds_sum",
"pageserver_created_persistent_files_total",
"pageserver_written_persistent_bytes_total",
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
)

View File

@@ -30,10 +30,17 @@ import psycopg2
import pytest
import requests
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
from fixtures.log_helper import log
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import Fn, allure_attach_from_dir, get_self_dir, subprocess_capture
from fixtures.utils import (
ATTACHMENT_NAME_REGEX,
Fn,
allure_attach_from_dir,
get_self_dir,
subprocess_capture,
)
# Type-related stuff
from psycopg2.extensions import connection as PgConnection
@@ -590,6 +597,7 @@ class NeonEnvBuilder:
auth_enabled: bool = False,
rust_log_override: Optional[str] = None,
default_branch_name: str = DEFAULT_BRANCH_NAME,
preserve_database_files: bool = False,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -611,6 +619,7 @@ class NeonEnvBuilder:
self.neon_binpath = neon_binpath
self.pg_distrib_dir = pg_distrib_dir
self.pg_version = pg_version
self.preserve_database_files = preserve_database_files
def init(self) -> NeonEnv:
# Cannot create more than one environment from one builder
@@ -718,6 +727,28 @@ class NeonEnvBuilder:
prefix_in_bucket=self.remote_storage_prefix,
)
def cleanup_local_storage(self):
if self.preserve_database_files:
return
directories_to_clean: List[Path] = []
for test_entry in Path(self.repo_dir).glob("**/*"):
if test_entry.is_file():
test_file = test_entry
if ATTACHMENT_NAME_REGEX.fullmatch(test_file.name):
continue
if SMALL_DB_FILE_NAME_REGEX.fullmatch(test_file.name):
continue
log.debug(f"Removing large database {test_file} file")
test_file.unlink()
elif test_entry.is_dir():
directories_to_clean.append(test_entry)
for directory_to_clean in reversed(directories_to_clean):
if not os.listdir(directory_to_clean):
log.debug(f"Removing empty directory {directory_to_clean}")
directory_to_clean.rmdir()
def cleanup_remote_storage(self):
# here we check for true remote storage, not the local one
# local cleanup is not needed after test because in ci all env will be destroyed anyway
@@ -783,7 +814,22 @@ class NeonEnvBuilder:
sk.stop(immediate=True)
self.env.pageserver.stop(immediate=True)
self.cleanup_remote_storage()
cleanup_error = None
try:
self.cleanup_remote_storage()
except Exception as e:
log.error(f"Error during remote storage cleanup: {e}")
cleanup_error = e
try:
self.cleanup_local_storage()
except Exception as e:
log.error(f"Error during local storage cleanup: {e}")
if cleanup_error is None:
cleanup_error = e
if cleanup_error is not None:
raise cleanup_error
self.env.pageserver.assert_no_errors()
@@ -949,6 +995,7 @@ class NeonEnv:
@pytest.fixture(scope=shareable_scope)
def _shared_simple_env(
request: FixtureRequest,
pytestconfig: Config,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
default_broker: NeonBroker,
@@ -980,6 +1027,7 @@ def _shared_simple_env(
pg_distrib_dir=pg_distrib_dir,
pg_version=pg_version,
run_id=run_id,
preserve_database_files=pytestconfig.getoption("--preserve-database-files"),
) as builder:
env = builder.init_start()
@@ -1006,6 +1054,7 @@ def neon_simple_env(_shared_simple_env: NeonEnv) -> Iterator[NeonEnv]:
@pytest.fixture(scope="function")
def neon_env_builder(
pytestconfig: Config,
test_output_dir: str,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
@@ -1041,6 +1090,7 @@ def neon_env_builder(
pg_version=pg_version,
broker=default_broker,
run_id=run_id,
preserve_database_files=pytestconfig.getoption("--preserve-database-files"),
) as builder:
yield builder
@@ -2092,62 +2142,73 @@ class PSQL:
class NeonProxy(PgProtocol):
link_auth_uri: str = "http://dummy-uri"
class AuthBackend(abc.ABC):
"""All auth backends must inherit from this class"""
@property
def default_conn_url(self) -> Optional[str]:
return None
@abc.abstractmethod
def extra_args(self) -> list[str]:
pass
class Link(AuthBackend):
def extra_args(self) -> list[str]:
return [
# Link auth backend params
*["--auth-backend", "link"],
*["--uri", NeonProxy.link_auth_uri],
]
@dataclass(frozen=True)
class Postgres(AuthBackend):
pg_conn_url: str
@property
def default_conn_url(self) -> Optional[str]:
return self.pg_conn_url
def extra_args(self) -> list[str]:
return [
# Postgres auth backend params
*["--auth-backend", "postgres"],
*["--auth-endpoint", self.pg_conn_url],
]
def __init__(
self,
neon_binpath: Path,
proxy_port: int,
http_port: int,
mgmt_port: int,
neon_binpath: Path,
auth_endpoint=None,
auth_backend: NeonProxy.AuthBackend,
):
super().__init__(dsn=auth_endpoint, port=proxy_port)
self.host = "127.0.0.1"
host = "127.0.0.1"
super().__init__(dsn=auth_backend.default_conn_url, host=host, port=proxy_port)
self.host = host
self.http_port = http_port
self.neon_binpath = neon_binpath
self.proxy_port = proxy_port
self.mgmt_port = mgmt_port
self.auth_endpoint = auth_endpoint
self.auth_backend = auth_backend
self._popen: Optional[subprocess.Popen[bytes]] = None
self.link_auth_uri_prefix = "http://dummy-uri"
def start(self):
"""
Starts a proxy with option '--auth-backend postgres' and a postgres instance
already provided through '--auth-endpoint <postgres-instance>'.
"""
def start(self) -> NeonProxy:
assert self._popen is None
assert self.auth_endpoint is not None
# Start proxy
args = [
str(self.neon_binpath / "proxy"),
*["--http", f"{self.host}:{self.http_port}"],
*["--proxy", f"{self.host}:{self.proxy_port}"],
*["--mgmt", f"{self.host}:{self.mgmt_port}"],
*["--auth-backend", "postgres"],
*["--auth-endpoint", self.auth_endpoint],
*self.auth_backend.extra_args(),
]
self._popen = subprocess.Popen(args)
self._wait_until_ready()
def start_with_link_auth(self):
"""
Starts a proxy with option '--auth-backend link' and a dummy authentication link '--uri dummy-auth-link'."
"""
assert self._popen is None
# Start proxy
bin_proxy = str(self.neon_binpath / "proxy")
args = [bin_proxy]
args.extend(["--http", f"{self.host}:{self.http_port}"])
args.extend(["--proxy", f"{self.host}:{self.proxy_port}"])
args.extend(["--mgmt", f"{self.host}:{self.mgmt_port}"])
args.extend(["--auth-backend", "link"])
args.extend(["--uri", self.link_auth_uri_prefix])
arg_str = " ".join(args)
log.info(f"starting proxy with command line ::: {arg_str}")
self._popen = subprocess.Popen(args, stdout=subprocess.PIPE)
self._wait_until_ready()
return self
@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_time=10)
def _wait_until_ready(self):
@@ -2158,7 +2219,7 @@ class NeonProxy(PgProtocol):
request_result.raise_for_status()
return request_result.text
def __enter__(self) -> "NeonProxy":
def __enter__(self) -> NeonProxy:
return self
def __exit__(
@@ -2176,11 +2237,19 @@ class NeonProxy(PgProtocol):
@pytest.fixture(scope="function")
def link_proxy(port_distributor: PortDistributor, neon_binpath: Path) -> Iterator[NeonProxy]:
"""Neon proxy that routes through link auth."""
http_port = port_distributor.get_port()
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
with NeonProxy(proxy_port, http_port, neon_binpath=neon_binpath, mgmt_port=mgmt_port) as proxy:
proxy.start_with_link_auth()
with NeonProxy(
neon_binpath=neon_binpath,
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
auth_backend=NeonProxy.Link(),
) as proxy:
proxy.start()
yield proxy
@@ -2204,11 +2273,11 @@ def static_proxy(
http_port = port_distributor.get_port()
with NeonProxy(
neon_binpath=neon_binpath,
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
neon_binpath=neon_binpath,
auth_endpoint=auth_endpoint,
auth_backend=NeonProxy.Postgres(auth_endpoint),
) as proxy:
proxy.start()
yield proxy
@@ -2716,6 +2785,20 @@ def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
return test_dir
def pytest_addoption(parser: Parser):
parser.addoption(
"--preserve-database-files",
action="store_true",
default=False,
help="Preserve timeline files after the test suite is over",
)
SMALL_DB_FILE_NAME_REGEX: re.Pattern = re.compile( # type: ignore[type-arg]
r"config|metadata|.+\.(?:toml|pid|json|sql)"
)
# This is autouse, so the test output directory always gets created, even
# if a test doesn't put anything there. It also solves a problem with the
# neon_simple_env fixture: if TEST_SHARED_FIXTURES is not set, it
@@ -3001,3 +3084,34 @@ def fork_at_current_lsn(
"""
current_lsn = pg.safe_psql("SELECT pg_current_wal_lsn()")[0][0]
return env.neon_cli.create_branch(new_branch_name, ancestor_branch_name, tenant_id, current_lsn)
def wait_for_sk_commit_lsn_to_arrive_at_pageserver_last_record_lsn(
tenant_id: TenantId,
timeline_id: TimelineId,
safekeepers: List[Safekeeper],
pageserver: NeonPageserver,
):
sk_commit_lsns = [
sk.http_client().timeline_status(tenant_id, timeline_id).commit_lsn for sk in safekeepers
]
lsn = max(sk_commit_lsns)
ps_http = pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, lsn)
return lsn
def wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id: TenantId,
timeline_id: TimelineId,
safekeepers: List[Safekeeper],
pageserver: NeonPageserver,
):
lsn = wait_for_sk_commit_lsn_to_arrive_at_pageserver_last_record_lsn(
tenant_id, timeline_id, safekeepers, pageserver
)
ps_http = pageserver.http_client()
# force a checkpoint to trigger upload
ps_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(ps_http, tenant_id, timeline_id, lsn)
return lsn
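
The local cleanup pass added to `NeonEnvBuilder` in this file can be tried in isolation. The following is a minimal sketch, not the fixture code itself: it reuses the `SMALL_DB_FILE_NAME_REGEX` pattern from the diff, leaves out the `ATTACHMENT_NAME_REGEX` check (its definition is not shown here), and sorts directories by depth explicitly instead of relying on glob order. The directory layout and file names in `demo()` are made up for illustration.

```python
import re
import tempfile
from pathlib import Path
from typing import List

# Same pattern as SMALL_DB_FILE_NAME_REGEX in the diff above.
SMALL_DB_FILE_NAME_REGEX = re.compile(r"config|metadata|.+\.(?:toml|pid|json|sql)")


def cleanup_local_storage(repo_dir: Path) -> None:
    """Delete large files under repo_dir, then prune directories left empty."""
    directories: List[Path] = []
    # Materialize the listing first so deletions do not disturb the traversal.
    for entry in sorted(repo_dir.glob("**/*")):
        if entry.is_file():
            # fullmatch: the whole file name must match, not just a prefix.
            if SMALL_DB_FILE_NAME_REGEX.fullmatch(entry.name):
                continue
            entry.unlink()
        elif entry.is_dir():
            directories.append(entry)
    # Visit the deepest directories first so that parents emptied by the
    # removal of their children can be removed as well.
    for directory in sorted(directories, key=lambda p: len(p.parts), reverse=True):
        if not any(directory.iterdir()):
            directory.rmdir()


def demo() -> List[str]:
    """Build a hypothetical repo layout, clean it, and list what is left."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        timeline = root / "tenants" / "t1" / "timelines" / "tl1"
        timeline.mkdir(parents=True)
        (root / "pageserver.toml").write_text("# small config\n")
        (timeline / "metadata").write_text("small")
        (timeline / "layer_file").write_bytes(b"\0" * 1024)
        wal_dir = root / "safekeepers" / "sk1"
        wal_dir.mkdir(parents=True)
        (wal_dir / "000000010000000000000001").write_bytes(b"\0" * 1024)

        cleanup_local_storage(root)
        return sorted(p.relative_to(root).as_posix() for p in root.glob("**/*"))


print(demo())
```

In this sketch the layer file and the WAL segment are deleted, the `safekeepers` tree disappears because it ends up empty, and the small `metadata` and `pageserver.toml` files survive along with the directories that still contain them.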

View File

@@ -16,6 +16,7 @@ Some handy pytest flags for local development:
- `-s` shows test output
- `-k` selects a test to run
- `--timeout=0` disables our default timeout of 300s (see `setup.cfg`)
- `--cleanup-test-ouput` cleans up after each test
# What performance tests do we have and how we run them

View File

@@ -22,15 +22,16 @@ from pytest_lazyfixture import lazy_fixture # type: ignore
],
)
@pytest.mark.parametrize(
"env, scale",
"env,scale",
[
# Run on all envs. Use 200x larger table on remote cluster to make sure
# it doesn't fit in shared buffers, which are larger on remote than local.
pytest.param(lazy_fixture("neon_compare"), 1, id="neon"),
pytest.param(lazy_fixture("vanilla_compare"), 1, id="vanilla"),
pytest.param(
lazy_fixture("remote_compare"), 200, id="remote", marks=pytest.mark.remote_cluster
),
# Re-enable after switching to per-test projects created via API
# pytest.param(
# lazy_fixture("remote_compare"), 200, id="remote", marks=pytest.mark.remote_cluster
# ),
],
)
def test_seqscans(env: PgCompare, scale: int, rows: int, iters: int, workers: int):
@@ -45,7 +46,7 @@ def test_seqscans(env: PgCompare, scale: int, rows: int, iters: int, workers: in
# Verify that the table is larger than shared_buffers
cur.execute(
"""
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('t') as tbl_ize
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('t') as tbl_size
from pg_settings where name = 'shared_buffers'
"""
)

View File

@@ -47,6 +47,7 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
neon_env_builder.pg_version = "14"
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_local_fs_remote_storage()
neon_env_builder.preserve_database_files = True
env = neon_env_builder.init_start()
pg = env.postgres.create_start("main")
@@ -331,6 +332,7 @@ def check_neon_works(
config.initial_tenant = snapshot_config["default_tenant_id"]
config.neon_binpath = neon_binpath
config.pg_distrib_dir = pg_distrib_dir
config.preserve_database_files = True
cli = NeonCli(config)
cli.raw_cli(["start"])

View File

@@ -45,7 +45,7 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder):
# will cause GetPage requests.
cur.execute(
"""
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_ize
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
from pg_settings where name = 'shared_buffers'
"""
)

View File

@@ -32,7 +32,7 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
# Verify that the table is larger than shared_buffers
cur.execute(
"""
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_ize
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
from pg_settings where name = 'shared_buffers'
"""
)
@@ -115,7 +115,7 @@ def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder):
# Verify that the table is larger than shared_buffers
cur.execute(
"""
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_ize
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
from pg_settings where name = 'shared_buffers'
"""
)

View File

@@ -28,61 +28,58 @@ def test_password_hack(static_proxy: NeonProxy):
static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic)
def get_session_id(uri_prefix, uri_line):
assert uri_prefix in uri_line
url_parts = urlparse(uri_line)
psql_session_id = url_parts.path[1:]
assert psql_session_id.isalnum(), "session_id should only contain alphanumeric chars"
return psql_session_id
async def find_auth_link(link_auth_uri_prefix, proc):
for _ in range(100):
line = (await proc.stderr.readline()).decode("utf-8").strip()
log.info(f"psql line: {line}")
if link_auth_uri_prefix in line:
log.info(f"SUCCESS, found auth url: {line}")
return line
async def activate_link_auth(local_vanilla_pg, link_proxy, psql_session_id):
pg_user = "proxy"
log.info("creating a new user for link auth test")
local_vanilla_pg.start()
local_vanilla_pg.safe_psql(f"create user {pg_user} with login superuser")
db_info = json.dumps(
{
"session_id": psql_session_id,
"result": {
"Success": {
"host": local_vanilla_pg.default_options["host"],
"port": local_vanilla_pg.default_options["port"],
"dbname": local_vanilla_pg.default_options["dbname"],
"user": pg_user,
"project": "irrelevant",
}
},
}
)
log.info("sending session activation message")
psql = await PSQL(host=link_proxy.host, port=link_proxy.mgmt_port).run(db_info)
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "ok"
@pytest.mark.asyncio
async def test_psql_session_id(vanilla_pg: VanillaPostgres, link_proxy: NeonProxy):
def get_session_id(uri_prefix, uri_line):
assert uri_prefix in uri_line
url_parts = urlparse(uri_line)
psql_session_id = url_parts.path[1:]
assert psql_session_id.isalnum(), "session_id should only contain alphanumeric chars"
return psql_session_id
async def find_auth_link(link_auth_uri, proc):
for _ in range(100):
line = (await proc.stderr.readline()).decode("utf-8").strip()
log.info(f"psql line: {line}")
if link_auth_uri in line:
log.info(f"SUCCESS, found auth url: {line}")
return line
async def activate_link_auth(local_vanilla_pg, link_proxy, psql_session_id):
pg_user = "proxy"
log.info("creating a new user for link auth test")
local_vanilla_pg.start()
local_vanilla_pg.safe_psql(f"create user {pg_user} with login superuser")
db_info = json.dumps(
{
"session_id": psql_session_id,
"result": {
"Success": {
"host": local_vanilla_pg.default_options["host"],
"port": local_vanilla_pg.default_options["port"],
"dbname": local_vanilla_pg.default_options["dbname"],
"user": pg_user,
"project": "irrelevant",
}
},
}
)
log.info("sending session activation message")
psql = await PSQL(host=link_proxy.host, port=link_proxy.mgmt_port).run(db_info)
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "ok"
psql = await PSQL(host=link_proxy.host, port=link_proxy.proxy_port).run("select 42")
uri_prefix = link_proxy.link_auth_uri_prefix
link = await find_auth_link(uri_prefix, psql)
base_uri = link_proxy.link_auth_uri
link = await find_auth_link(base_uri, psql)
psql_session_id = get_session_id(uri_prefix, link)
psql_session_id = get_session_id(base_uri, link)
await activate_link_auth(vanilla_pg, link_proxy, psql_session_id)
assert psql.stdout is not None
@@ -97,3 +94,31 @@ def test_proxy_options(static_proxy: NeonProxy):
cur.execute("SHOW proxytest.option")
value = cur.fetchall()[0][0]
assert value == "value"
def test_auth_errors(static_proxy: NeonProxy):
# User does not exist
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", options="project=irrelevant")
text = str(exprinfo.value).strip()
assert text.endswith("password authentication failed for user 'pinocchio'")
static_proxy.safe_psql(
"create role pinocchio with login password 'magic'", options="project=irrelevant"
)
# User exists, but password is missing
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", password=None, options="project=irrelevant")
text = str(exprinfo.value).strip()
assert text.endswith("password authentication failed for user 'pinocchio'")
# User exists, but password is wrong
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", password="bad", options="project=irrelevant")
text = str(exprinfo.value).strip()
assert text.endswith("password authentication failed for user 'pinocchio'")
# Finally, check that the user can connect
with static_proxy.connect(user="pinocchio", password="magic", options="project=irrelevant"):
pass

View File

@@ -384,7 +384,8 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
metrics,
re.MULTILINE,
)
assert matches
if matches is None:
return None
return int(matches[1])
pg = env.postgres.create_start("main", tenant_id=tenant_id)
@@ -436,8 +437,8 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
assert not timeline_path.exists()
# timeline deletion should kill ongoing uploads
assert get_queued_count(file_kind="index", op_kind="upload") == 0
# timeline deletion should kill ongoing uploads, so, the metric will be gone
assert get_queued_count(file_kind="index", op_kind="upload") is None
# timeline deletion should be unblocking checkpoint ops
checkpoint_thread.join(2.0)
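
The change to `get_queued_count` above makes a missing metric a legitimate result (`None`) rather than an assertion failure. A minimal standalone sketch of that behavior follows; the metric line format is an assumption for illustration (the real samples likely carry more labels, such as tenant and timeline ids), and the helper takes the metrics text as a parameter instead of fetching it from the pageserver.

```python
import re
from typing import Optional

# Metric name taken from PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS.
METRIC_NAME = "pageserver_remote_upload_queue_unfinished_tasks"


def get_queued_count(metrics: str, file_kind: str, op_kind: str) -> Optional[int]:
    """Return the metric value, or None if the sample is absent.

    Assumes a simplified, hypothetical label set of exactly file_kind and op_kind.
    """
    sample = f'{METRIC_NAME}{{file_kind="{file_kind}",op_kind="{op_kind}"}}'
    matches = re.search(
        "^" + re.escape(sample) + r" (\d+)$",
        metrics,
        re.MULTILINE,
    )
    if matches is None:
        # The metric was removed, e.g. because the timeline was deleted.
        return None
    return int(matches[1])


before_deletion = (
    'pageserver_remote_upload_queue_unfinished_tasks{file_kind="layer",op_kind="upload"} 0\n'
    'pageserver_remote_upload_queue_unfinished_tasks{file_kind="index",op_kind="upload"} 3\n'
)
after_deletion = ""

print(get_queued_count(before_deletion, file_kind="index", op_kind="upload"))
print(get_queued_count(after_deletion, file_kind="index", op_kind="upload"))
```

Distinguishing `None` from `0` lets the test assert that the metric is gone entirely, which is a stronger statement than "the queue is empty".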

View File

@@ -133,3 +133,28 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
"pitr_interval": 2592000,
}.items()
)
# update the config with very short config and make sure no trailing chars are left from previous config
env.neon_cli.config_tenant(
tenant_id=tenant,
conf={
"pitr_interval": "1 min",
},
)
# restart the pageserver and ensure that the config is still correct
env.pageserver.stop()
env.pageserver.start()
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as pscur:
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"after restart res: {res}")
assert all(
i in res.items()
for i in {
"compaction_period": 20,
"pitr_interval": 60,
}.items()
)

View File

@@ -7,7 +7,11 @@ from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.metrics import PAGESERVER_PER_TENANT_METRICS, parse_metrics
from fixtures.metrics import (
PAGESERVER_PER_TENANT_METRICS,
PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
parse_metrics,
)
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
@@ -157,9 +161,21 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
)
def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize(
"remote_storage_kind",
# exercise both the code paths where remote_storage=None and remote_storage=Some(...)
[RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3],
)
def test_pageserver_metrics_removed_after_detach(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
"""Tests that when a tenant is detached, the tenant specific metrics are not left behind"""
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_pageserver_metrics_removed_after_detach",
)
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
@@ -192,7 +208,11 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
for tenant in [tenant_1, tenant_2]:
pre_detach_samples = set([x.name for x in get_ps_metric_samples_for_tenant(tenant)])
assert pre_detach_samples == set(PAGESERVER_PER_TENANT_METRICS)
expected = set(PAGESERVER_PER_TENANT_METRICS)
if remote_storage_kind == RemoteStorageKind.NOOP:
# if there's no remote storage configured, we don't expose the remote timeline client metrics
expected -= set(PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS)
assert pre_detach_samples == expected
env.pageserver.http_client().tenant_detach(tenant)
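
The expected-metrics computation in this test can be sketched on its own. The snippet below is illustrative only: `RemoteStorageKind` is a hypothetical stand-in enum, and the metric tuples are shortened versions of the ones defined in `fixtures.metrics`.

```python
from enum import Enum
from typing import Set, Tuple


class RemoteStorageKind(Enum):
    """Hypothetical stand-in for the fixture enum of the same name."""

    NOOP = "noop"
    MOCK_S3 = "mock_s3"


# Shortened versions of the tuples from fixtures.metrics.
REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = (
    "pageserver_remote_upload_queue_unfinished_tasks",
    "pageserver_remote_physical_size",
)
PER_TENANT_METRICS: Tuple[str, ...] = (
    "pageserver_current_logical_size",
    "pageserver_current_physical_size",
    # The remote client metrics are spliced in, as in the diff.
    *REMOTE_TIMELINE_CLIENT_METRICS,
)


def expected_metric_names(kind: RemoteStorageKind) -> Set[str]:
    """Metric names a tenant should expose for the given storage kind."""
    expected = set(PER_TENANT_METRICS)
    if kind == RemoteStorageKind.NOOP:
        # Without remote storage there is no remote timeline client,
        # so its metrics are never registered.
        expected -= set(REMOTE_TIMELINE_CLIENT_METRICS)
    return expected


print(sorted(expected_metric_names(RemoteStorageKind.NOOP)))
print(sorted(expected_metric_names(RemoteStorageKind.MOCK_S3)))
```

Keeping the remote client metrics in a separate tuple means the subset only has to be listed once, both for the full expectation and for the subtraction.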

View File

@@ -24,6 +24,7 @@ from fixtures.neon_fixtures import (
assert_no_in_progress_downloads_for_tenant,
available_remote_storages,
wait_for_last_record_lsn,
wait_for_sk_commit_lsn_to_reach_remote_storage,
wait_for_upload,
)
from fixtures.types import Lsn, TenantId, TimelineId
@@ -161,16 +162,9 @@ def test_tenants_attached_after_download(
##### Stop the pageserver, erase its layer file to force it being downloaded from S3
env.postgres.stop_all()
sk_commit_lsns = [
sk.http_client().timeline_status(tenant_id, timeline_id).commit_lsn
for sk in env.safekeepers
]
log.info("wait for pageserver to process all the WAL")
wait_for_last_record_lsn(client, tenant_id, timeline_id, max(sk_commit_lsns))
log.info("wait for it to reach remote storage")
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(client, tenant_id, timeline_id, max(sk_commit_lsns))
log.info("latest safekeeper_commit_lsn reached remote storage")
wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id, timeline_id, env.safekeepers, env.pageserver
)
detail_before = client.timeline_detail(
tenant_id, timeline_id, include_non_incremental_physical_size=True

View File

@@ -65,7 +65,7 @@ def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder):
# Verify that the table is larger than shared_buffers
cur.execute(
"""
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_ize
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
from pg_settings where name = 'shared_buffers'
"""
)