Compare commits

..

20 Commits

Author SHA1 Message Date
Konstantin Knizhnik
c5eb5bb048 Save prepared statements to be able to use them with connetion pooling 2023-04-27 10:01:20 +03:00
Arseny Sher
31a3910fd9 Remove wait_for_sk_commit_lsn_to_reach_remote_storage.
It had a couple of inherent races:

1) Even if compute is killed before the call, some more data might still arrive
to safekeepers after commit_lsn on them is polled, advancing it. Then checkpoint
on pageserver might not include this tail, and so upload of expected LSN won't
happen until one more checkpoint.

2) commit_lsn is updated asynchronously -- compute can commit transaction before
communicating commit_lsn to even single safekeeper (sync-safekeepers can be used
to forces the advancement). This makes semantics of
wait_for_sk_commit_lsn_to_reach_remote_storage quite complicated.

Replace it with last_flush_lsn_upload which
1) Learns last flush LSN on compute;
2) Waits for it to arrive to pageserver;
3) Checkpoints it;
4) Waits for the upload.

In some tests this keeps compute alive longer than before, but this doesn't seem
to be important.

There is a chance this fixes https://github.com/neondatabase/neon/issues/3209
2023-04-26 13:46:33 +04:00
Joonas Koivunen
381c8fca4f feat: log how long tenant activation takes (#4080)
Adds just a counter counting up from the creation to the tenant, logged
after activation. Might help guide us with the investigation of #4025.
2023-04-26 12:39:17 +03:00
Joonas Koivunen
4625da3164 build: remove busted sk-1.us-east-2 from staging hosts (#4082)
this should give us complete deployments while a new one is being
brought up.
2023-04-26 09:07:45 +00:00
Joonas Koivunen
850f6b1cb9 refactor: drop pageserver_ondisk_layers (#4071)
I didn't get through #3775 fast enough so we wanted to remove this
metric.

Fixes #3705.
2023-04-26 11:49:29 +03:00
Sergey Melnikov
f19b70b379 Configure extra domain for us-east-1 (#4078) 2023-04-26 09:36:26 +02:00
Sergey Melnikov
9d0cf08d5f Fix new storage-broker deploy for eu-central-1 (#4079) 2023-04-26 10:29:44 +03:00
Alexander Bayandin
2d6fd72177 GitHub Workflows: Fix crane for several registries (#4076)
Follow-up fix after https://github.com/neondatabase/neon/pull/4067

```
+ crane tag neondatabase/vm-compute-node-v14:3064 latest
Error: fetching "neondatabase/vm-compute-node-v14:3064": GET https://index.docker.io/v2/neondatabase/vm-compute-node-v14/manifests/3064: MANIFEST_UNKNOWN: manifest unknown; unknown tag=3064
```

I reverted back the previous approach for promoting images
(login to one registry, save images to local fs, logout and login to
another registry, and push images from local fs). It turns out what
works for one Google project (kaniko), doesn't work for another (crane)
[sigh]
2023-04-25 23:58:59 +01:00
Heikki Linnakangas
8945fbdb31 Enable OpenTelemetry tracing in proxy in staging. (#4065)
Depends on https://github.com/neondatabase/helm-charts/pull/32

Co-authored-by: Lassi Pölönen <lassi.polonen@iki.fi>
2023-04-25 20:45:36 +03:00
Alexander Bayandin
05ac0e2493 Login to ECR and Docker Hub at once (#4067)
- Update kaniko to 1.9.2 (from 1.7.0), problem with reproducible build is fixed
- Login to ECR and Docker Hub at once, so we can push to several
registries, it makes job `push-docker-hub` unneeded
- `push-docker-hub` replaced with `promote-images` in `needs:` clause,
Pushing images to production ECR moved to `promote-images` job
2023-04-25 17:54:10 +01:00
Joonas Koivunen
bfd45dd671 test_tenant_config: allow ERROR from eviction task (#4074) 2023-04-25 18:41:09 +03:00
Joonas Koivunen
7f80230fd2 fix: stop dead_code rustc lint (#4070)
only happens without `--all-features` which is what `./run_clippy.sh`
uses.
2023-04-25 17:07:04 +02:00
Sergey Melnikov
78bbbccadb Deploy proxies for preview enviroments (#4052)
## Describe your changes
Deploy `main` proxies to the preview environments
We don't deploy storage there yet, as it's tricky.

## Issue ticket number and link
https://github.com/neondatabase/cloud/issues/4737
2023-04-25 16:46:52 +02:00
Christian Schwarz
dbbe032c39 neon_local: fix tenant create -c eviction_policy:... (#4004)
And add corresponding unit test.

The fix is to use `.remove()` instead of `.get()` when processing the
arugments hash map.
The code uses emptiness of the hash map to determine whether all
arguments have been processed.
This was likely a copy-paste error.

    
refs https://github.com/neondatabase/neon/issues/3942
2023-04-25 15:33:30 +02:00
Joonas Koivunen
cb9473928d feat: add rough timings for basebackup (#4062)
just record the time needed for waiting the lsn and then the basebackup
in a log message in millis. this is related to ongoing investigations to
cold start performance.

this could also be a a counter. it cannot be added next to smgr
histograms, because we don't want another histogram per timeline.

the aim is to allow drilling deeper into which timelines were slow, and
to understand why some need two basebackups.
2023-04-25 13:22:16 +00:00
Christian Schwarz
fa20e37574 add gauge for in-flight layer uploads (#3951)
For the "worst-case /storage usage panel", we need to compute
```
remote size + local-only size
```

We currently don't have a metric for local-only layers.

The number of in-flight layers in the upload queue is just that, so, let
Prometheus scrape it.

The metric is two counters (started and finished).
The delta is the amount of in-flight uploads in the queue.

The metrics are incremented in the respective `call_unfinished_metric_*`
functions.
These track ongoing operations by file_kind and op_kind.
We only need this metric for layer uploads, so, there's the new
RemoteTimelineClientMetricsCallTrackSize type that forces all call sites
to decide whether they want the size tracked or not.
If we find that other file_kinds or op_kinds are interesting (metadata
uploads, layer downloads, layer deletes) are interesting, we can just
enable them, and they'll be just another label combination within the
metrics that this PR adds.

fixes https://github.com/neondatabase/neon/issues/3922
2023-04-25 14:22:48 +02:00
Joonas Koivunen
4911d7ce6f feat: warn when requests get cancelled (#4064)
Add a simple disarmable dropguard to log if request is cancelled before
it is completed. We currently don't have this, and it makes for
difficult to know when the request was dropped.
2023-04-25 15:22:23 +03:00
Christian Schwarz
e83684b868 add libmetric metric for each logged log message (#4055)
This patch extends the libmetrics logging setup functionality with a
`tracing` layer that increments a Prometheus counter each time we log a
log message. We have the counter per tracing event level. This allows
for monitoring WARN and ERR log volume without parsing the log. Also, it
would allow cross-checking whether logs got dropped on the way into
Loki.

It would be nicer if we could hook deeper into the tracing logging
layer, to avoid evaluating the filter twice.
But I don't know how to do it.
2023-04-25 14:10:18 +02:00
Eduard Dyckman
afbbc61036 Adding synthetic size to pageserver swagger (#4049)
## Describe your changes

I added synthetic size response to the console swagger. Now I am syncing
it back to neon
2023-04-24 16:19:25 +03:00
Alexey Kondratov
7ba5c286b7 [compute_ctl] Improve 'empty' compute startup sequence (#4034)
Do several attempts to get spec from the control-plane and retry network
errors and all reasonable HTTP response codes. Do not hang waiting for
spec without confirmation from the control-plane that compute is known
and is in the `Empty` state.

Adjust the way we track `total_startup_ms` metric, it should be
calculated since the moment we received spec, not from the moment
`compute_ctl` started. Also introduce a new `wait_for_spec_ms` metric
to track the time spent sleeping and waiting for spec to be delivered
from control-plane.

Part of neondatabase/cloud#3533
2023-04-21 11:10:48 +02:00
50 changed files with 1520 additions and 570 deletions

View File

@@ -0,0 +1,47 @@
storage:
vars:
bucket_name: neon-dev-storage-eu-central-1
bucket_region: eu-central-1
# We only register/update storage in one preview console and manually copy to other instances
console_mgmt_base_url: http://neon-internal-api.helium.aws.neon.build
broker_endpoint: http://storage-broker-lb.alpha.eu-central-1.internal.aws.neon.build:50051
pageserver_config_stub:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.helium.aws.neon.build/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "20m"
threshold: &default_eviction_threshold "20m"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "pageserver/v1"
safekeeper_s3_prefix: safekeeper/v1/wal
hostname_suffix: ""
remote_user: ssm-user
ansible_aws_ssm_region: eu-central-1
ansible_aws_ssm_bucket_name: neon-dev-storage-eu-central-1
console_region_id: aws-eu-central-1
sentry_environment: staging
children:
pageservers:
hosts:
pageserver-0.eu-central-1.aws.neon.build:
ansible_host: i-011f93ec26cfba2d4
safekeepers:
hosts:
safekeeper-0.eu-central-1.aws.neon.build:
ansible_host: i-0ff026d27babf8ddd
safekeeper-1.eu-central-1.aws.neon.build:
ansible_host: i-03983a49ee54725d9
safekeeper-2.eu-central-1.aws.neon.build:
ansible_host: i-0bd025ecdb61b0db3

View File

@@ -48,8 +48,6 @@ storage:
hosts:
safekeeper-0.us-east-2.aws.neon.build:
ansible_host: i-027662bd552bf5db0
safekeeper-1.us-east-2.aws.neon.build:
ansible_host: i-0171efc3604a7b907
safekeeper-2.us-east-2.aws.neon.build:
ansible_host: i-0de0b03a51676a6ce
safekeeper-99.us-east-2.aws.neon.build:

View File

@@ -0,0 +1,52 @@
# Helm chart values for neon-storage-broker
podLabels:
neon_env: staging
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.alpha.eu-central-1.internal.aws.neon.build
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
metrics:
enabled: false
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-storage-broker.fullname\" . }}"
labels:
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
app.kubernetes.io/name: neon-storage-broker
app.kubernetes.io/instance: neon-storage-broker
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-storage-broker"
endpoints:
- port: broker
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "staging"

View File

@@ -23,6 +23,7 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
domain: "*.eu-west-1.aws.neon.build"
otelExporterOtlpEndpoint: "https://otel-collector.zeta.eu-west-1.internal.aws.neon.build"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"

View File

@@ -9,6 +9,7 @@ settings:
authEndpoint: "https://console.stage.neon.tech/authenticate_proxy_request/"
uri: "https://console.stage.neon.tech/psql_session/"
domain: "pg.neon.build"
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
sentryEnvironment: "staging"
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"
metricCollectionInterval: "1min"

View File

@@ -24,6 +24,7 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
domain: "*.cloud.stage.neon.tech"
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"

View File

@@ -25,6 +25,7 @@ settings:
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
domain: "*.us-east-2.aws.neon.build"
extraDomains: ["*.us-east-2.postgres.zenith.tech", "*.us-east-2.retooldb-staging.com"]
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"

View File

@@ -0,0 +1,67 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.${PREVIEW_NAME}.aws.neon.build/management/api/v2"
domain: "*.cloud.${PREVIEW_NAME}.aws.neon.build"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.${PREVIEW_NAME}.aws.neon.build/billing/api/v1/usage_events"
metricCollectionInterval: "1min"
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: test
neon_region: ${PREVIEW_NAME}.eu-central-1
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: cloud.${PREVIEW_NAME}.aws.neon.build
httpsPort: 443
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -23,8 +23,8 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.us-east-1.aws.neon.tech"
# These domains haven't been delegated yet.
# extraDomains: ["*.us-east-1.retooldb.com", "*.us-east-1.postgres.vercel-storage.com"]
# *.us-east-1.retooldb.com hasn't been delegated yet.
extraDomains: ["*.us-east-1.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"

View File

@@ -541,7 +541,7 @@ jobs:
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
options: --init
needs: [ push-docker-hub, tag ]
needs: [ promote-images, tag ]
steps:
- name: Set PR's status to pending and request a remote CI test
run: |
@@ -584,8 +584,7 @@ jobs:
neon-image:
runs-on: [ self-hosted, gen3, large ]
needs: [ tag ]
# https://github.com/GoogleContainerTools/kaniko/issues/2005
container: gcr.io/kaniko-project/executor:v1.7.0-debug
container: gcr.io/kaniko-project/executor:v1.9.2-debug
defaults:
run:
shell: sh -eu {0}
@@ -597,11 +596,32 @@ jobs:
submodules: true
fetch-depth: 0
- name: Configure ECR login
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Configure ECR and Docker Hub login
run: |
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
echo "::add-mask::${DOCKERHUB_AUTH}"
cat <<-EOF > /kaniko/.docker/config.json
{
"auths": {
"https://index.docker.io/v1/": {
"auth": "${DOCKERHUB_AUTH}"
}
},
"credHelpers": {
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
}
}
EOF
- name: Kaniko build neon
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
run:
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
--destination neondatabase/neon:${{needs.tag.outputs.build-tag}}
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
- name: Cleanup ECR folder
@@ -652,7 +672,7 @@ jobs:
compute-tools-image:
runs-on: [ self-hosted, gen3, large ]
needs: [ tag ]
container: gcr.io/kaniko-project/executor:v1.7.0-debug
container: gcr.io/kaniko-project/executor:v1.9.2-debug
defaults:
run:
shell: sh -eu {0}
@@ -661,18 +681,41 @@ jobs:
- name: Checkout
uses: actions/checkout@v1 # v3 won't work with kaniko
- name: Configure ECR login
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Configure ECR and Docker Hub login
run: |
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
echo "::add-mask::${DOCKERHUB_AUTH}"
cat <<-EOF > /kaniko/.docker/config.json
{
"auths": {
"https://index.docker.io/v1/": {
"auth": "${DOCKERHUB_AUTH}"
}
},
"credHelpers": {
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
}
}
EOF
- name: Kaniko build compute tools
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
run:
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--dockerfile Dockerfile.compute-tools
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
--destination neondatabase/compute-tools:${{needs.tag.outputs.build-tag}}
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
compute-node-image:
runs-on: [ self-hosted, gen3, large ]
container: gcr.io/kaniko-project/executor:v1.7.0-debug
container: gcr.io/kaniko-project/executor:v1.9.2-debug
needs: [ tag ]
strategy:
fail-fast: false
@@ -689,12 +732,36 @@ jobs:
submodules: true
fetch-depth: 0
- name: Configure ECR login
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Configure ECR and Docker Hub login
run: |
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
echo "::add-mask::${DOCKERHUB_AUTH}"
cat <<-EOF > /kaniko/.docker/config.json
{
"auths": {
"https://index.docker.io/v1/": {
"auth": "${DOCKERHUB_AUTH}"
}
},
"credHelpers": {
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
}
}
EOF
- name: Kaniko build compute node with extensions
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --build-arg PG_VERSION=${{ matrix.version }} --dockerfile Dockerfile.compute-node --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
run:
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--build-arg PG_VERSION=${{ matrix.version }}
--dockerfile Dockerfile.compute-node
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
--destination neondatabase/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
@@ -786,13 +853,11 @@ jobs:
runs-on: [ self-hosted, gen3, small ]
needs: [ tag, test-images, vm-compute-node-image ]
container: golang:1.19-bullseye
if: github.event_name != 'workflow_dispatch'
# Don't add if-condition here.
# The job should always be run because we have dependant other jobs that shouldn't be skipped
steps:
- name: Install Crane & ECR helper
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
run: |
go install github.com/google/go-containerregistry/cmd/crane@31786c6cbb82d6ec4fb8eb79cd9387905130534e # v0.11.0
go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0
@@ -802,10 +867,15 @@ jobs:
mkdir /github/home/.docker/
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
- name: Copy vm-compute-node images to Docker Hub
run: |
crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} vm-compute-node-v14
crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} vm-compute-node-v15
- name: Add latest tag to images
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
github.event_name != 'workflow_dispatch'
run: |
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} latest
@@ -814,50 +884,10 @@ jobs:
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} latest
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
push-docker-hub:
runs-on: [ self-hosted, dev, x64 ]
needs: [ promote-images, tag ]
container: golang:1.19-bullseye
steps:
- name: Install Crane & ECR helper
run: |
go install github.com/google/go-containerregistry/cmd/crane@31786c6cbb82d6ec4fb8eb79cd9387905130534e # v0.11.0
go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0
- name: Configure ECR login
run: |
mkdir /github/home/.docker/
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
- name: Pull neon image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} neon
- name: Pull compute tools image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} compute-tools
- name: Pull compute node v14 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} compute-node-v14
- name: Pull vm compute node v14 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} vm-compute-node-v14
- name: Pull compute node v15 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} compute-node-v15
- name: Pull vm compute node v15 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} vm-compute-node-v15
- name: Pull rust image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
- name: Push images to production ECR
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
github.event_name != 'workflow_dispatch'
run: |
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/neon:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:latest
@@ -872,28 +902,12 @@ jobs:
echo "" > /github/home/.docker/config.json
crane auth login -u ${{ secrets.NEON_DOCKERHUB_USERNAME }} -p ${{ secrets.NEON_DOCKERHUB_PASSWORD }} index.docker.io
- name: Push neon image to Docker Hub
run: crane push neon neondatabase/neon:${{needs.tag.outputs.build-tag}}
- name: Push vm-compute-node to Docker Hub
run: |
crane push vm-compute-node-v14 neondatabase/vm-compute-node-v14:${{needs.tag.outputs.build-tag}}
crane push vm-compute-node-v15 neondatabase/vm-compute-node-v15:${{needs.tag.outputs.build-tag}}
- name: Push compute tools image to Docker Hub
run: crane push compute-tools neondatabase/compute-tools:${{needs.tag.outputs.build-tag}}
- name: Push compute node v14 image to Docker Hub
run: crane push compute-node-v14 neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}}
- name: Push vm compute node v14 image to Docker Hub
run: crane push vm-compute-node-v14 neondatabase/vm-compute-node-v14:${{needs.tag.outputs.build-tag}}
- name: Push compute node v15 image to Docker Hub
run: crane push compute-node-v15 neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}}
- name: Push vm compute node v15 image to Docker Hub
run: crane push vm-compute-node-v15 neondatabase/vm-compute-node-v15:${{needs.tag.outputs.build-tag}}
- name: Push rust image to Docker Hub
run: crane push rust neondatabase/rust:pinned
- name: Add latest tag to images in Docker Hub
- name: Push latest tags to Docker Hub
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
@@ -913,7 +927,7 @@ jobs:
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
# We need both storage **and** compute images for deploy, because control plane picks the compute version based on the storage version.
# If it notices a fresh storage it may bump the compute version. And if compute image failed to build it may break things badly
needs: [ push-docker-hub, tag, regress-tests ]
needs: [ promote-images, tag, regress-tests ]
if: |
contains(github.event.pull_request.labels.*.name, 'deploy-test-storage') &&
github.event_name != 'workflow_dispatch'
@@ -947,7 +961,7 @@ jobs:
deploy:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
needs: [ push-docker-hub, tag, regress-tests ]
needs: [ promote-images, tag, regress-tests ]
if: ( github.ref_name == 'main' || github.ref_name == 'release' ) && github.event_name != 'workflow_dispatch'
steps:
- name: Fix git ownership
@@ -984,7 +998,7 @@ jobs:
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
needs: [ push-docker-hub, tag, regress-tests ]
needs: [ promote-images, tag, regress-tests ]
if: github.ref_name == 'release' && github.event_name != 'workflow_dispatch'
steps:
- name: Promote compatibility snapshot for the release

View File

@@ -48,7 +48,8 @@ jobs:
shell: bash
strategy:
matrix:
target_region: [ eu-west-1, us-east-2 ]
# TODO(sergey): Fix storage deploy in eu-central-1
target_region: [ eu-west-1, us-east-2]
environment:
name: dev-${{ matrix.target_region }}
steps:
@@ -133,6 +134,53 @@ jobs:
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-preview-proxy-new:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
if: inputs.deployProxy
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: eu-central-1
target_cluster: dev-eu-central-1-alpha
environment:
name: dev-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1-node16
with:
role-to-assume: arn:aws:iam::369495373322:role/github-runner
aws-region: eu-central-1
role-skip-session-tagging: true
role-duration-seconds: 1800
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Re-deploy preview proxies
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
for PREVIEW_NAME in helium argon krypton xenon radon oganesson hydrogen nitrogen oxygen fluorine chlorine; do
export PREVIEW_NAME
envsubst <.github/helm-values/preview-template.neon-proxy-scram.yaml >preview-${PREVIEW_NAME}.neon-proxy-scram.yaml
helm upgrade neon-proxy-scram-${PREVIEW_NAME} neondatabase/neon-proxy --namespace neon-proxy-${PREVIEW_NAME} --create-namespace --install --atomic -f preview-${PREVIEW_NAME}.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
done
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-storage-broker-new:
runs-on: [ self-hosted, gen3, small ]
@@ -148,6 +196,8 @@ jobs:
target_cluster: dev-us-east-2-beta
- target_region: eu-west-1
target_cluster: dev-eu-west-1-zeta
- target_region: eu-central-1
target_cluster: dev-eu-central-1-alpha
environment:
name: dev-${{ matrix.target_region }}
steps:

11
Cargo.lock generated
View File

@@ -1105,7 +1105,6 @@ dependencies = [
"anyhow",
"clap 4.2.2",
"comfy-table",
"file-lock",
"git-version",
"nix",
"once_cell",
@@ -1551,16 +1550,6 @@ dependencies = [
"instant",
]
[[package]]
name = "file-lock"
version = "2.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f59be9010c5418713a48aac4c1b897d85dafd958055683dc31bdae553536647b"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "filetime"
version = "0.2.21"

View File

@@ -42,7 +42,6 @@ either = "1.8"
enum-map = "2.4.2"
enumset = "1.0.12"
fail = "0.5.0"
file-lock = "2.1.9"
fs2 = "0.4.3"
futures = "0.3"
futures-core = "0.3"

View File

@@ -73,7 +73,7 @@ fn main() -> Result<()> {
// Try to use just 'postgres' if no path is provided
let pgbin = matches.get_one::<String>("pgbin").unwrap();
let mut spec = None;
let spec;
let mut live_config_allowed = false;
match spec_json {
// First, try to get cluster spec from the cli argument
@@ -89,9 +89,13 @@ fn main() -> Result<()> {
} else if let Some(id) = compute_id {
if let Some(cp_base) = control_plane_uri {
live_config_allowed = true;
if let Ok(s) = get_spec_from_control_plane(cp_base, id) {
spec = Some(s);
}
spec = match get_spec_from_control_plane(cp_base, id) {
Ok(s) => s,
Err(e) => {
error!("cannot get response from control plane: {}", e);
panic!("neither spec nor confirmation that compute is in the Empty state was received");
}
};
} else {
panic!("must specify both --control-plane-uri and --compute-id or none");
}
@@ -114,7 +118,6 @@ fn main() -> Result<()> {
spec_set = false;
}
let compute_node = ComputeNode {
start_time: Utc::now(),
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
@@ -147,6 +150,17 @@ fn main() -> Result<()> {
let mut state = compute.state.lock().unwrap();
let pspec = state.pspec.as_ref().expect("spec must be set");
let startup_tracing_context = pspec.spec.startup_tracing_context.clone();
// Record for how long we slept waiting for the spec.
state.metrics.wait_for_spec_ms = Utc::now()
.signed_duration_since(state.start_time)
.to_std()
.unwrap()
.as_millis() as u64;
// Reset start time to the actual start of the configuration, so that
// total startup time was properly measured at the end.
state.start_time = Utc::now();
state.status = ComputeStatus::Init;
compute.state_changed.notify_all();
drop(state);

View File

@@ -38,7 +38,6 @@ use crate::spec::*;
/// Compute node info shared across several `compute_ctl` threads.
pub struct ComputeNode {
pub start_time: DateTime<Utc>,
// Url type maintains proper escaping
pub connstr: url::Url,
pub pgdata: String,
@@ -66,6 +65,7 @@ pub struct ComputeNode {
#[derive(Clone, Debug)]
pub struct ComputeState {
pub start_time: DateTime<Utc>,
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity
pub last_active: DateTime<Utc>,
@@ -77,6 +77,7 @@ pub struct ComputeState {
impl ComputeState {
pub fn new() -> Self {
Self {
start_time: Utc::now(),
status: ComputeStatus::Empty,
last_active: Utc::now(),
error: None,
@@ -425,7 +426,7 @@ impl ComputeNode {
.unwrap()
.as_millis() as u64;
state.metrics.total_startup_ms = startup_end_time
.signed_duration_since(self.start_time)
.signed_duration_since(compute_state.start_time)
.to_std()
.unwrap()
.as_millis() as u64;

View File

@@ -18,6 +18,7 @@ use tracing_utils::http::OtelName;
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
ComputeStatusResponse {
start_time: state.start_time,
tenant: state
.pspec
.as_ref()

View File

@@ -152,11 +152,14 @@ components:
type: object
description: Compute startup metrics.
required:
- wait_for_spec_ms
- sync_safekeepers_ms
- basebackup_ms
- config_ms
- total_startup_ms
properties:
wait_for_spec_ms:
type: integer
sync_safekeepers_ms:
type: integer
basebackup_ms:
@@ -181,6 +184,13 @@ components:
- status
- last_active
properties:
start_time:
type: string
description: |
Time when compute was started. If initially compute was started in the `empty`
state and then provided with valid spec, `start_time` will be reset to the
moment, when spec was received.
example: "2022-10-12T07:20:50.52Z"
status:
$ref: '#/components/schemas/ComputeStatus'
last_active:

View File

@@ -4,42 +4,117 @@ use std::str::FromStr;
use anyhow::{anyhow, bail, Result};
use postgres::config::Config;
use postgres::{Client, NoTls};
use tracing::{info, info_span, instrument, span_enabled, warn, Level};
use reqwest::StatusCode;
use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
use crate::config;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
use compute_api::responses::ControlPlaneSpecResponse;
use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
use compute_api::spec::{ComputeSpec, Database, PgIdent, Role};
// Do control plane request and return response if any. In case of error it
// returns a bool flag indicating whether it makes sense to retry the request
// and a string with error message.
fn do_control_plane_request(
uri: &str,
jwt: &str,
) -> Result<ControlPlaneSpecResponse, (bool, String)> {
let resp = reqwest::blocking::Client::new()
.get(uri)
.header("Authorization", jwt)
.send()
.map_err(|e| {
(
true,
format!("could not perform spec request to control plane: {}", e),
)
})?;
match resp.status() {
StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
Ok(spec_resp) => Ok(spec_resp),
Err(e) => Err((
true,
format!("could not deserialize control plane response: {}", e),
)),
},
StatusCode::SERVICE_UNAVAILABLE => {
Err((true, "control plane is temporarily unavailable".to_string()))
}
StatusCode::BAD_GATEWAY => {
// We have a problem with intermittent 502 errors now
// https://github.com/neondatabase/cloud/issues/2353
// It's fine to retry GET request in this case.
Err((true, "control plane request failed with 502".to_string()))
}
// Another code, likely 500 or 404, means that compute is unknown to the control plane
// or some internal failure happened. Doesn't make much sense to retry in this case.
_ => Err((
false,
format!(
"unexpected control plane response status code: {}",
resp.status()
),
)),
}
}
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
/// env variable is set, it will be used for authorization.
pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeSpec> {
pub fn get_spec_from_control_plane(
base_uri: &str,
compute_id: &str,
) -> Result<Option<ComputeSpec>> {
let cp_uri = format!("{base_uri}/management/api/v2/computes/{compute_id}/spec");
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
Ok(v) => v,
Err(_) => "".to_string(),
};
let mut attempt = 1;
let mut spec: Result<Option<ComputeSpec>> = Ok(None);
info!("getting spec from control plane: {}", cp_uri);
// TODO: check the response. We should distinguish cases when it's
// - network error, then retry
// - no spec for compute yet, then wait
// - compute id is unknown or any other error, then bail out
let resp: ControlPlaneSpecResponse = reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()
.map_err(|e| anyhow!("could not send spec request to control plane: {}", e))?
.json()
.map_err(|e| anyhow!("could not get compute spec from control plane: {}", e))?;
// Do 3 attempts to get spec from the control plane using the following logic:
// - network error -> then retry
// - compute id is unknown or any other error -> bail out
// - no spec for compute yet (Empty state) -> return Ok(None)
// - got spec -> return Ok(Some(spec))
while attempt < 4 {
spec = match do_control_plane_request(&cp_uri, &jwt) {
Ok(spec_resp) => match spec_resp.status {
ControlPlaneComputeStatus::Empty => Ok(None),
ControlPlaneComputeStatus::Attached => {
if let Some(spec) = spec_resp.spec {
Ok(Some(spec))
} else {
bail!("compute is attached, but spec is empty")
}
}
},
Err((retry, msg)) => {
if retry {
Err(anyhow!(msg))
} else {
bail!(msg);
}
}
};
if let Some(spec) = resp.spec {
Ok(spec)
} else {
bail!("could not get compute spec from control plane")
if let Err(e) = &spec {
error!("attempt {} to get spec failed with: {}", attempt, e);
} else {
return spec;
}
attempt += 1;
std::thread::sleep(std::time::Duration::from_millis(100));
}
// All attempts failed, return error.
spec
}
/// It takes cluster specification and does the following:

View File

@@ -8,7 +8,6 @@ license.workspace = true
anyhow.workspace = true
clap.workspace = true
comfy-table.workspace = true
file-lock.workspace = true
git-version.workspace = true
nix.workspace = true
once_cell.workspace = true

View File

@@ -365,7 +365,11 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.last_record_lsn;
env.register_branch_mapping(DEFAULT_BRANCH_NAME, new_tenant_id, new_timeline_id)?;
env.register_branch_mapping(
DEFAULT_BRANCH_NAME.to_string(),
new_tenant_id,
new_timeline_id,
)?;
println!(
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {new_tenant_id}",
@@ -407,7 +411,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
Some(("list", list_match)) => {
let tenant_id = get_tenant_id(list_match, env)?;
let timelines = pageserver.timeline_list(&tenant_id)?;
print_timelines_tree(timelines, env.timeline_name_mappings()?)?;
print_timelines_tree(timelines, env.timeline_name_mappings())?;
}
Some(("create", create_match)) => {
let tenant_id = get_tenant_id(create_match, env)?;
@@ -425,7 +429,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.last_record_lsn;
env.register_branch_mapping(new_branch_name, tenant_id, new_timeline_id)?;
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
println!(
"Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
@@ -464,10 +468,10 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
.copied()
.context("Failed to parse postgres version from the argument string")?;
let mut cplane = ComputeControlPlane::new(env.clone());
let mut cplane = ComputeControlPlane::load(env.clone())?;
println!("Importing timeline into pageserver ...");
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?;
env.register_branch_mapping(name, tenant_id, timeline_id)?;
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
println!("Creating endpoint for imported timeline ...");
cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?;
@@ -483,7 +487,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
.map(|s| s.as_str())
.unwrap_or(DEFAULT_BRANCH_NAME);
let ancestor_timeline_id = env
.get_branch_timeline_id(ancestor_branch_name, tenant_id)?
.get_branch_timeline_id(ancestor_branch_name, tenant_id)
.ok_or_else(|| {
anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
})?;
@@ -504,7 +508,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let last_record_lsn = timeline_info.last_record_lsn;
env.register_branch_mapping(new_branch_name, tenant_id, new_timeline_id)?;
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
println!(
"Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
@@ -524,7 +528,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
None => bail!("no endpoint subcommand provided"),
};
let mut cplane = ComputeControlPlane::new(env.clone());
let mut cplane = ComputeControlPlane::load(env.clone())?;
// All subcommands take an optional --tenant-id option
let tenant_id = get_tenant_id(sub_args, env)?;
@@ -536,7 +540,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
HashMap::new()
});
let timeline_name_mappings = env.timeline_name_mappings()?;
let timeline_name_mappings = env.timeline_name_mappings();
let mut table = comfy_table::Table::new();
@@ -551,7 +555,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
"STATUS",
]);
for (endpoint_id, endpoint) in ComputeControlPlane::load_endpoints(env)?
for (endpoint_id, endpoint) in cplane
.endpoints
.iter()
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
{
@@ -604,7 +609,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.transpose()
.context("Failed to parse Lsn from the request")?;
let timeline_id = env
.get_branch_timeline_id(branch_name, tenant_id)?
.get_branch_timeline_id(branch_name, tenant_id)
.ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
@@ -622,7 +627,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
let endpoint = ComputeControlPlane::load_endpoint(endpoint_id.as_str(), env)?;
let endpoint = cplane.endpoints.get(endpoint_id.as_str());
let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
let claims = Claims::new(Some(tenant_id), Scope::Tenant);
@@ -641,7 +646,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.map(|s| s.as_str())
.unwrap_or(DEFAULT_BRANCH_NAME);
let timeline_id = env
.get_branch_timeline_id(branch_name, tenant_id)?
.get_branch_timeline_id(branch_name, tenant_id)
.ok_or_else(|| {
anyhow!("Found no timeline id for branch name '{branch_name}'")
})?;
@@ -678,7 +683,9 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?;
let destroy = sub_args.get_flag("destroy");
let endpoint = ComputeControlPlane::load_endpoint(endpoint_id.as_str(), env)?
let endpoint = cplane
.endpoints
.get(endpoint_id.as_str())
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
endpoint.stop(destroy)?;
}
@@ -837,9 +844,9 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
let pageserver = PageServerNode::from_env(env);
// Stop all endpoints
match ComputeControlPlane::load_endpoints(env) {
Ok(endpoints) => {
for (_k, node) in endpoints {
match ComputeControlPlane::load(env.clone()) {
Ok(cplane) => {
for (_k, node) in cplane.endpoints {
if let Err(e) = node.stop(false) {
eprintln!("postgres stop failed: {e:#}");
}

View File

@@ -11,104 +11,56 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::local_env::LocalEnv;
use crate::local_env::{LocalEnv, DEFAULT_PG_VERSION};
use crate::pageserver::PageServerNode;
use crate::postgresql_conf::PostgresConf;
// contents of a endpoint.json file
#[serde_as]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EndpointConf {
name: String,
#[serde_as(as = "DisplayFromStr")]
tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
timeline_id: TimelineId,
#[serde_as(as = "Option<DisplayFromStr>")]
lsn: Option<Lsn>,
port: u16,
pg_version: u32,
}
//
// ComputeControlPlane
//
pub struct ComputeControlPlane {
base_port: u16,
// endpoint ID is the key
pub endpoints: BTreeMap<String, Arc<Endpoint>>,
env: LocalEnv,
pageserver: Arc<PageServerNode>,
}
impl ComputeControlPlane {
pub fn new(env: LocalEnv) -> Self {
let pageserver = Arc::new(PageServerNode::from_env(&env));
ComputeControlPlane {
base_port: 55431,
env,
pageserver,
}
}
// Load current endpoints from the endpoints/ subdirectories
//
// endpoint ID is the key in the returned BTreeMap.
//
// NOTE: This is not concurrency-safe, and can fail if another 'neon_local'
// invocation is creating or deleting an endpoint at the same time.
pub fn load_endpoints(env: &LocalEnv) -> Result<BTreeMap<String, Arc<Endpoint>>> {
let pageserver = Arc::new(PageServerNode::from_env(env));
pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
let pageserver = Arc::new(PageServerNode::from_env(&env));
let mut endpoints = BTreeMap::default();
for endpoint_dir in fs::read_dir(env.endpoints_path())
.with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
{
let ep = Endpoint::from_dir_entry(endpoint_dir?, env, &pageserver)?;
let ep = Endpoint::from_dir_entry(endpoint_dir?, &env, &pageserver)?;
endpoints.insert(ep.name.clone(), Arc::new(ep));
}
Ok(endpoints)
}
// Load an endpoint from the endpoints/ subdirectories
pub fn load_endpoint(name: &str, env: &LocalEnv) -> Result<Option<Endpoint>> {
let endpoint_json_path = env.endpoints_path().join(name).join("endpoint.json");
if !endpoint_json_path.exists() {
return Ok(None);
}
// Read the endpoint.json file
let conf: EndpointConf = serde_json::from_slice(&std::fs::read(endpoint_json_path)?)?;
// ok now
let pageserver = Arc::new(PageServerNode::from_env(env));
Ok(Some(Endpoint {
address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.port),
name: name.to_string(),
env: env.clone(),
Ok(ComputeControlPlane {
base_port: 55431,
endpoints,
env,
pageserver,
timeline_id: conf.timeline_id,
lsn: conf.lsn,
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
}))
})
}
fn get_port(&self) -> anyhow::Result<u16> {
let endpoints = ComputeControlPlane::load_endpoints(&self.env)?;
let next_port = 1 + endpoints
fn get_port(&mut self) -> u16 {
1 + self
.endpoints
.values()
.map(|ep| ep.address.port())
.max()
.unwrap_or(self.base_port);
Ok(next_port)
.unwrap_or(self.base_port)
}
pub fn new_endpoint(
@@ -120,13 +72,7 @@ impl ComputeControlPlane {
port: Option<u16>,
pg_version: u32,
) -> Result<Arc<Endpoint>> {
// NOTE: Unlike most of neon_local, 'new_endpoint' is safe to run from
// two 'neon_local' invocations at the same time, IF the port is specified
// explicitly. (get_port() is racy)
let port = match port {
Some(port) => port,
None => self.get_port()?,
};
let port = port.unwrap_or_else(|| self.get_port());
let ep = Arc::new(Endpoint {
name: name.to_owned(),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
@@ -137,20 +83,12 @@ impl ComputeControlPlane {
tenant_id,
pg_version,
});
ep.create_pgdata()?;
std::fs::write(
ep.endpoint_path().join("endpoint.json"),
serde_json::to_string_pretty(&EndpointConf {
name: name.to_string(),
tenant_id,
timeline_id,
lsn,
port,
pg_version,
})?,
)?;
ep.setup_pg_conf()?;
self.endpoints.insert(ep.name.clone(), Arc::clone(&ep));
Ok(ep)
}
}
@@ -193,20 +131,42 @@ impl Endpoint {
let fname = entry.file_name();
let name = fname.to_str().unwrap().to_string();
// Read the endpoint.json file
let conf: EndpointConf =
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
// Read config file into memory
let cfg_path = entry.path().join("pgdata").join("postgresql.conf");
let cfg_path_str = cfg_path.to_string_lossy();
let mut conf_file = File::open(&cfg_path)
.with_context(|| format!("failed to open config file in {}", cfg_path_str))?;
let conf = PostgresConf::read(&mut conf_file)
.with_context(|| format!("failed to read config file in {}", cfg_path_str))?;
// Read a few options from the config file
let context = format!("in config file {}", cfg_path_str);
let port: u16 = conf.parse_field("port", &context)?;
let timeline_id: TimelineId = conf.parse_field("neon.timeline_id", &context)?;
let tenant_id: TenantId = conf.parse_field("neon.tenant_id", &context)?;
// Read postgres version from PG_VERSION file to determine which postgres version binary to use.
// If it doesn't exist, assume broken data directory and use default pg version.
let pg_version_path = entry.path().join("PG_VERSION");
let pg_version_str =
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
let pg_version = u32::from_str(&pg_version_str)?;
// parse recovery_target_lsn, if any
let recovery_target_lsn: Option<Lsn> =
conf.parse_field_optional("recovery_target_lsn", &context)?;
// ok now
Ok(Endpoint {
address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.port),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
name,
env: env.clone(),
pageserver: Arc::clone(pageserver),
timeline_id: conf.timeline_id,
lsn: conf.lsn,
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
timeline_id,
lsn: recovery_target_lsn,
tenant_id,
pg_version,
})
}

View File

@@ -5,7 +5,6 @@
use anyhow::{bail, ensure, Context};
use file_lock::{FileLock, FileOptions};
use postgres_backend::AuthType;
use reqwest::Url;
use serde::{Deserialize, Serialize};
@@ -13,14 +12,11 @@ use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
use std::env;
use std::fs;
use std::io::Seek;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::str::FromStr;
use utils::{
auth::{encode_from_key_file, Claims},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -76,84 +72,14 @@ pub struct LocalEnv {
#[serde(default)]
pub safekeepers: Vec<SafekeeperConf>,
}
// Keep human-readable aliases in memory (and persist them to
// 'branch_name_mappings.json'), to hide ZId hex strings from the user.
//
// BranchNameMappingsSerialized corresponds to the actual JSON format of
// 'branch_name_mappings.json' file. It's a bit more awkward to work with, so we convert
// it to/from BranchNameMappings when reading/writing the file.
type BranchNameMappings = HashMap<(TenantId, String), TimelineId>;
type BranchNameMappingsSerialized = HashMap<String, HashMap<String, String>>;
pub struct BranchNameMappingsLock {
mappings: BranchNameMappings,
lock: FileLock,
}
impl Deref for BranchNameMappingsLock {
type Target = HashMap<(TenantId, String), TimelineId>;
fn deref(&self) -> &Self::Target {
&self.mappings
}
}
impl DerefMut for BranchNameMappingsLock {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.mappings
}
}
impl BranchNameMappingsLock {
/// Write the modified branch-name mapppings back to 'branch_name_mappings.json',
/// and release the lock.
fn write_to_file(mut self) -> anyhow::Result<()> {
let mut serialized_mappings: BranchNameMappingsSerialized = HashMap::new();
for ((tenant_id, branch_name), timeline_id) in self.iter() {
serialized_mappings
.entry(tenant_id.to_string())
.or_default()
.insert(branch_name.clone(), timeline_id.to_string());
}
self.lock.file.set_len(0)?;
self.lock.file.rewind()?;
serde_json::to_writer_pretty(&self.lock.file, &serialized_mappings)?;
Ok(())
}
}
/// Get the branch-name mappings.
///
/// This returns a guard object that holds a lock on the branch_name_mappings.json
/// file. That makes it safe for two 'neon_local' invocations to read/manipulate
/// branch name mappings at the same time.
pub fn load_branch_name_mappings() -> anyhow::Result<BranchNameMappingsLock> {
let path = base_path().join("branch_name_mappings.json");
let lock = FileLock::lock(
path,
true,
FileOptions::new().create(true).read(true).write(true),
)?;
let mut mappings = BranchNameMappings::new();
if lock.file.metadata()?.len() > 0 {
let serialized_mappings: BranchNameMappingsSerialized = serde_json::from_reader(&lock.file)
.context("Failed to read branch_name_mappings.json")?;
for (tenant_str, map) in serialized_mappings.iter() {
for (branch_name, timeline_str) in map.iter() {
mappings.insert(
(TenantId::from_str(tenant_str)?, branch_name.to_string()),
TimelineId::from_str(timeline_str)?,
);
}
}
}
Ok(BranchNameMappingsLock { mappings, lock })
/// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
#[serde(default)]
// A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
// but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
// https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
#[serde_as(as = "HashMap<_, Vec<(DisplayFromStr, DisplayFromStr)>>")]
branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
}
/// Broker config for cluster internal communication.
@@ -289,21 +215,27 @@ impl LocalEnv {
pub fn register_branch_mapping(
&mut self,
branch_name: &str,
branch_name: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<()> {
let mut mappings = load_branch_name_mappings()?;
let existing_values = self
.branch_name_mappings
.entry(branch_name.clone())
.or_default();
if let Some(old_timeline_id) = mappings.get(&(tenant_id, branch_name.to_string())) {
let existing_ids = existing_values
.iter()
.find(|(existing_tenant_id, _)| existing_tenant_id == &tenant_id);
if let Some((_, old_timeline_id)) = existing_ids {
if old_timeline_id == &timeline_id {
Ok(())
} else {
bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}");
}
} else {
mappings.insert((tenant_id, branch_name.to_string()), timeline_id);
mappings.write_to_file()?;
existing_values.push((tenant_id, timeline_id));
Ok(())
}
}
@@ -312,22 +244,24 @@ impl LocalEnv {
&self,
branch_name: &str,
tenant_id: TenantId,
) -> anyhow::Result<Option<TimelineId>> {
let mappings = load_branch_name_mappings()?;
Ok(mappings.get(&(tenant_id, branch_name.to_string())).copied())
) -> Option<TimelineId> {
self.branch_name_mappings
.get(branch_name)?
.iter()
.find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
.map(|&(_, timeline_id)| timeline_id)
.map(TimelineId::from)
}
pub fn timeline_name_mappings(&self) -> anyhow::Result<HashMap<TenantTimelineId, String>> {
let mappings = load_branch_name_mappings()?;
Ok(mappings
pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {
self.branch_name_mappings
.iter()
.map(|((tenant_id, branch_name), timeline_id)| {
(
TenantTimelineId::new(*tenant_id, *timeline_id),
branch_name.clone(),
)
.flat_map(|(name, tenant_timelines)| {
tenant_timelines.iter().map(|&(tenant_id, timeline_id)| {
(TenantTimelineId::new(tenant_id, timeline_id), name.clone())
})
})
.collect())
.collect()
}
/// Create a LocalEnv from a config file.

View File

@@ -359,8 +359,8 @@ impl PageServerNode {
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
eviction_policy: settings
.get("eviction_policy")
.map(|x| serde_json::from_str(x))
.remove("eviction_policy")
.map(serde_json::from_str)
.transpose()
.context("Failed to parse 'eviction_policy' json")?,
min_resident_size_override: settings

View File

@@ -14,6 +14,7 @@ pub struct GenericAPIError {
#[derive(Serialize, Debug)]
#[serde(rename_all = "snake_case")]
pub struct ComputeStatusResponse {
pub start_time: DateTime<Utc>,
pub tenant: Option<String>,
pub timeline: Option<String>,
pub status: ComputeStatus,
@@ -63,6 +64,7 @@ where
/// Response of the /metrics.json API
#[derive(Clone, Debug, Default, Serialize)]
pub struct ComputeMetrics {
pub wait_for_spec_ms: u64,
pub sync_safekeepers_ms: u64,
pub basebackup_ms: u64,
pub config_ms: u64,
@@ -75,4 +77,16 @@ pub struct ComputeMetrics {
#[derive(Deserialize, Debug)]
pub struct ControlPlaneSpecResponse {
pub spec: Option<ComputeSpec>,
pub status: ControlPlaneComputeStatus,
}
#[derive(Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ControlPlaneComputeStatus {
// Compute is known to control-plane, but it's not
// yet attached to any timeline / endpoint.
Empty,
// Compute is attached to some timeline / endpoint and
// should be able to start with provided spec.
Attached,
}

View File

@@ -76,6 +76,7 @@ where
let log_quietly = method == Method::GET;
async move {
let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding();
if log_quietly {
debug!("Handling request");
} else {
@@ -87,7 +88,11 @@ where
// Usage of the error handler also means that we expect only the `ApiError` errors to be raised in this call.
//
// Panics are not handled separately, there's a `tracing_panic_hook` from another module to do that globally.
match (self.0)(request).await {
let res = (self.0)(request).await;
cancellation_guard.disarm();
match res {
Ok(response) => {
let response_status = response.status();
if log_quietly && response_status.is_success() {
@@ -105,6 +110,38 @@ where
}
}
/// Drop guard to WARN in case the request was dropped before completion.
struct RequestCancelled {
warn: Option<tracing::Span>,
}
impl RequestCancelled {
/// Create the drop guard using the [`tracing::Span::current`] as the span.
fn warn_when_dropped_without_responding() -> Self {
RequestCancelled {
warn: Some(tracing::Span::current()),
}
}
/// Consume the drop guard without logging anything.
fn disarm(mut self) {
self.warn = None;
}
}
impl Drop for RequestCancelled {
fn drop(&mut self) {
if let Some(span) = self.warn.take() {
// the span has all of the info already, but the outer `.instrument(span)` has already
// been dropped, so we need to manually re-enter it for this message.
//
// this is what the instrument would do before polling so it is fine.
let _g = span.entered();
warn!("request was dropped before completing");
}
}
}
async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
SERVE_METRICS_COUNT.inc();

View File

@@ -1,6 +1,7 @@
use std::str::FromStr;
use anyhow::Context;
use once_cell::sync::Lazy;
use strum_macros::{EnumString, EnumVariantNames};
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
@@ -23,25 +24,64 @@ impl LogFormat {
}
}
pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
let default_filter_str = "info";
static TRACING_EVENT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
metrics::register_int_counter_vec!(
"libmetrics_tracing_event_count",
"Number of tracing events, by level",
&["level"]
)
.expect("failed to define metric")
});
struct TracingEventCountLayer(&'static metrics::IntCounterVec);
impl<S> tracing_subscriber::layer::Layer<S> for TracingEventCountLayer
where
S: tracing::Subscriber,
{
fn on_event(
&self,
event: &tracing::Event<'_>,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
let level = event.metadata().level();
let level = match *level {
tracing::Level::ERROR => "error",
tracing::Level::WARN => "warn",
tracing::Level::INFO => "info",
tracing::Level::DEBUG => "debug",
tracing::Level::TRACE => "trace",
};
self.0.with_label_values(&[level]).inc();
}
}
pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
// We fall back to printing all spans at info-level or above if
// the RUST_LOG environment variable is not set.
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_filter_str));
let rust_log_env_filter = || {
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
};
let base_logger = tracing_subscriber::fmt()
.with_env_filter(env_filter)
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout);
match log_format {
LogFormat::Json => base_logger.json().init(),
LogFormat::Plain => base_logger.init(),
LogFormat::Test => base_logger.with_test_writer().init(),
}
// NB: the order of the with() calls does not matter.
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
use tracing_subscriber::prelude::*;
tracing_subscriber::registry()
.with({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout);
let log_layer = match log_format {
LogFormat::Json => log_layer.json().boxed(),
LogFormat::Plain => log_layer.boxed(),
LogFormat::Test => log_layer.with_test_writer().boxed(),
};
log_layer.with_filter(rust_log_env_filter())
})
.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()))
.init();
Ok(())
}
@@ -157,3 +197,33 @@ impl std::fmt::Debug for PrettyLocation<'_, '_> {
<Self as std::fmt::Display>::fmt(self, f)
}
}
#[cfg(test)]
mod tests {
use metrics::{core::Opts, IntCounterVec};
use super::TracingEventCountLayer;
#[test]
fn tracing_event_count_metric() {
let counter_vec =
IntCounterVec::new(Opts::new("testmetric", "testhelp"), &["level"]).unwrap();
let counter_vec = Box::leak(Box::new(counter_vec)); // make it 'static
let layer = TracingEventCountLayer(counter_vec);
use tracing_subscriber::prelude::*;
tracing::subscriber::with_default(tracing_subscriber::registry().with(layer), || {
tracing::trace!("foo");
tracing::debug!("foo");
tracing::info!("foo");
tracing::warn!("foo");
tracing::error!("foo");
});
assert_eq!(counter_vec.with_label_values(&["trace"]).get(), 1);
assert_eq!(counter_vec.with_label_values(&["debug"]).get(), 1);
assert_eq!(counter_vec.with_label_values(&["info"]).get(), 1);
assert_eq!(counter_vec.with_label_values(&["warn"]).get(), 1);
assert_eq!(counter_vec.with_label_values(&["error"]).get(), 1);
}
}

View File

@@ -520,6 +520,43 @@ paths:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/synthetic_size:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
get:
description: |
Calculate tenant's synthetic size
responses:
"200":
description: Tenant's synthetic size
content:
application/json:
schema:
$ref: "#/components/schemas/SyntheticSizeResponse"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/size:
parameters:
- name: tenant_id
@@ -948,6 +985,84 @@ components:
latest_gc_cutoff_lsn:
type: string
format: hex
SyntheticSizeResponse:
type: object
required:
- id
- size
- segment_sizes
- inputs
properties:
id:
type: string
format: hex
size:
type: integer
segment_sizes:
type: array
items:
$ref: "#/components/schemas/SegmentSize"
inputs:
type: object
properties:
segments:
type: array
items:
$ref: "#/components/schemas/SegmentData"
timeline_inputs:
type: array
items:
$ref: "#/components/schemas/TimelineInput"
SegmentSize:
type: object
required:
- method
- accum_size
properties:
method:
type: string
accum_size:
type: integer
SegmentData:
type: object
required:
- segment
properties:
segment:
type: object
required:
- lsn
properties:
parent:
type: integer
lsn:
type: integer
size:
type: integer
needed:
type: boolean
timeline_id:
type: string
format: hex
kind:
type: string
TimelineInput:
type: object
required:
- timeline_id
properties:
ancestor_id:
type: string
ancestor_lsn:
type: string
timeline_id:
type: string
format: hex
Error:
type: object
required:

View File

@@ -1201,6 +1201,37 @@ async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
)
}
#[cfg(feature = "testing")]
async fn post_tracing_event_handler(mut r: Request<Body>) -> Result<Response<Body>, ApiError> {
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
enum Level {
Error,
Warn,
Info,
Debug,
Trace,
}
#[derive(Debug, serde::Deserialize)]
struct Request {
level: Level,
message: String,
}
let body: Request = json_request(&mut r)
.await
.map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
match body.level {
Level::Error => tracing::error!(?body.message),
Level::Warn => tracing::warn!(?body.message),
Level::Info => tracing::info!(?body.message),
Level::Debug => tracing::debug!(?body.message),
Level::Trace => tracing::trace!(?body.message),
}
json_response(StatusCode::OK, ())
}
pub fn make_router(
conf: &'static PageServerConf,
launch_ts: &'static LaunchTimestamp,
@@ -1341,5 +1372,9 @@ pub fn make_router(
testing_api!("set tenant state to broken", handle_tenant_break),
)
.get("/v1/panic", |r| RequestSpan(always_panic_handler).handle(r))
.post(
"/v1/tracing/event",
testing_api!("emit a tracing event", post_tracing_event_handler),
)
.any(handler_404))
}

View File

@@ -1,9 +1,9 @@
use metrics::core::{AtomicU64, GenericCounter};
use metrics::{
register_counter_vec, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec,
Counter, CounterVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
UIntGauge, UIntGaugeVec,
register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec, Counter, CounterVec,
Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge,
UIntGaugeVec,
};
use once_cell::sync::Lazy;
use pageserver_api::models::TenantState;
@@ -139,15 +139,6 @@ pub static REMOTE_ONDEMAND_DOWNLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
.unwrap()
});
pub static LOAD_LAYER_MAP_HISTOGRAM: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_load_layer_map_histogram",
"Time spent on loadiing layer map",
STORAGE_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
});
static CURRENT_LOGICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_current_logical_size",
@@ -359,11 +350,6 @@ pub static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub static NUM_ONDISK_LAYERS: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!("pageserver_ondisk_layers", "Number of layers on-disk")
.expect("failed to define a metric")
});
// remote storage metrics
/// NB: increment _after_ recording the current value into [`REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST`].
@@ -394,6 +380,26 @@ static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy<HistogramVec> = Lazy::new
.expect("failed to define a metric")
});
static REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_remote_timeline_client_bytes_started",
"Incremented by the number of bytes associated with a remote timeline client operation. \
The increment happens when the operation is scheduled.",
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
)
.expect("failed to define a metric")
});
static REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_remote_timeline_client_bytes_finished",
"Incremented by the number of bytes associated with a remote timeline client operation. \
The increment happens when the operation finishes (regardless of success/failure/shutdown).",
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
)
.expect("failed to define a metric")
});
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RemoteOpKind {
Upload,
@@ -561,7 +567,7 @@ impl StorageTimeMetricsTimer {
pub struct StorageTimeMetrics {
/// Sum of f64 seconds, per operation, tenant_id and timeline_id
timeline_sum: Counter,
/// Number of operations, per operation, tenant_id and timeline_id
/// Number of oeprations, per operation, tenant_id and timeline_id
timeline_count: IntCounter,
/// Global histogram having only the "operation" label.
global_histogram: Histogram,
@@ -604,6 +610,7 @@ pub struct TimelineMetrics {
pub compact_time_histo: StorageTimeMetrics,
pub create_images_time_histo: StorageTimeMetrics,
pub logical_size_histo: StorageTimeMetrics,
pub load_layer_map_histo: StorageTimeMetrics,
pub garbage_collect_histo: StorageTimeMetrics,
pub last_record_gauge: IntGauge,
pub wait_lsn_time_histo: Histogram,
@@ -635,6 +642,8 @@ impl TimelineMetrics {
let create_images_time_histo =
StorageTimeMetrics::new("create images", &tenant_id, &timeline_id);
let logical_size_histo = StorageTimeMetrics::new("logical size", &tenant_id, &timeline_id);
let load_layer_map_histo =
StorageTimeMetrics::new("load layer map", &tenant_id, &timeline_id);
let garbage_collect_histo = StorageTimeMetrics::new("gc", &tenant_id, &timeline_id);
let last_record_gauge = LAST_RECORD_LSN
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
@@ -670,6 +679,7 @@ impl TimelineMetrics {
create_images_time_histo,
logical_size_histo,
garbage_collect_histo,
load_layer_map_histo,
last_record_gauge,
wait_lsn_time_histo,
resident_physical_size_gauge,
@@ -744,6 +754,8 @@ pub struct RemoteTimelineClientMetrics {
remote_operation_time: Mutex<HashMap<(&'static str, &'static str, &'static str), Histogram>>,
calls_unfinished_gauge: Mutex<HashMap<(&'static str, &'static str), IntGauge>>,
calls_started_hist: Mutex<HashMap<(&'static str, &'static str), Histogram>>,
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
}
impl RemoteTimelineClientMetrics {
@@ -754,6 +766,8 @@ impl RemoteTimelineClientMetrics {
remote_operation_time: Mutex::new(HashMap::default()),
calls_unfinished_gauge: Mutex::new(HashMap::default()),
calls_started_hist: Mutex::new(HashMap::default()),
bytes_started_counter: Mutex::new(HashMap::default()),
bytes_finished_counter: Mutex::new(HashMap::default()),
remote_physical_size_gauge: Mutex::new(None),
}
}
@@ -792,6 +806,7 @@ impl RemoteTimelineClientMetrics {
});
metric.clone()
}
fn calls_unfinished_gauge(
&self,
file_kind: &RemoteOpFileKind,
@@ -833,32 +848,125 @@ impl RemoteTimelineClientMetrics {
});
metric.clone()
}
fn bytes_started_counter(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> IntCounter {
// XXX would be nice to have an upgradable RwLock
let mut guard = self.bytes_started_counter.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
let metric = guard.entry(key).or_insert_with(move || {
REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER
.get_metric_with_label_values(&[
&self.tenant_id.to_string(),
&self.timeline_id.to_string(),
key.0,
key.1,
])
.unwrap()
});
metric.clone()
}
fn bytes_finished_counter(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> IntCounter {
// XXX would be nice to have an upgradable RwLock
let mut guard = self.bytes_finished_counter.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
let metric = guard.entry(key).or_insert_with(move || {
REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER
.get_metric_with_label_values(&[
&self.tenant_id.to_string(),
&self.timeline_id.to_string(),
key.0,
key.1,
])
.unwrap()
});
metric.clone()
}
}
#[cfg(test)]
impl RemoteTimelineClientMetrics {
pub fn get_bytes_started_counter_value(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> Option<u64> {
let guard = self.bytes_started_counter.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
guard.get(&key).map(|counter| counter.get())
}
pub fn get_bytes_finished_counter_value(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> Option<u64> {
let guard = self.bytes_finished_counter.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
guard.get(&key).map(|counter| counter.get())
}
}
/// See [`RemoteTimelineClientMetrics::call_begin`].
#[must_use]
pub(crate) struct RemoteTimelineClientCallMetricGuard(Option<IntGauge>);
pub(crate) struct RemoteTimelineClientCallMetricGuard {
/// Decremented on drop.
calls_unfinished_metric: Option<IntGauge>,
/// If Some(), this references the bytes_finished metric, and we increment it by the given `u64` on drop.
bytes_finished: Option<(IntCounter, u64)>,
}
impl RemoteTimelineClientCallMetricGuard {
/// Consume this guard object without decrementing the metric.
/// The caller vouches to do this manually, so that the prior increment of the gauge will cancel out.
/// Consume this guard object without performing the metric updates it would do on `drop()`.
/// The caller vouches to do the metric updates manually.
pub fn will_decrement_manually(mut self) {
self.0 = None; // prevent drop() from decrementing
let RemoteTimelineClientCallMetricGuard {
calls_unfinished_metric,
bytes_finished,
} = &mut self;
calls_unfinished_metric.take();
bytes_finished.take();
}
}
impl Drop for RemoteTimelineClientCallMetricGuard {
fn drop(&mut self) {
if let RemoteTimelineClientCallMetricGuard(Some(guard)) = self {
let RemoteTimelineClientCallMetricGuard {
calls_unfinished_metric,
bytes_finished,
} = self;
if let Some(guard) = calls_unfinished_metric.take() {
guard.dec();
}
if let Some((bytes_finished_metric, value)) = bytes_finished {
bytes_finished_metric.inc_by(*value);
}
}
}
/// The enum variants communicate to the [`RemoteTimelineClientMetrics`] whether to
/// track the byte size of this call in applicable metric(s).
pub(crate) enum RemoteTimelineClientMetricsCallTrackSize {
/// Do not account for this call's byte size in any metrics.
/// The `reason` field is there to make the call sites self-documenting
/// about why they don't need the metric.
DontTrackSize { reason: &'static str },
/// Track the byte size of the call in applicable metric(s).
Bytes(u64),
}
impl RemoteTimelineClientMetrics {
/// Increment the metrics that track ongoing calls to the remote timeline client instance.
/// Update the metrics that change when a call to the remote timeline client instance starts.
///
/// Drop the returned guard object once the operation is finished to decrement the values.
/// Drop the returned guard object once the operation is finished to updates corresponding metrics that track completions.
/// Or, use [`RemoteTimelineClientCallMetricGuard::will_decrement_manually`] and [`call_end`] if that
/// is more suitable.
/// Never do both.
@@ -866,24 +974,51 @@ impl RemoteTimelineClientMetrics {
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
size: RemoteTimelineClientMetricsCallTrackSize,
) -> RemoteTimelineClientCallMetricGuard {
let unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
self.calls_started_hist(file_kind, op_kind)
.observe(unfinished_metric.get() as f64);
unfinished_metric.inc();
RemoteTimelineClientCallMetricGuard(Some(unfinished_metric))
.observe(calls_unfinished_metric.get() as f64);
calls_unfinished_metric.inc(); // NB: inc after the histogram, see comment on underlying metric
let bytes_finished = match size {
RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {
// nothing to do
None
}
RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => {
self.bytes_started_counter(file_kind, op_kind).inc_by(size);
let finished_counter = self.bytes_finished_counter(file_kind, op_kind);
Some((finished_counter, size))
}
};
RemoteTimelineClientCallMetricGuard {
calls_unfinished_metric: Some(calls_unfinished_metric),
bytes_finished,
}
}
/// Manually decrement the metric instead of using the guard object.
/// Manually udpate the metrics that track completions, instead of using the guard object.
/// Using the guard object is generally preferable.
/// See [`call_begin`] for more context.
pub(crate) fn call_end(&self, file_kind: &RemoteOpFileKind, op_kind: &RemoteOpKind) {
let unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
pub(crate) fn call_end(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
size: RemoteTimelineClientMetricsCallTrackSize,
) {
let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
debug_assert!(
unfinished_metric.get() > 0,
calls_unfinished_metric.get() > 0,
"begin and end should cancel out"
);
unfinished_metric.dec();
calls_unfinished_metric.dec();
match size {
RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {}
RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => {
self.bytes_finished_counter(file_kind, op_kind).inc_by(size);
}
}
}
}
@@ -896,6 +1031,8 @@ impl Drop for RemoteTimelineClientMetrics {
remote_operation_time,
calls_unfinished_gauge,
calls_started_hist,
bytes_started_counter,
bytes_finished_counter,
} = self;
for ((a, b, c), _) in remote_operation_time.get_mut().unwrap().drain() {
let _ = REMOTE_OPERATION_TIME.remove_label_values(&[tenant_id, timeline_id, a, b, c]);
@@ -916,6 +1053,22 @@ impl Drop for RemoteTimelineClientMetrics {
b,
]);
}
for ((a, b), _) in bytes_started_counter.get_mut().unwrap().drain() {
let _ = REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER.remove_label_values(&[
tenant_id,
timeline_id,
a,
b,
]);
}
for ((a, b), _) in bytes_finished_counter.get_mut().unwrap().drain() {
let _ = REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER.remove_label_values(&[
tenant_id,
timeline_id,
a,
b,
]);
}
{
let _ = remote_physical_size_gauge; // use to avoid 'unused' warning in desctructuring above
let _ = REMOTE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);

View File

@@ -700,6 +700,8 @@ impl PageServerHandler {
full_backup: bool,
ctx: RequestContext,
) -> anyhow::Result<()> {
let started = std::time::Instant::now();
// check that the timeline exists
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
@@ -712,6 +714,8 @@ impl PageServerHandler {
.context("invalid basebackup lsn")?;
}
let lsn_awaited_after = started.elapsed();
// switch client to COPYOUT
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
pgb.flush().await?;
@@ -732,7 +736,17 @@ impl PageServerHandler {
pgb.write_message_noflush(&BeMessage::CopyDone)?;
pgb.flush().await?;
info!("basebackup complete");
let basebackup_after = started
.elapsed()
.checked_sub(lsn_awaited_after)
.unwrap_or(Duration::ZERO);
info!(
lsn_await_millis = lsn_awaited_after.as_millis(),
basebackup_millis = basebackup_after.as_millis(),
"basebackup complete"
);
Ok(())
}

View File

@@ -118,6 +118,10 @@ pub struct Tenant {
// Global pageserver config parameters
pub conf: &'static PageServerConf,
/// The value creation timestamp, used to measure activation delay, see:
/// <https://github.com/neondatabase/neon/issues/4025>
loading_started_at: Instant,
state: watch::Sender<TenantState>,
// Overridden tenant-specific config parameters.
@@ -1476,7 +1480,7 @@ impl Tenant {
TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Active;
info!("Activating tenant {}", self.tenant_id);
debug!(tenant_id = %self.tenant_id, "Activating tenant");
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
@@ -1487,12 +1491,17 @@ impl Tenant {
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self.tenant_id);
let mut activated_timelines = 0;
let mut timelines_broken_during_activation = 0;
for timeline in not_broken_timelines {
match timeline
.activate(ctx)
.context("timeline activation for activating tenant")
{
Ok(()) => {}
Ok(()) => {
activated_timelines += 1;
}
Err(e) => {
error!(
"Failed to activate timeline {}: {:#}",
@@ -1503,9 +1512,26 @@ impl Tenant {
"failed to activate timeline {}: {}",
timeline.timeline_id, e
));
timelines_broken_during_activation += 1;
}
}
}
let elapsed = self.loading_started_at.elapsed();
let total_timelines = timelines_accessor.len();
// log a lot of stuff, because some tenants sometimes suffer from user-visible
// times to activate. see https://github.com/neondatabase/neon/issues/4025
info!(
since_creation_millis = elapsed.as_millis(),
tenant_id = %self.tenant_id,
activated_timelines,
timelines_broken_during_activation,
total_timelines,
post_state = <&'static str>::from(&*current_state),
"activation attempt finished"
);
}
}
});
@@ -1812,6 +1838,9 @@ impl Tenant {
Tenant {
tenant_id,
conf,
// using now here is good enough approximation to catch tenants with really long
// activation times.
loading_started_at: Instant::now(),
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
timelines: Mutex::new(HashMap::new()),
gc_cs: tokio::sync::Mutex::new(()),

View File

@@ -48,7 +48,6 @@ mod layer_coverage;
use crate::context::RequestContext;
use crate::keyspace::KeyPartitioning;
use crate::metrics::NUM_ONDISK_LAYERS;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
@@ -288,7 +287,6 @@ where
self.l0_delta_layers.push(layer);
}
NUM_ONDISK_LAYERS.inc();
Ok(())
}
@@ -314,8 +312,6 @@ where
"failed to locate removed historic layer from l0_delta_layers"
);
}
NUM_ONDISK_LAYERS.dec();
}
pub(self) fn replace_historic_noflush(

View File

@@ -219,7 +219,8 @@ use utils::lsn::Lsn;
use crate::metrics::{
MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
REMOTE_ONDEMAND_DOWNLOADED_BYTES, REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
};
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::{
@@ -367,9 +368,13 @@ impl RemoteTimelineClient {
/// Download index file
pub async fn download_index_file(&self) -> Result<IndexPart, DownloadError> {
let _unfinished_gauge_guard = self
.metrics
.call_begin(&RemoteOpFileKind::Index, &RemoteOpKind::Download);
let _unfinished_gauge_guard = self.metrics.call_begin(
&RemoteOpFileKind::Index,
&RemoteOpKind::Download,
crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
reason: "no need for a downloads gauge",
},
);
download::download_index_part(
self.conf,
@@ -398,9 +403,13 @@ impl RemoteTimelineClient {
layer_metadata: &LayerFileMetadata,
) -> anyhow::Result<u64> {
let downloaded_size = {
let _unfinished_gauge_guard = self
.metrics
.call_begin(&RemoteOpFileKind::Layer, &RemoteOpKind::Download);
let _unfinished_gauge_guard = self.metrics.call_begin(
&RemoteOpFileKind::Layer,
&RemoteOpKind::Download,
crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
reason: "no need for a downloads gauge",
},
);
download::download_layer_file(
self.conf,
&self.storage_impl,
@@ -886,11 +895,32 @@ impl RemoteTimelineClient {
fn calls_unfinished_metric_impl(
&self,
op: &UploadOp,
) -> Option<(RemoteOpFileKind, RemoteOpKind)> {
) -> Option<(
RemoteOpFileKind,
RemoteOpKind,
RemoteTimelineClientMetricsCallTrackSize,
)> {
use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
let res = match op {
UploadOp::UploadLayer(_, _) => (RemoteOpFileKind::Layer, RemoteOpKind::Upload),
UploadOp::UploadMetadata(_, _) => (RemoteOpFileKind::Index, RemoteOpKind::Upload),
UploadOp::Delete(file_kind, _) => (*file_kind, RemoteOpKind::Delete),
UploadOp::UploadLayer(_, m) => (
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
),
UploadOp::UploadMetadata(_, _) => (
RemoteOpFileKind::Index,
RemoteOpKind::Upload,
DontTrackSize {
reason: "metadata uploads are tiny",
},
),
UploadOp::Delete(file_kind, _) => (
*file_kind,
RemoteOpKind::Delete,
DontTrackSize {
reason: "should we track deletes? positive or negative sign?",
},
),
UploadOp::Barrier(_) => {
// we do not account these
return None;
@@ -900,20 +930,20 @@ impl RemoteTimelineClient {
}
fn calls_unfinished_metric_begin(&self, op: &UploadOp) {
let (file_kind, op_kind) = match self.calls_unfinished_metric_impl(op) {
let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
Some(x) => x,
None => return,
};
let guard = self.metrics.call_begin(&file_kind, &op_kind);
let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
guard.will_decrement_manually(); // in unfinished_ops_metric_end()
}
fn calls_unfinished_metric_end(&self, op: &UploadOp) {
let (file_kind, op_kind) = match self.calls_unfinished_metric_impl(op) {
let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
Some(x) => x,
None => return,
};
self.metrics.call_end(&file_kind, &op_kind);
self.metrics.call_end(&file_kind, &op_kind, track_bytes);
}
fn stop(&self) {
@@ -981,11 +1011,19 @@ impl RemoteTimelineClient {
mod tests {
use super::*;
use crate::{
tenant::harness::{TenantHarness, TIMELINE_ID},
context::RequestContext,
tenant::{
harness::{TenantHarness, TIMELINE_ID},
Tenant,
},
DEFAULT_PG_VERSION,
};
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
use std::{collections::HashSet, path::Path};
use std::{
collections::HashSet,
path::{Path, PathBuf},
};
use tokio::runtime::EnterGuard;
use utils::lsn::Lsn;
pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
@@ -1034,39 +1072,80 @@ mod tests {
assert_eq!(found, expected);
}
struct TestSetup {
runtime: &'static tokio::runtime::Runtime,
entered_runtime: EnterGuard<'static>,
harness: TenantHarness<'static>,
tenant: Arc<Tenant>,
tenant_ctx: RequestContext,
remote_fs_dir: PathBuf,
client: Arc<RemoteTimelineClient>,
}
impl TestSetup {
fn new(test_name: &str) -> anyhow::Result<Self> {
// Use a current-thread runtime in the test
let runtime = Box::leak(Box::new(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?,
));
let entered_runtime = runtime.enter();
let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
let harness = TenantHarness::create(test_name)?;
let (tenant, ctx) = runtime.block_on(harness.load());
// create an empty timeline directory
let timeline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let _ = timeline.initialize(&ctx).unwrap();
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
let storage_config = RemoteStorageConfig {
max_concurrent_syncs: std::num::NonZeroUsize::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
)
.unwrap(),
max_sync_errors: std::num::NonZeroU32::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
)
.unwrap(),
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
};
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
let client = Arc::new(RemoteTimelineClient {
conf: harness.conf,
runtime,
tenant_id: harness.tenant_id,
timeline_id: TIMELINE_ID,
storage_impl: storage,
upload_queue: Mutex::new(UploadQueue::Uninitialized),
metrics: Arc::new(RemoteTimelineClientMetrics::new(
&harness.tenant_id,
&TIMELINE_ID,
)),
});
Ok(Self {
runtime,
entered_runtime,
harness,
tenant,
tenant_ctx: ctx,
remote_fs_dir,
client,
})
}
}
// Test scheduling
#[test]
fn upload_scheduling() -> anyhow::Result<()> {
// Use a current-thread runtime in the test
let runtime = Box::leak(Box::new(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?,
));
let _entered = runtime.enter();
let harness = TenantHarness::create("upload_scheduling")?;
let (tenant, ctx) = runtime.block_on(harness.load());
let _timeline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
let storage_config = RemoteStorageConfig {
max_concurrent_syncs: std::num::NonZeroUsize::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
)
.unwrap(),
max_sync_errors: std::num::NonZeroU32::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
)
.unwrap(),
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
};
// Test outline:
//
// Schedule upload of a bunch of layers. Check that they are started immediately, not queued
@@ -1081,21 +1160,19 @@ mod tests {
// Schedule another deletion. Check that it's launched immediately.
// Schedule index upload. Check that it's queued
println!("workdir: {}", harness.conf.workdir.display());
let storage_impl = GenericRemoteStorage::from_config(&storage_config)?;
let client = Arc::new(RemoteTimelineClient {
conf: harness.conf,
let TestSetup {
runtime,
tenant_id: harness.tenant_id,
timeline_id: TIMELINE_ID,
storage_impl,
upload_queue: Mutex::new(UploadQueue::Uninitialized),
metrics: Arc::new(RemoteTimelineClientMetrics::new(
&harness.tenant_id,
&TIMELINE_ID,
)),
});
entered_runtime: _entered_runtime,
harness,
tenant: _tenant,
tenant_ctx: _tenant_ctx,
remote_fs_dir,
client,
} = TestSetup::new("upload_scheduling").unwrap();
let timeline_path = harness.timeline_path(&TIMELINE_ID);
println!("workdir: {}", harness.conf.workdir.display());
let remote_timeline_dir =
remote_fs_dir.join(timeline_path.strip_prefix(&harness.conf.workdir)?);
@@ -1216,4 +1293,90 @@ mod tests {
Ok(())
}
#[test]
fn bytes_unfinished_gauge_for_layer_file_uploads() -> anyhow::Result<()> {
// Setup
let TestSetup {
runtime,
harness,
client,
..
} = TestSetup::new("metrics")?;
let metadata = dummy_metadata(Lsn(0x10));
client.init_upload_queue_for_empty_remote(&metadata)?;
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let content_1 = dummy_contents("foo");
std::fs::write(
timeline_path.join(layer_file_name_1.file_name()),
&content_1,
)?;
#[derive(Debug, PartialEq)]
struct BytesStartedFinished {
started: Option<usize>,
finished: Option<usize>,
}
let get_bytes_started_stopped = || {
let started = client
.metrics
.get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
.map(|v| v.try_into().unwrap());
let stopped = client
.metrics
.get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
.map(|v| v.try_into().unwrap());
BytesStartedFinished {
started,
finished: stopped,
}
};
// Test
let init = get_bytes_started_stopped();
client.schedule_layer_file_upload(
&layer_file_name_1,
&LayerFileMetadata::new(content_1.len() as u64),
)?;
let pre = get_bytes_started_stopped();
runtime.block_on(client.wait_completion())?;
let post = get_bytes_started_stopped();
// Validate
assert_eq!(
init,
BytesStartedFinished {
started: None,
finished: None
}
);
assert_eq!(
pre,
BytesStartedFinished {
started: Some(content_1.len()),
// assert that the _finished metric is created eagerly so that subtractions work on first sample
finished: Some(0),
}
);
assert_eq!(
post,
BytesStartedFinished {
started: Some(content_1.len()),
finished: Some(content_1.len())
}
);
Ok(())
}
}

View File

@@ -48,7 +48,7 @@ use crate::tenant::{
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::{TimelineMetrics, LOAD_LAYER_MAP_HISTOGRAM};
use crate::metrics::TimelineMetrics;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
@@ -1444,7 +1444,7 @@ impl Timeline {
let mut updates = layers.batch_update();
let mut num_layers = 0;
let timer = LOAD_LAYER_MAP_HISTOGRAM.start_timer();
let timer = self.metrics.load_layer_map_histo.start_timer();
// Scan timeline directory and create ImageFileName and DeltaFilename
// structs representing all files on disk

View File

@@ -32,3 +32,11 @@ CREATE VIEW local_cache AS
SELECT P.* FROM local_cache_pages() AS P
(pageoffs int8, relfilenode oid, reltablespace oid, reldatabase oid,
relforknumber int2, relblocknumber int8, accesscount int4);
create table neon_prepared_statements(
client_id text not null,
stmt_name text not null,
stmt_body text not null,
from_sql boolean not null,
primary key(client_id, stmt_name)
);

View File

@@ -13,12 +13,15 @@
#include "access/xact.h"
#include "access/xlog.h"
#include "commands/prepare.h"
#include "executor/spi.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "catalog/pg_type.h"
#include "replication/walsender.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/guc.h"
@@ -28,12 +31,31 @@
PG_MODULE_MAGIC;
void _PG_init(void);
static char* neon_load_prepared_statement(char const* stmt_name, bool* from_sql);
static void neon_save_prepared_statement(char const* stmt_name, char const* stmt_body, bool from_sql);
static bool neon_drop_prepared_statement(char const* stmt_name);
static bool save_parepared_statememts;
void
_PG_init(void)
{
pg_init_libpagestore();
pg_init_walproposer();
DefineCustomBoolVariable("neon.save_prepared_statements",
"Support prepared statements in case of using connetion pooler",
NULL,
&save_parepared_statememts,
false, /* disabled by default */
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
save_prepared_statement_hook = neon_save_prepared_statement;
load_prepared_statement_hook = neon_load_prepared_statement;
drop_prepared_statement_hook = neon_drop_prepared_statement;
EmitWarningsOnPlaceholders("neon");
}
@@ -85,3 +107,72 @@ backpressure_throttling_time(PG_FUNCTION_ARGS)
{
PG_RETURN_UINT64(BackpressureThrottlingTime());
}
static char*
neon_load_prepared_statement(char const* stmt_name, bool* from_sql)
{
char* stmt_body = NULL;
if (save_parepared_statememts)
{
int rc;
Oid param_types[2] = {TEXTOID, TEXTOID};
Datum param_values[2] = {CStringGetTextDatum(application_name), CStringGetTextDatum(stmt_name)};
bool is_null;
MemoryContext call_ctx = CurrentMemoryContext;
SPI_connect();
rc = SPI_execute_with_args("select stmt_body,from_sql from neon_prepared_statements where client_id=$1 and stmt_name=$2",
2, param_types, param_values, NULL, true, 1);
if (rc != SPI_OK_SELECT || SPI_processed != 1) {
SPI_finish();
elog(LOG, "Prepared statement %s not found for client %s", stmt_name, application_name);
return NULL;
}
stmt_body = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1);
stmt_body = MemoryContextStrdup(call_ctx, stmt_body);
*from_sql = DatumGetBool(SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 2, &is_null));
SPI_finish();
}
return stmt_body;
}
static void
neon_save_prepared_statement(char const* stmt_name, char const* stmt_body, bool from_sql)
{
if (save_parepared_statememts)
{
int rc;
Oid param_types[4] = {TEXTOID, TEXTOID, TEXTOID, BOOLOID};
Datum param_values[4] = {CStringGetTextDatum(application_name), CStringGetTextDatum(stmt_name), CStringGetTextDatum(stmt_body), BoolGetDatum(from_sql)};
SPI_connect();
rc = SPI_execute_with_args("insert into neon_prepared_statements values($1,$2,$3,$4) on conflict (client_id,stmt_name) do update set stmt_body=EXCLUDED.stmt_body, from_sql=EXCLUDED.from_sql",
4, param_types, param_values, NULL, false, 1);
if (rc != SPI_OK_INSERT && rc != SPI_OK_UPDATE)
elog(LOG, "Failed to persist prepared statement %s for client %s", stmt_name, application_name);
SPI_finish();
}
}
static bool
neon_drop_prepared_statement(char const* stmt_name)
{
if (save_parepared_statememts)
{
int rc;
Oid param_types[2] = {TEXTOID, TEXTOID};
Datum param_values[2] = {CStringGetTextDatum(application_name), CStringGetTextDatum(stmt_name)};
SPI_connect();
rc = SPI_execute_with_args("delete from neon_prepared_statements where client_id=$1 and stmt_name=$2",
2, param_types, param_values, NULL, false, 1);
if (rc != SPI_OK_DELETE || SPI_processed != 1) {
SPI_finish();
elog(LOG, "Prepared statement %s not found for client %s", stmt_name, application_name);
return false;
}
SPI_finish();
return true;
}
return false;
}

View File

@@ -45,6 +45,8 @@ PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = (
*[f"pageserver_remote_timeline_client_calls_started_{x}" for x in ["bucket", "count", "sum"]],
*[f"pageserver_remote_operation_seconds_{x}" for x in ["bucket", "count", "sum"]],
"pageserver_remote_physical_size",
"pageserver_remote_timeline_client_bytes_started_total",
"pageserver_remote_timeline_client_bytes_finished_total",
)
PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
@@ -53,6 +55,7 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
"pageserver_storage_operations_seconds_global_bucket",
"libmetrics_launch_timestamp",
"libmetrics_build_info",
"libmetrics_tracing_event_count_total",
)
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (

View File

@@ -2928,32 +2928,18 @@ def fork_at_current_lsn(
return env.neon_cli.create_branch(new_branch_name, ancestor_branch_name, tenant_id, current_lsn)
def wait_for_sk_commit_lsn_to_arrive_at_pageserver_last_record_lsn(
tenant_id: TenantId,
timeline_id: TimelineId,
safekeepers: List[Safekeeper],
pageserver: NeonPageserver,
):
sk_commit_lsns = [
sk.http_client().timeline_status(tenant_id, timeline_id).commit_lsn for sk in safekeepers
]
lsn = max(sk_commit_lsns)
ps_http = pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, lsn)
return lsn
def wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id: TenantId,
timeline_id: TimelineId,
safekeepers: List[Safekeeper],
pageserver: NeonPageserver,
):
lsn = wait_for_sk_commit_lsn_to_arrive_at_pageserver_last_record_lsn(
tenant_id, timeline_id, safekeepers, pageserver
)
ps_http = pageserver.http_client()
def last_flush_lsn_upload(
env: NeonEnv, endpoint: Endpoint, tenant_id: TenantId, timeline_id: TimelineId
) -> Lsn:
"""
Wait for pageserver to catch to the latest flush LSN of given endpoint,
checkpoint pageserver, and wait for it to be uploaded (remote_consistent_lsn
reaching flush LSN).
"""
last_flush_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
ps_http = env.pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_flush_lsn)
# force a checkpoint to trigger upload
ps_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(ps_http, tenant_id, timeline_id, lsn)
return lsn
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
return last_flush_lsn

View File

@@ -550,3 +550,13 @@ class PageserverHttpClient(requests.Session):
def tenant_break(self, tenant_id: TenantId):
res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break")
self.verbose_error(res)
def post_tracing_event(self, level: str, message: str):
res = self.post(
f"http://localhost:{self.port}/v1/tracing/event",
json={
"level": level,
"message": message,
},
)
self.verbose_error(res)

View File

@@ -54,10 +54,9 @@ def wait_for_upload(
if current_lsn >= lsn:
log.info("wait finished")
return
lr_lsn = last_record_lsn(pageserver_http, tenant, timeline)
log.info(
"waiting for remote_consistent_lsn to reach {}, now {}, iteration {}".format(
lsn, current_lsn, i + 1
)
f"waiting for remote_consistent_lsn to reach {lsn}, now {current_lsn}, last_record_lsn={lr_lsn}, iteration {i + 1}"
)
time.sleep(1)
raise Exception(

View File

@@ -1,10 +1,7 @@
import threading
import timeit
from threading import BoundedSemaphore
import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.compare_fixtures import NeonCompare
from fixtures.neon_fixtures import NeonEnvBuilder
# Run bulk tenant creation test.
@@ -53,57 +50,3 @@ def test_bulk_tenant_create(
"s",
report=MetricReport.LOWER_IS_BETTER,
)
@pytest.mark.parametrize("tenants_count", [50])
def test_parallel_tenant_create(
neon_compare: NeonCompare,
tenants_count: int,
):
"""Create lots of tenants in parallel
One important thing that this measures is the amount of prometheus
metrics per tenant. The pageserver exposes a lot of metrics for
each timeline, and this test gives some visibility to how much
exactly. (We've had to raise the prometheus scraper's limit on
the max metrics size several times, because we expose so many.)
"""
env = neon_compare.env
zenbenchmark = neon_compare.zenbenchmark
max_concurrent = 5
pool_sema = BoundedSemaphore(value=max_concurrent)
def worker(i: int):
with pool_sema:
tenant, timeline_id = env.neon_cli.create_tenant()
endpoint_tenant = env.endpoints.create_start("main", tenant_id=tenant)
with endpoint_tenant.cursor() as cur:
cur.execute("select count(*) from pg_class")
endpoint_tenant.stop()
threads = [threading.Thread(target=worker, args=(i,)) for i in range(tenants_count)]
start = timeit.default_timer()
for thread in threads:
thread.start()
for thread in threads:
thread.join()
end = timeit.default_timer()
zenbenchmark.record(
"tenant_creation_time",
end - start,
"s",
report=MetricReport.LOWER_IS_BETTER,
)
metrics = env.pageserver.http_client().get_metrics_str()
zenbenchmark.record(
"prometheus_metrics_size",
len(metrics) / tenants_count,
"bytes",
report=MetricReport.LOWER_IS_BETTER,
)

View File

@@ -16,8 +16,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar
from fixtures.types import Lsn
from pytest import FixtureRequest
#
@@ -59,10 +58,6 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
with endpoint.cursor() as cur:
tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id"))
timeline_id = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
# FIXME: Is this expected?
env.pageserver.allowed_errors.append(
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
@@ -74,6 +69,10 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
["pg_dumpall", f"--dbname={endpoint.connstr()}", f"--file={test_output_dir / 'dump.sql'}"]
)
snapshot_config = toml.load(test_output_dir / "repo" / "config")
tenant_id = snapshot_config["default_tenant_id"]
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
pageserver_http = env.pageserver.http_client()
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])

View File

@@ -6,7 +6,6 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
RemoteStorageKind,
wait_for_last_flush_lsn,
wait_for_sk_commit_lsn_to_reach_remote_storage,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import Lsn, TenantId, TimelineId
@@ -199,7 +198,7 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
# with image_creation_threshold=1 which we will use on the last compaction
cur.execute("vacuum")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
last_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
if i == 1 and j == 2 and k == 1:
# last iteration; stop before checkpoint to avoid leaving an inmemory layer
@@ -222,10 +221,8 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
tenant_update_config({"image_creation_threshold": "1"})
ps_http.timeline_compact(tenant_id, timeline_id)
# wait for all uploads to finish
wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id, timeline_id, env.safekeepers, env.pageserver
)
# wait for all uploads to finish (checkpoint has been done above)
wait_for_upload(ps_http, tenant_id, timeline_id, last_lsn)
# shutdown safekeepers to avoid on-demand downloads from walreceiver
for sk in env.safekeepers:

View File

@@ -0,0 +1,49 @@
import uuid
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.utils import wait_until
@pytest.mark.parametrize("level", ["trace", "debug", "info", "warn", "error"])
def test_logging_event_count(neon_env_builder: NeonEnvBuilder, level: str):
# self-test: make sure the event is logged (i.e., our testing endpoint works)
log_expected = {
"trace": False,
"debug": False,
"info": True,
"warn": True,
"error": True,
}[level]
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
msg_id = uuid.uuid4().hex
# NB: the _total suffix is added by our prometheus client
before = ps_http.get_metric_value("libmetrics_tracing_event_count_total", {"level": level})
# post the event
ps_http.post_tracing_event(level, msg_id)
if log_expected:
env.pageserver.allowed_errors.append(f".*{msg_id}.*")
def assert_logged():
if not log_expected:
return
assert env.pageserver.log_contains(f".*{msg_id}.*")
wait_until(10, 0.5, assert_logged)
# make sure it's counted
def assert_metric_value():
if not log_expected:
return
# NB: the _total suffix is added by our prometheus client
val = ps_http.get_metric_value("libmetrics_tracing_event_count_total", {"level": level})
val = val or 0.0
log.info("libmetrics_tracing_event_count: %s", val)
assert val > (before or 0.0)
wait_until(10, 1, assert_metric_value)

View File

@@ -51,7 +51,7 @@ metric_kinds_checked = set([])
#
# verify that metrics look minimally sane
# verify that metrics look minilally sane
#
def metrics_handler(request: Request) -> Response:
if request.json is None:

View File

@@ -12,8 +12,8 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
RemoteStorageKind,
available_remote_storages,
last_flush_lsn_upload,
wait_for_last_flush_lsn,
wait_for_sk_commit_lsn_to_reach_remote_storage,
)
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pageserver.utils import (
@@ -207,9 +207,7 @@ def test_ondemand_download_timetravel(
env.endpoints.stop_all()
# wait until pageserver has successfully uploaded all the data to remote storage
wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id, timeline_id, env.safekeepers, env.pageserver
)
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
def get_api_current_physical_size():
d = client.timeline_detail(tenant_id, timeline_id)
@@ -347,12 +345,9 @@ def test_download_remote_layers_api(
"""
)
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
env.endpoints.stop_all()
wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id, timeline_id, env.safekeepers, env.pageserver
)
def get_api_current_physical_size():
d = client.timeline_detail(tenant_id, timeline_id)
return d["current_physical_size"]

View File

@@ -1,3 +1,4 @@
import json
from contextlib import closing
import psycopg2.extras
@@ -22,9 +23,12 @@ wait_lsn_timeout='111 s';
checkpoint_distance = 10000
compaction_target_size = 1048576
evictions_low_residence_duration_metric_threshold = "2 days"
eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold = "23 hours" }
"""
env = neon_env_builder.init_start()
# we configure eviction but no remote storage, there might be error lines
env.pageserver.allowed_errors.append(".* no remote storage configured, cannot evict layers .*")
http_client = env.pageserver.http_client()
# Check that we raise on misspelled configs
@@ -44,6 +48,7 @@ evictions_low_residence_duration_metric_threshold = "2 days"
"checkpoint_distance": "20000",
"gc_period": "30sec",
"evictions_low_residence_duration_metric_threshold": "42s",
"eviction_policy": json.dumps({"kind": "NoEviction"}),
}
tenant, _ = env.neon_cli.create_tenant(conf=new_conf)
@@ -84,6 +89,11 @@ evictions_low_residence_duration_metric_threshold = "2 days"
assert effective_config["image_creation_threshold"] == 3
assert effective_config["pitr_interval"] == "7days"
assert effective_config["evictions_low_residence_duration_metric_threshold"] == "2days"
assert effective_config["eviction_policy"] == {
"kind": "LayerAccessThreshold",
"period": "20s",
"threshold": "23h",
}
# check the configuration of the new tenant
with closing(env.pageserver.connect()) as psconn:
@@ -121,6 +131,9 @@ evictions_low_residence_duration_metric_threshold = "2 days"
assert (
new_effective_config["evictions_low_residence_duration_metric_threshold"] == "42s"
), "Should override default value"
assert new_effective_config["eviction_policy"] == {
"kind": "NoEviction"
}, "Specific 'eviction_policy' config should override the default value"
assert new_effective_config["compaction_target_size"] == 1048576
assert new_effective_config["compaction_period"] == "20s"
assert new_effective_config["compaction_threshold"] == 10
@@ -135,6 +148,9 @@ evictions_low_residence_duration_metric_threshold = "2 days"
"compaction_period": "80sec",
"image_creation_threshold": "2",
"evictions_low_residence_duration_metric_threshold": "23h",
"eviction_policy": json.dumps(
{"kind": "LayerAccessThreshold", "period": "80s", "threshold": "42h"}
),
}
env.neon_cli.config_tenant(
tenant_id=tenant,
@@ -180,6 +196,11 @@ evictions_low_residence_duration_metric_threshold = "2 days"
assert (
updated_effective_config["evictions_low_residence_duration_metric_threshold"] == "23h"
), "Should override default value"
assert updated_effective_config["eviction_policy"] == {
"kind": "LayerAccessThreshold",
"period": "1m 20s",
"threshold": "1day 18h",
}, "Specific 'eviction_policy' config should override the default value"
assert updated_effective_config["compaction_target_size"] == 1048576
assert updated_effective_config["compaction_threshold"] == 10
assert updated_effective_config["gc_horizon"] == 67108864
@@ -239,6 +260,11 @@ evictions_low_residence_duration_metric_threshold = "2 days"
assert final_effective_config["gc_period"] == "1h"
assert final_effective_config["image_creation_threshold"] == 3
assert final_effective_config["evictions_low_residence_duration_metric_threshold"] == "2days"
assert final_effective_config["eviction_policy"] == {
"kind": "LayerAccessThreshold",
"period": "20s",
"threshold": "23h",
}
# restart the pageserver and ensure that the config is still correct
env.pageserver.stop()

View File

@@ -21,7 +21,7 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
RemoteStorageKind,
available_remote_storages,
wait_for_sk_commit_lsn_to_reach_remote_storage,
last_flush_lsn_upload,
)
from fixtures.pageserver.utils import (
assert_tenant_state,
@@ -174,12 +174,9 @@ def test_tenants_attached_after_download(
)
##### Stop the pageserver, erase its layer file to force it being downloaded from S3
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
env.endpoints.stop_all()
wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id, timeline_id, env.safekeepers, env.pageserver
)
env.pageserver.stop()
timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)