mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-26 14:50:36 +00:00
Compare commits
31 Commits
sk/sql_ove
...
handle_dup
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b35f3307f | ||
|
|
5f39328a06 | ||
|
|
fa7e975787 | ||
|
|
9b095034bc | ||
|
|
31a3910fd9 | ||
|
|
381c8fca4f | ||
|
|
4625da3164 | ||
|
|
850f6b1cb9 | ||
|
|
f19b70b379 | ||
|
|
9d0cf08d5f | ||
|
|
2d6fd72177 | ||
|
|
8945fbdb31 | ||
|
|
05ac0e2493 | ||
|
|
bfd45dd671 | ||
|
|
7f80230fd2 | ||
|
|
78bbbccadb | ||
|
|
dbbe032c39 | ||
|
|
cb9473928d | ||
|
|
fa20e37574 | ||
|
|
4911d7ce6f | ||
|
|
e83684b868 | ||
|
|
afbbc61036 | ||
|
|
7ba5c286b7 | ||
|
|
02b28ae0b1 | ||
|
|
0bfbae2d73 | ||
|
|
f1b7dc4064 | ||
|
|
e2a5177e89 | ||
|
|
0c083564ce | ||
|
|
d8dd60dc81 | ||
|
|
73f34eaa5e | ||
|
|
c2496c7ef2 |
50
.github/ansible/prod.us-east-1.hosts.yaml
vendored
Normal file
50
.github/ansible/prod.us-east-1.hosts.yaml
vendored
Normal file
@@ -0,0 +1,50 @@
|
||||
storage:
|
||||
vars:
|
||||
bucket_name: neon-prod-storage-us-east-1
|
||||
bucket_region: us-east-1
|
||||
console_mgmt_base_url: http://neon-internal-api.aws.neon.tech
|
||||
broker_endpoint: http://storage-broker-lb.theta.us-east-1.internal.aws.neon.tech:50051
|
||||
pageserver_config_stub:
|
||||
pg_distrib_dir: /usr/local
|
||||
metric_collection_endpoint: http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events
|
||||
metric_collection_interval: 10min
|
||||
disk_usage_based_eviction:
|
||||
max_usage_pct: 85 # TODO: decrease to 80 after all pageservers are below 80
|
||||
min_avail_bytes: 0
|
||||
period: "10s"
|
||||
tenant_config:
|
||||
eviction_policy:
|
||||
kind: "LayerAccessThreshold"
|
||||
period: "10m"
|
||||
threshold: &default_eviction_threshold "24h"
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
bucket_region: "{{ bucket_region }}"
|
||||
prefix_in_bucket: "pageserver/v1"
|
||||
safekeeper_s3_prefix: safekeeper/v1/wal
|
||||
hostname_suffix: ""
|
||||
remote_user: ssm-user
|
||||
ansible_aws_ssm_region: us-east-1
|
||||
ansible_aws_ssm_bucket_name: neon-prod-storage-us-east-1
|
||||
console_region_id: aws-us-east-1
|
||||
sentry_environment: production
|
||||
|
||||
children:
|
||||
pageservers:
|
||||
hosts:
|
||||
pageserver-0.us-east-1.aws.neon.tech:
|
||||
ansible_host: i-085222088b0d2e0c7
|
||||
pageserver-1.us-east-1.aws.neon.tech:
|
||||
ansible_host: i-0969d4f684d23a21e
|
||||
pageserver-2.us-east-1.aws.neon.tech:
|
||||
ansible_host: i-05dee87895da58dad
|
||||
|
||||
safekeepers:
|
||||
hosts:
|
||||
safekeeper-0.us-east-1.aws.neon.tech:
|
||||
ansible_host: i-04ce739e88793d864
|
||||
safekeeper-1.us-east-1.aws.neon.tech:
|
||||
ansible_host: i-0e9e6c9227fb81410
|
||||
safekeeper-2.us-east-1.aws.neon.tech:
|
||||
ansible_host: i-072f4dd86a327d52f
|
||||
47
.github/ansible/staging.eu-central-1.hosts.yaml
vendored
Normal file
47
.github/ansible/staging.eu-central-1.hosts.yaml
vendored
Normal file
@@ -0,0 +1,47 @@
|
||||
storage:
|
||||
vars:
|
||||
bucket_name: neon-dev-storage-eu-central-1
|
||||
bucket_region: eu-central-1
|
||||
# We only register/update storage in one preview console and manually copy to other instances
|
||||
console_mgmt_base_url: http://neon-internal-api.helium.aws.neon.build
|
||||
broker_endpoint: http://storage-broker-lb.alpha.eu-central-1.internal.aws.neon.build:50051
|
||||
pageserver_config_stub:
|
||||
pg_distrib_dir: /usr/local
|
||||
metric_collection_endpoint: http://neon-internal-api.helium.aws.neon.build/billing/api/v1/usage_events
|
||||
metric_collection_interval: 10min
|
||||
disk_usage_based_eviction:
|
||||
max_usage_pct: 80
|
||||
min_avail_bytes: 0
|
||||
period: "10s"
|
||||
tenant_config:
|
||||
eviction_policy:
|
||||
kind: "LayerAccessThreshold"
|
||||
period: "20m"
|
||||
threshold: &default_eviction_threshold "20m"
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
bucket_region: "{{ bucket_region }}"
|
||||
prefix_in_bucket: "pageserver/v1"
|
||||
safekeeper_s3_prefix: safekeeper/v1/wal
|
||||
hostname_suffix: ""
|
||||
remote_user: ssm-user
|
||||
ansible_aws_ssm_region: eu-central-1
|
||||
ansible_aws_ssm_bucket_name: neon-dev-storage-eu-central-1
|
||||
console_region_id: aws-eu-central-1
|
||||
sentry_environment: staging
|
||||
|
||||
children:
|
||||
pageservers:
|
||||
hosts:
|
||||
pageserver-0.eu-central-1.aws.neon.build:
|
||||
ansible_host: i-011f93ec26cfba2d4
|
||||
|
||||
safekeepers:
|
||||
hosts:
|
||||
safekeeper-0.eu-central-1.aws.neon.build:
|
||||
ansible_host: i-0ff026d27babf8ddd
|
||||
safekeeper-1.eu-central-1.aws.neon.build:
|
||||
ansible_host: i-03983a49ee54725d9
|
||||
safekeeper-2.eu-central-1.aws.neon.build:
|
||||
ansible_host: i-0bd025ecdb61b0db3
|
||||
2
.github/ansible/staging.us-east-2.hosts.yaml
vendored
2
.github/ansible/staging.us-east-2.hosts.yaml
vendored
@@ -48,8 +48,6 @@ storage:
|
||||
hosts:
|
||||
safekeeper-0.us-east-2.aws.neon.build:
|
||||
ansible_host: i-027662bd552bf5db0
|
||||
safekeeper-1.us-east-2.aws.neon.build:
|
||||
ansible_host: i-0171efc3604a7b907
|
||||
safekeeper-2.us-east-2.aws.neon.build:
|
||||
ansible_host: i-0de0b03a51676a6ce
|
||||
safekeeper-99.us-east-2.aws.neon.build:
|
||||
|
||||
52
.github/helm-values/dev-eu-central-1-alpha.neon-storage-broker.yaml
vendored
Normal file
52
.github/helm-values/dev-eu-central-1-alpha.neon-storage-broker.yaml
vendored
Normal file
@@ -0,0 +1,52 @@
|
||||
# Helm chart values for neon-storage-broker
|
||||
podLabels:
|
||||
neon_env: staging
|
||||
neon_service: storage-broker
|
||||
|
||||
# Use L4 LB
|
||||
service:
|
||||
# service.annotations -- Annotations to add to the service
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
|
||||
# assign service to this name at external-dns
|
||||
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.alpha.eu-central-1.internal.aws.neon.build
|
||||
# service.type -- Service type
|
||||
type: LoadBalancer
|
||||
# service.port -- broker listen port
|
||||
port: 50051
|
||||
|
||||
ingress:
|
||||
enabled: false
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
|
||||
extraManifests:
|
||||
- apiVersion: operator.victoriametrics.com/v1beta1
|
||||
kind: VMServiceScrape
|
||||
metadata:
|
||||
name: "{{ include \"neon-storage-broker.fullname\" . }}"
|
||||
labels:
|
||||
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
|
||||
app.kubernetes.io/name: neon-storage-broker
|
||||
app.kubernetes.io/instance: neon-storage-broker
|
||||
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
|
||||
app.kubernetes.io/managed-by: Helm
|
||||
namespace: "{{ .Release.Namespace }}"
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: "neon-storage-broker"
|
||||
endpoints:
|
||||
- port: broker
|
||||
path: /metrics
|
||||
interval: 10s
|
||||
scrapeTimeout: 10s
|
||||
namespaceSelector:
|
||||
matchNames:
|
||||
- "{{ .Release.Namespace }}"
|
||||
|
||||
settings:
|
||||
sentryEnvironment: "staging"
|
||||
@@ -23,6 +23,7 @@ settings:
|
||||
authBackend: "console"
|
||||
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
|
||||
domain: "*.eu-west-1.aws.neon.build"
|
||||
otelExporterOtlpEndpoint: "https://otel-collector.zeta.eu-west-1.internal.aws.neon.build"
|
||||
sentryEnvironment: "staging"
|
||||
wssPort: 8443
|
||||
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"
|
||||
|
||||
@@ -9,6 +9,7 @@ settings:
|
||||
authEndpoint: "https://console.stage.neon.tech/authenticate_proxy_request/"
|
||||
uri: "https://console.stage.neon.tech/psql_session/"
|
||||
domain: "pg.neon.build"
|
||||
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
|
||||
sentryEnvironment: "staging"
|
||||
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"
|
||||
metricCollectionInterval: "1min"
|
||||
|
||||
@@ -24,6 +24,7 @@ settings:
|
||||
authBackend: "console"
|
||||
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
|
||||
domain: "*.cloud.stage.neon.tech"
|
||||
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
|
||||
sentryEnvironment: "staging"
|
||||
wssPort: 8443
|
||||
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"
|
||||
|
||||
@@ -25,6 +25,7 @@ settings:
|
||||
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
|
||||
domain: "*.us-east-2.aws.neon.build"
|
||||
extraDomains: ["*.us-east-2.postgres.zenith.tech", "*.us-east-2.retooldb-staging.com"]
|
||||
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
|
||||
sentryEnvironment: "staging"
|
||||
wssPort: 8443
|
||||
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"
|
||||
|
||||
67
.github/helm-values/preview-template.neon-proxy-scram.yaml
vendored
Normal file
67
.github/helm-values/preview-template.neon-proxy-scram.yaml
vendored
Normal file
@@ -0,0 +1,67 @@
|
||||
# Helm chart values for neon-proxy-scram.
|
||||
# This is a YAML-formatted file.
|
||||
|
||||
deploymentStrategy:
|
||||
type: RollingUpdate
|
||||
rollingUpdate:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
image:
|
||||
repository: neondatabase/neon
|
||||
|
||||
settings:
|
||||
authBackend: "console"
|
||||
authEndpoint: "http://neon-internal-api.${PREVIEW_NAME}.aws.neon.build/management/api/v2"
|
||||
domain: "*.cloud.${PREVIEW_NAME}.aws.neon.build"
|
||||
sentryEnvironment: "staging"
|
||||
wssPort: 8443
|
||||
metricCollectionEndpoint: "http://neon-internal-api.${PREVIEW_NAME}.aws.neon.build/billing/api/v1/usage_events"
|
||||
metricCollectionInterval: "1min"
|
||||
|
||||
# -- Additional labels for neon-proxy pods
|
||||
podLabels:
|
||||
neon_service: proxy-scram
|
||||
neon_env: test
|
||||
neon_region: ${PREVIEW_NAME}.eu-central-1
|
||||
|
||||
|
||||
exposedService:
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
|
||||
external-dns.alpha.kubernetes.io/hostname: cloud.${PREVIEW_NAME}.aws.neon.build
|
||||
httpsPort: 443
|
||||
|
||||
#metrics:
|
||||
# enabled: true
|
||||
# serviceMonitor:
|
||||
# enabled: true
|
||||
# selector:
|
||||
# release: kube-prometheus-stack
|
||||
|
||||
extraManifests:
|
||||
- apiVersion: operator.victoriametrics.com/v1beta1
|
||||
kind: VMServiceScrape
|
||||
metadata:
|
||||
name: "{{ include \"neon-proxy.fullname\" . }}"
|
||||
labels:
|
||||
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
|
||||
app.kubernetes.io/name: neon-proxy
|
||||
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
|
||||
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
|
||||
app.kubernetes.io/managed-by: Helm
|
||||
namespace: "{{ .Release.Namespace }}"
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: "neon-proxy"
|
||||
endpoints:
|
||||
- port: http
|
||||
path: /metrics
|
||||
interval: 10s
|
||||
scrapeTimeout: 10s
|
||||
namespaceSelector:
|
||||
matchNames:
|
||||
- "{{ .Release.Namespace }}"
|
||||
69
.github/helm-values/prod-us-east-1-theta.neon-proxy-scram.yaml
vendored
Normal file
69
.github/helm-values/prod-us-east-1-theta.neon-proxy-scram.yaml
vendored
Normal file
@@ -0,0 +1,69 @@
|
||||
# Helm chart values for neon-proxy-scram.
|
||||
# This is a YAML-formatted file.
|
||||
|
||||
deploymentStrategy:
|
||||
type: RollingUpdate
|
||||
rollingUpdate:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
image:
|
||||
repository: neondatabase/neon
|
||||
|
||||
settings:
|
||||
authBackend: "console"
|
||||
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
|
||||
domain: "*.us-east-1.aws.neon.tech"
|
||||
# *.us-east-1.retooldb.com hasn't been delegated yet.
|
||||
extraDomains: ["*.us-east-1.postgres.vercel-storage.com"]
|
||||
sentryEnvironment: "production"
|
||||
wssPort: 8443
|
||||
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"
|
||||
metricCollectionInterval: "10min"
|
||||
|
||||
podLabels:
|
||||
neon_service: proxy-scram
|
||||
neon_env: prod
|
||||
neon_region: us-east-1
|
||||
|
||||
exposedService:
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
|
||||
external-dns.alpha.kubernetes.io/hostname: us-east-1.aws.neon.tech
|
||||
httpsPort: 443
|
||||
|
||||
extraManifests:
|
||||
- apiVersion: operator.victoriametrics.com/v1beta1
|
||||
kind: VMServiceScrape
|
||||
metadata:
|
||||
name: "{{ include \"neon-proxy.fullname\" . }}"
|
||||
labels:
|
||||
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
|
||||
app.kubernetes.io/name: neon-proxy
|
||||
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
|
||||
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
|
||||
app.kubernetes.io/managed-by: Helm
|
||||
namespace: "{{ .Release.Namespace }}"
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: "neon-proxy"
|
||||
endpoints:
|
||||
- port: http
|
||||
path: /metrics
|
||||
interval: 10s
|
||||
scrapeTimeout: 10s
|
||||
namespaceSelector:
|
||||
matchNames:
|
||||
- "{{ .Release.Namespace }}"
|
||||
52
.github/helm-values/prod-us-east-1-theta.neon-storage-broker.yaml
vendored
Normal file
52
.github/helm-values/prod-us-east-1-theta.neon-storage-broker.yaml
vendored
Normal file
@@ -0,0 +1,52 @@
|
||||
# Helm chart values for neon-storage-broker
|
||||
podLabels:
|
||||
neon_env: production
|
||||
neon_service: storage-broker
|
||||
|
||||
# Use L4 LB
|
||||
service:
|
||||
# service.annotations -- Annotations to add to the service
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
|
||||
# assign service to this name at external-dns
|
||||
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.theta.us-east-1.internal.aws.neon.tech
|
||||
# service.type -- Service type
|
||||
type: LoadBalancer
|
||||
# service.port -- broker listen port
|
||||
port: 50051
|
||||
|
||||
ingress:
|
||||
enabled: false
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
|
||||
extraManifests:
|
||||
- apiVersion: operator.victoriametrics.com/v1beta1
|
||||
kind: VMServiceScrape
|
||||
metadata:
|
||||
name: "{{ include \"neon-storage-broker.fullname\" . }}"
|
||||
labels:
|
||||
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
|
||||
app.kubernetes.io/name: neon-storage-broker
|
||||
app.kubernetes.io/instance: neon-storage-broker
|
||||
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
|
||||
app.kubernetes.io/managed-by: Helm
|
||||
namespace: "{{ .Release.Namespace }}"
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: "neon-storage-broker"
|
||||
endpoints:
|
||||
- port: broker
|
||||
path: /metrics
|
||||
interval: 10s
|
||||
scrapeTimeout: 10s
|
||||
namespaceSelector:
|
||||
matchNames:
|
||||
- "{{ .Release.Namespace }}"
|
||||
|
||||
settings:
|
||||
sentryEnvironment: "production"
|
||||
182
.github/workflows/build_and_test.yml
vendored
182
.github/workflows/build_and_test.yml
vendored
@@ -541,7 +541,7 @@ jobs:
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
|
||||
options: --init
|
||||
needs: [ push-docker-hub, tag ]
|
||||
needs: [ promote-images, tag ]
|
||||
steps:
|
||||
- name: Set PR's status to pending and request a remote CI test
|
||||
run: |
|
||||
@@ -584,8 +584,7 @@ jobs:
|
||||
neon-image:
|
||||
runs-on: [ self-hosted, gen3, large ]
|
||||
needs: [ tag ]
|
||||
# https://github.com/GoogleContainerTools/kaniko/issues/2005
|
||||
container: gcr.io/kaniko-project/executor:v1.7.0-debug
|
||||
container: gcr.io/kaniko-project/executor:v1.9.2-debug
|
||||
defaults:
|
||||
run:
|
||||
shell: sh -eu {0}
|
||||
@@ -597,11 +596,32 @@ jobs:
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Configure ECR login
|
||||
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
|
||||
- name: Configure ECR and Docker Hub login
|
||||
run: |
|
||||
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
|
||||
echo "::add-mask::${DOCKERHUB_AUTH}"
|
||||
|
||||
cat <<-EOF > /kaniko/.docker/config.json
|
||||
{
|
||||
"auths": {
|
||||
"https://index.docker.io/v1/": {
|
||||
"auth": "${DOCKERHUB_AUTH}"
|
||||
}
|
||||
},
|
||||
"credHelpers": {
|
||||
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
|
||||
}
|
||||
}
|
||||
EOF
|
||||
|
||||
- name: Kaniko build neon
|
||||
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
|
||||
run:
|
||||
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
|
||||
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
|
||||
--context .
|
||||
--build-arg GIT_VERSION=${{ github.sha }}
|
||||
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
|
||||
--destination neondatabase/neon:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
|
||||
- name: Cleanup ECR folder
|
||||
@@ -652,7 +672,7 @@ jobs:
|
||||
compute-tools-image:
|
||||
runs-on: [ self-hosted, gen3, large ]
|
||||
needs: [ tag ]
|
||||
container: gcr.io/kaniko-project/executor:v1.7.0-debug
|
||||
container: gcr.io/kaniko-project/executor:v1.9.2-debug
|
||||
defaults:
|
||||
run:
|
||||
shell: sh -eu {0}
|
||||
@@ -661,18 +681,41 @@ jobs:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v1 # v3 won't work with kaniko
|
||||
|
||||
- name: Configure ECR login
|
||||
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
|
||||
- name: Configure ECR and Docker Hub login
|
||||
run: |
|
||||
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
|
||||
echo "::add-mask::${DOCKERHUB_AUTH}"
|
||||
|
||||
cat <<-EOF > /kaniko/.docker/config.json
|
||||
{
|
||||
"auths": {
|
||||
"https://index.docker.io/v1/": {
|
||||
"auth": "${DOCKERHUB_AUTH}"
|
||||
}
|
||||
},
|
||||
"credHelpers": {
|
||||
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
|
||||
}
|
||||
}
|
||||
EOF
|
||||
|
||||
- name: Kaniko build compute tools
|
||||
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
|
||||
run:
|
||||
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
|
||||
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
|
||||
--context .
|
||||
--build-arg GIT_VERSION=${{ github.sha }}
|
||||
--dockerfile Dockerfile.compute-tools
|
||||
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
|
||||
--destination neondatabase/compute-tools:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
|
||||
- name: Cleanup ECR folder
|
||||
run: rm -rf ~/.ecr
|
||||
|
||||
compute-node-image:
|
||||
runs-on: [ self-hosted, gen3, large ]
|
||||
container: gcr.io/kaniko-project/executor:v1.7.0-debug
|
||||
container: gcr.io/kaniko-project/executor:v1.9.2-debug
|
||||
needs: [ tag ]
|
||||
strategy:
|
||||
fail-fast: false
|
||||
@@ -689,12 +732,36 @@ jobs:
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Configure ECR login
|
||||
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
|
||||
- name: Configure ECR and Docker Hub login
|
||||
run: |
|
||||
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
|
||||
echo "::add-mask::${DOCKERHUB_AUTH}"
|
||||
|
||||
cat <<-EOF > /kaniko/.docker/config.json
|
||||
{
|
||||
"auths": {
|
||||
"https://index.docker.io/v1/": {
|
||||
"auth": "${DOCKERHUB_AUTH}"
|
||||
}
|
||||
},
|
||||
"credHelpers": {
|
||||
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
|
||||
}
|
||||
}
|
||||
EOF
|
||||
|
||||
- name: Kaniko build compute node with extensions
|
||||
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --build-arg PG_VERSION=${{ matrix.version }} --dockerfile Dockerfile.compute-node --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
run:
|
||||
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
|
||||
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
|
||||
--context .
|
||||
--build-arg GIT_VERSION=${{ github.sha }}
|
||||
--build-arg PG_VERSION=${{ matrix.version }}
|
||||
--dockerfile Dockerfile.compute-node
|
||||
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
--destination neondatabase/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
|
||||
- name: Cleanup ECR folder
|
||||
run: rm -rf ~/.ecr
|
||||
|
||||
@@ -786,13 +853,11 @@ jobs:
|
||||
runs-on: [ self-hosted, gen3, small ]
|
||||
needs: [ tag, test-images, vm-compute-node-image ]
|
||||
container: golang:1.19-bullseye
|
||||
if: github.event_name != 'workflow_dispatch'
|
||||
# Don't add if-condition here.
|
||||
# The job should always be run because we have dependant other jobs that shouldn't be skipped
|
||||
|
||||
steps:
|
||||
- name: Install Crane & ECR helper
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
run: |
|
||||
go install github.com/google/go-containerregistry/cmd/crane@31786c6cbb82d6ec4fb8eb79cd9387905130534e # v0.11.0
|
||||
go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0
|
||||
@@ -802,10 +867,15 @@ jobs:
|
||||
mkdir /github/home/.docker/
|
||||
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
|
||||
|
||||
- name: Copy vm-compute-node images to Docker Hub
|
||||
run: |
|
||||
crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} vm-compute-node-v14
|
||||
crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} vm-compute-node-v15
|
||||
|
||||
- name: Add latest tag to images
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
github.event_name != 'workflow_dispatch'
|
||||
run: |
|
||||
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} latest
|
||||
@@ -814,50 +884,10 @@ jobs:
|
||||
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} latest
|
||||
|
||||
- name: Cleanup ECR folder
|
||||
run: rm -rf ~/.ecr
|
||||
|
||||
push-docker-hub:
|
||||
runs-on: [ self-hosted, dev, x64 ]
|
||||
needs: [ promote-images, tag ]
|
||||
container: golang:1.19-bullseye
|
||||
|
||||
steps:
|
||||
- name: Install Crane & ECR helper
|
||||
run: |
|
||||
go install github.com/google/go-containerregistry/cmd/crane@31786c6cbb82d6ec4fb8eb79cd9387905130534e # v0.11.0
|
||||
go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0
|
||||
|
||||
- name: Configure ECR login
|
||||
run: |
|
||||
mkdir /github/home/.docker/
|
||||
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
|
||||
|
||||
- name: Pull neon image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} neon
|
||||
|
||||
- name: Pull compute tools image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} compute-tools
|
||||
|
||||
- name: Pull compute node v14 image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} compute-node-v14
|
||||
|
||||
- name: Pull vm compute node v14 image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} vm-compute-node-v14
|
||||
|
||||
- name: Pull compute node v15 image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} compute-node-v15
|
||||
|
||||
- name: Pull vm compute node v15 image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} vm-compute-node-v15
|
||||
|
||||
- name: Pull rust image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
|
||||
|
||||
- name: Push images to production ECR
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
github.event_name != 'workflow_dispatch'
|
||||
run: |
|
||||
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/neon:latest
|
||||
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:latest
|
||||
@@ -872,28 +902,12 @@ jobs:
|
||||
echo "" > /github/home/.docker/config.json
|
||||
crane auth login -u ${{ secrets.NEON_DOCKERHUB_USERNAME }} -p ${{ secrets.NEON_DOCKERHUB_PASSWORD }} index.docker.io
|
||||
|
||||
- name: Push neon image to Docker Hub
|
||||
run: crane push neon neondatabase/neon:${{needs.tag.outputs.build-tag}}
|
||||
- name: Push vm-compute-node to Docker Hub
|
||||
run: |
|
||||
crane push vm-compute-node-v14 neondatabase/vm-compute-node-v14:${{needs.tag.outputs.build-tag}}
|
||||
crane push vm-compute-node-v15 neondatabase/vm-compute-node-v15:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push compute tools image to Docker Hub
|
||||
run: crane push compute-tools neondatabase/compute-tools:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push compute node v14 image to Docker Hub
|
||||
run: crane push compute-node-v14 neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push vm compute node v14 image to Docker Hub
|
||||
run: crane push vm-compute-node-v14 neondatabase/vm-compute-node-v14:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push compute node v15 image to Docker Hub
|
||||
run: crane push compute-node-v15 neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push vm compute node v15 image to Docker Hub
|
||||
run: crane push vm-compute-node-v15 neondatabase/vm-compute-node-v15:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push rust image to Docker Hub
|
||||
run: crane push rust neondatabase/rust:pinned
|
||||
|
||||
- name: Add latest tag to images in Docker Hub
|
||||
- name: Push latest tags to Docker Hub
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
@@ -913,7 +927,7 @@ jobs:
|
||||
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
|
||||
# We need both storage **and** compute images for deploy, because control plane picks the compute version based on the storage version.
|
||||
# If it notices a fresh storage it may bump the compute version. And if compute image failed to build it may break things badly
|
||||
needs: [ push-docker-hub, tag, regress-tests ]
|
||||
needs: [ promote-images, tag, regress-tests ]
|
||||
if: |
|
||||
contains(github.event.pull_request.labels.*.name, 'deploy-test-storage') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
@@ -947,7 +961,7 @@ jobs:
|
||||
deploy:
|
||||
runs-on: [ self-hosted, gen3, small ]
|
||||
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
|
||||
needs: [ push-docker-hub, tag, regress-tests ]
|
||||
needs: [ promote-images, tag, regress-tests ]
|
||||
if: ( github.ref_name == 'main' || github.ref_name == 'release' ) && github.event_name != 'workflow_dispatch'
|
||||
steps:
|
||||
- name: Fix git ownership
|
||||
@@ -984,7 +998,7 @@ jobs:
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
|
||||
options: --init
|
||||
needs: [ push-docker-hub, tag, regress-tests ]
|
||||
needs: [ promote-images, tag, regress-tests ]
|
||||
if: github.ref_name == 'release' && github.event_name != 'workflow_dispatch'
|
||||
steps:
|
||||
- name: Promote compatibility snapshot for the release
|
||||
|
||||
52
.github/workflows/deploy-dev.yml
vendored
52
.github/workflows/deploy-dev.yml
vendored
@@ -48,7 +48,8 @@ jobs:
|
||||
shell: bash
|
||||
strategy:
|
||||
matrix:
|
||||
target_region: [ eu-west-1, us-east-2 ]
|
||||
# TODO(sergey): Fix storage deploy in eu-central-1
|
||||
target_region: [ eu-west-1, us-east-2]
|
||||
environment:
|
||||
name: dev-${{ matrix.target_region }}
|
||||
steps:
|
||||
@@ -133,6 +134,53 @@ jobs:
|
||||
|
||||
- name: Cleanup helm folder
|
||||
run: rm -rf ~/.cache
|
||||
|
||||
deploy-preview-proxy-new:
|
||||
runs-on: [ self-hosted, gen3, small ]
|
||||
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
|
||||
if: inputs.deployProxy
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
strategy:
|
||||
matrix:
|
||||
include:
|
||||
- target_region: eu-central-1
|
||||
target_cluster: dev-eu-central-1-alpha
|
||||
environment:
|
||||
name: dev-${{ matrix.target_region }}
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
ref: ${{ inputs.branch }}
|
||||
|
||||
- name: Configure AWS Credentials
|
||||
uses: aws-actions/configure-aws-credentials@v1-node16
|
||||
with:
|
||||
role-to-assume: arn:aws:iam::369495373322:role/github-runner
|
||||
aws-region: eu-central-1
|
||||
role-skip-session-tagging: true
|
||||
role-duration-seconds: 1800
|
||||
|
||||
- name: Configure environment
|
||||
run: |
|
||||
helm repo add neondatabase https://neondatabase.github.io/helm-charts
|
||||
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
|
||||
|
||||
- name: Re-deploy preview proxies
|
||||
run: |
|
||||
DOCKER_TAG=${{ inputs.dockerTag }}
|
||||
for PREVIEW_NAME in helium argon krypton xenon radon oganesson hydrogen nitrogen oxygen fluorine chlorine; do
|
||||
export PREVIEW_NAME
|
||||
envsubst <.github/helm-values/preview-template.neon-proxy-scram.yaml >preview-${PREVIEW_NAME}.neon-proxy-scram.yaml
|
||||
helm upgrade neon-proxy-scram-${PREVIEW_NAME} neondatabase/neon-proxy --namespace neon-proxy-${PREVIEW_NAME} --create-namespace --install --atomic -f preview-${PREVIEW_NAME}.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
|
||||
done
|
||||
|
||||
- name: Cleanup helm folder
|
||||
run: rm -rf ~/.cache
|
||||
|
||||
deploy-storage-broker-new:
|
||||
runs-on: [ self-hosted, gen3, small ]
|
||||
@@ -148,6 +196,8 @@ jobs:
|
||||
target_cluster: dev-us-east-2-beta
|
||||
- target_region: eu-west-1
|
||||
target_cluster: dev-eu-west-1-zeta
|
||||
- target_region: eu-central-1
|
||||
target_cluster: dev-eu-central-1-alpha
|
||||
environment:
|
||||
name: dev-${{ matrix.target_region }}
|
||||
steps:
|
||||
|
||||
8
.github/workflows/deploy-prod.yml
vendored
8
.github/workflows/deploy-prod.yml
vendored
@@ -49,7 +49,7 @@ jobs:
|
||||
shell: bash
|
||||
strategy:
|
||||
matrix:
|
||||
target_region: [ us-east-2, us-west-2, eu-central-1, ap-southeast-1 ]
|
||||
target_region: [ us-east-2, us-west-2, eu-central-1, ap-southeast-1, us-east-1 ]
|
||||
environment:
|
||||
name: prod-${{ matrix.target_region }}
|
||||
steps:
|
||||
@@ -97,6 +97,10 @@ jobs:
|
||||
target_cluster: prod-ap-southeast-1-epsilon
|
||||
deploy_link_proxy: false
|
||||
deploy_legacy_scram_proxy: false
|
||||
- target_region: us-east-1
|
||||
target_cluster: prod-us-east-1-theta
|
||||
deploy_link_proxy: false
|
||||
deploy_legacy_scram_proxy: false
|
||||
environment:
|
||||
name: prod-${{ matrix.target_region }}
|
||||
steps:
|
||||
@@ -147,6 +151,8 @@ jobs:
|
||||
target_cluster: prod-eu-central-1-gamma
|
||||
- target_region: ap-southeast-1
|
||||
target_cluster: prod-ap-southeast-1-epsilon
|
||||
- target_region: us-east-1
|
||||
target_cluster: prod-us-east-1-theta
|
||||
environment:
|
||||
name: prod-${{ matrix.target_region }}
|
||||
steps:
|
||||
|
||||
37
Cargo.lock
generated
37
Cargo.lock
generated
@@ -297,7 +297,7 @@ dependencies = [
|
||||
"http",
|
||||
"http-body",
|
||||
"lazy_static",
|
||||
"percent-encoding 2.2.0",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"tracing",
|
||||
]
|
||||
@@ -401,7 +401,7 @@ dependencies = [
|
||||
"hex",
|
||||
"http",
|
||||
"once_cell",
|
||||
"percent-encoding 2.2.0",
|
||||
"percent-encoding",
|
||||
"regex",
|
||||
"ring",
|
||||
"time",
|
||||
@@ -522,7 +522,7 @@ dependencies = [
|
||||
"http-body",
|
||||
"hyper",
|
||||
"once_cell",
|
||||
"percent-encoding 2.2.0",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"pin-utils",
|
||||
"tokio",
|
||||
@@ -544,7 +544,7 @@ dependencies = [
|
||||
"http-body",
|
||||
"hyper",
|
||||
"once_cell",
|
||||
"percent-encoding 2.2.0",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"pin-utils",
|
||||
"tracing",
|
||||
@@ -684,7 +684,7 @@ dependencies = [
|
||||
"matchit",
|
||||
"memchr",
|
||||
"mime",
|
||||
"percent-encoding 2.2.0",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"rustversion",
|
||||
"serde",
|
||||
@@ -1580,7 +1580,7 @@ version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8"
|
||||
dependencies = [
|
||||
"percent-encoding 2.2.0",
|
||||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1756,9 +1756,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "0.3.17"
|
||||
version = "0.3.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "66b91535aa35fea1523ad1b86cb6b53c28e0ae566ba4a460f4457e936cad7c6f"
|
||||
checksum = "17f8a914c2987b688368b5138aa05321db91f4090cf26118185672ad588bce21"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fnv",
|
||||
@@ -2584,7 +2584,7 @@ dependencies = [
|
||||
"futures-util",
|
||||
"once_cell",
|
||||
"opentelemetry_api",
|
||||
"percent-encoding 2.2.0",
|
||||
"percent-encoding",
|
||||
"rand",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
@@ -2749,12 +2749,6 @@ dependencies = [
|
||||
"base64 0.13.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "percent-encoding"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831"
|
||||
|
||||
[[package]]
|
||||
name = "percent-encoding"
|
||||
version = "2.2.0"
|
||||
@@ -3118,7 +3112,6 @@ dependencies = [
|
||||
"once_cell",
|
||||
"opentelemetry",
|
||||
"parking_lot",
|
||||
"percent-encoding 1.0.1",
|
||||
"pin-project-lite",
|
||||
"postgres_backend",
|
||||
"pq_proto",
|
||||
@@ -3323,7 +3316,7 @@ dependencies = [
|
||||
"mime",
|
||||
"mime_guess",
|
||||
"once_cell",
|
||||
"percent-encoding 2.2.0",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"rustls 0.20.8",
|
||||
"rustls-pemfile",
|
||||
@@ -3396,7 +3389,7 @@ dependencies = [
|
||||
"http",
|
||||
"hyper",
|
||||
"lazy_static",
|
||||
"percent-encoding 2.2.0",
|
||||
"percent-encoding",
|
||||
"regex",
|
||||
]
|
||||
|
||||
@@ -4339,7 +4332,7 @@ dependencies = [
|
||||
"futures-util",
|
||||
"log",
|
||||
"parking_lot",
|
||||
"percent-encoding 2.2.0",
|
||||
"percent-encoding",
|
||||
"phf",
|
||||
"pin-project-lite",
|
||||
"postgres-protocol",
|
||||
@@ -4487,7 +4480,7 @@ dependencies = [
|
||||
"http-body",
|
||||
"hyper",
|
||||
"hyper-timeout",
|
||||
"percent-encoding 2.2.0",
|
||||
"percent-encoding",
|
||||
"pin-project",
|
||||
"prost",
|
||||
"prost-derive",
|
||||
@@ -4519,7 +4512,7 @@ dependencies = [
|
||||
"http-body",
|
||||
"hyper",
|
||||
"hyper-timeout",
|
||||
"percent-encoding 2.2.0",
|
||||
"percent-encoding",
|
||||
"pin-project",
|
||||
"prost",
|
||||
"rustls-native-certs",
|
||||
@@ -4829,7 +4822,7 @@ checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643"
|
||||
dependencies = [
|
||||
"form_urlencoded",
|
||||
"idna",
|
||||
"percent-encoding 2.2.0",
|
||||
"percent-encoding",
|
||||
"serde",
|
||||
]
|
||||
|
||||
|
||||
@@ -55,7 +55,7 @@ hmac = "0.12.1"
|
||||
hostname = "0.3.1"
|
||||
humantime = "2.1"
|
||||
humantime-serde = "1.1.1"
|
||||
hyper = { version = "0.14", features = ["http2", "tcp", "runtime", "http1"]}
|
||||
hyper = "0.14"
|
||||
hyper-tungstenite = "0.9"
|
||||
itertools = "0.10"
|
||||
jsonwebtoken = "8"
|
||||
@@ -117,7 +117,6 @@ uuid = { version = "1.2", features = ["v4", "serde"] }
|
||||
walkdir = "2.3.2"
|
||||
webpki-roots = "0.23"
|
||||
x509-parser = "0.15"
|
||||
percent-encoding = "1.0"
|
||||
|
||||
## TODO replace this with tracing
|
||||
env_logger = "0.10"
|
||||
|
||||
@@ -54,7 +54,7 @@ RUN set -e \
|
||||
|
||||
RUN set -e \
|
||||
&& echo "::sysinit:cgconfigparser -l /etc/cgconfig.conf -s 1664" >> /etc/inittab \
|
||||
&& CONNSTR="dbname=neondb user=cloud_admin sslmode=disable" \
|
||||
&& CONNSTR="dbname=postgres user=cloud_admin sslmode=disable" \
|
||||
&& ARGS="--auto-restart --cgroup=neon-postgres --pgconnstr=\"$CONNSTR\"" \
|
||||
&& echo "::respawn:su vm-informant -c '/usr/local/bin/vm-informant $ARGS'" >> /etc/inittab
|
||||
|
||||
|
||||
@@ -73,7 +73,7 @@ fn main() -> Result<()> {
|
||||
// Try to use just 'postgres' if no path is provided
|
||||
let pgbin = matches.get_one::<String>("pgbin").unwrap();
|
||||
|
||||
let mut spec = None;
|
||||
let spec;
|
||||
let mut live_config_allowed = false;
|
||||
match spec_json {
|
||||
// First, try to get cluster spec from the cli argument
|
||||
@@ -89,9 +89,13 @@ fn main() -> Result<()> {
|
||||
} else if let Some(id) = compute_id {
|
||||
if let Some(cp_base) = control_plane_uri {
|
||||
live_config_allowed = true;
|
||||
if let Ok(s) = get_spec_from_control_plane(cp_base, id) {
|
||||
spec = Some(s);
|
||||
}
|
||||
spec = match get_spec_from_control_plane(cp_base, id) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
error!("cannot get response from control plane: {}", e);
|
||||
panic!("neither spec nor confirmation that compute is in the Empty state was received");
|
||||
}
|
||||
};
|
||||
} else {
|
||||
panic!("must specify both --control-plane-uri and --compute-id or none");
|
||||
}
|
||||
@@ -114,7 +118,6 @@ fn main() -> Result<()> {
|
||||
spec_set = false;
|
||||
}
|
||||
let compute_node = ComputeNode {
|
||||
start_time: Utc::now(),
|
||||
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
|
||||
pgdata: pgdata.to_string(),
|
||||
pgbin: pgbin.to_string(),
|
||||
@@ -147,6 +150,17 @@ fn main() -> Result<()> {
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
let pspec = state.pspec.as_ref().expect("spec must be set");
|
||||
let startup_tracing_context = pspec.spec.startup_tracing_context.clone();
|
||||
|
||||
// Record for how long we slept waiting for the spec.
|
||||
state.metrics.wait_for_spec_ms = Utc::now()
|
||||
.signed_duration_since(state.start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
// Reset start time to the actual start of the configuration, so that
|
||||
// total startup time was properly measured at the end.
|
||||
state.start_time = Utc::now();
|
||||
|
||||
state.status = ComputeStatus::Init;
|
||||
compute.state_changed.notify_all();
|
||||
drop(state);
|
||||
|
||||
@@ -38,7 +38,6 @@ use crate::spec::*;
|
||||
|
||||
/// Compute node info shared across several `compute_ctl` threads.
|
||||
pub struct ComputeNode {
|
||||
pub start_time: DateTime<Utc>,
|
||||
// Url type maintains proper escaping
|
||||
pub connstr: url::Url,
|
||||
pub pgdata: String,
|
||||
@@ -66,6 +65,7 @@ pub struct ComputeNode {
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ComputeState {
|
||||
pub start_time: DateTime<Utc>,
|
||||
pub status: ComputeStatus,
|
||||
/// Timestamp of the last Postgres activity
|
||||
pub last_active: DateTime<Utc>,
|
||||
@@ -77,6 +77,7 @@ pub struct ComputeState {
|
||||
impl ComputeState {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
start_time: Utc::now(),
|
||||
status: ComputeStatus::Empty,
|
||||
last_active: Utc::now(),
|
||||
error: None,
|
||||
@@ -425,7 +426,7 @@ impl ComputeNode {
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
state.metrics.total_startup_ms = startup_end_time
|
||||
.signed_duration_since(self.start_time)
|
||||
.signed_duration_since(compute_state.start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
|
||||
@@ -18,6 +18,7 @@ use tracing_utils::http::OtelName;
|
||||
|
||||
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
|
||||
ComputeStatusResponse {
|
||||
start_time: state.start_time,
|
||||
tenant: state
|
||||
.pspec
|
||||
.as_ref()
|
||||
|
||||
@@ -152,11 +152,14 @@ components:
|
||||
type: object
|
||||
description: Compute startup metrics.
|
||||
required:
|
||||
- wait_for_spec_ms
|
||||
- sync_safekeepers_ms
|
||||
- basebackup_ms
|
||||
- config_ms
|
||||
- total_startup_ms
|
||||
properties:
|
||||
wait_for_spec_ms:
|
||||
type: integer
|
||||
sync_safekeepers_ms:
|
||||
type: integer
|
||||
basebackup_ms:
|
||||
@@ -181,6 +184,13 @@ components:
|
||||
- status
|
||||
- last_active
|
||||
properties:
|
||||
start_time:
|
||||
type: string
|
||||
description: |
|
||||
Time when compute was started. If initially compute was started in the `empty`
|
||||
state and then provided with valid spec, `start_time` will be reset to the
|
||||
moment, when spec was received.
|
||||
example: "2022-10-12T07:20:50.52Z"
|
||||
status:
|
||||
$ref: '#/components/schemas/ComputeStatus'
|
||||
last_active:
|
||||
|
||||
@@ -4,42 +4,117 @@ use std::str::FromStr;
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use postgres::config::Config;
|
||||
use postgres::{Client, NoTls};
|
||||
use tracing::{info, info_span, instrument, span_enabled, warn, Level};
|
||||
use reqwest::StatusCode;
|
||||
use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
|
||||
|
||||
use crate::config;
|
||||
use crate::params::PG_HBA_ALL_MD5;
|
||||
use crate::pg_helpers::*;
|
||||
|
||||
use compute_api::responses::ControlPlaneSpecResponse;
|
||||
use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
|
||||
use compute_api::spec::{ComputeSpec, Database, PgIdent, Role};
|
||||
|
||||
// Do control plane request and return response if any. In case of error it
|
||||
// returns a bool flag indicating whether it makes sense to retry the request
|
||||
// and a string with error message.
|
||||
fn do_control_plane_request(
|
||||
uri: &str,
|
||||
jwt: &str,
|
||||
) -> Result<ControlPlaneSpecResponse, (bool, String)> {
|
||||
let resp = reqwest::blocking::Client::new()
|
||||
.get(uri)
|
||||
.header("Authorization", jwt)
|
||||
.send()
|
||||
.map_err(|e| {
|
||||
(
|
||||
true,
|
||||
format!("could not perform spec request to control plane: {}", e),
|
||||
)
|
||||
})?;
|
||||
|
||||
match resp.status() {
|
||||
StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
|
||||
Ok(spec_resp) => Ok(spec_resp),
|
||||
Err(e) => Err((
|
||||
true,
|
||||
format!("could not deserialize control plane response: {}", e),
|
||||
)),
|
||||
},
|
||||
StatusCode::SERVICE_UNAVAILABLE => {
|
||||
Err((true, "control plane is temporarily unavailable".to_string()))
|
||||
}
|
||||
StatusCode::BAD_GATEWAY => {
|
||||
// We have a problem with intermittent 502 errors now
|
||||
// https://github.com/neondatabase/cloud/issues/2353
|
||||
// It's fine to retry GET request in this case.
|
||||
Err((true, "control plane request failed with 502".to_string()))
|
||||
}
|
||||
// Another code, likely 500 or 404, means that compute is unknown to the control plane
|
||||
// or some internal failure happened. Doesn't make much sense to retry in this case.
|
||||
_ => Err((
|
||||
false,
|
||||
format!(
|
||||
"unexpected control plane response status code: {}",
|
||||
resp.status()
|
||||
),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
|
||||
/// env variable is set, it will be used for authorization.
|
||||
pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeSpec> {
|
||||
pub fn get_spec_from_control_plane(
|
||||
base_uri: &str,
|
||||
compute_id: &str,
|
||||
) -> Result<Option<ComputeSpec>> {
|
||||
let cp_uri = format!("{base_uri}/management/api/v2/computes/{compute_id}/spec");
|
||||
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
|
||||
let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
|
||||
Ok(v) => v,
|
||||
Err(_) => "".to_string(),
|
||||
};
|
||||
let mut attempt = 1;
|
||||
let mut spec: Result<Option<ComputeSpec>> = Ok(None);
|
||||
|
||||
info!("getting spec from control plane: {}", cp_uri);
|
||||
|
||||
// TODO: check the response. We should distinguish cases when it's
|
||||
// - network error, then retry
|
||||
// - no spec for compute yet, then wait
|
||||
// - compute id is unknown or any other error, then bail out
|
||||
let resp: ControlPlaneSpecResponse = reqwest::blocking::Client::new()
|
||||
.get(cp_uri)
|
||||
.header("Authorization", jwt)
|
||||
.send()
|
||||
.map_err(|e| anyhow!("could not send spec request to control plane: {}", e))?
|
||||
.json()
|
||||
.map_err(|e| anyhow!("could not get compute spec from control plane: {}", e))?;
|
||||
// Do 3 attempts to get spec from the control plane using the following logic:
|
||||
// - network error -> then retry
|
||||
// - compute id is unknown or any other error -> bail out
|
||||
// - no spec for compute yet (Empty state) -> return Ok(None)
|
||||
// - got spec -> return Ok(Some(spec))
|
||||
while attempt < 4 {
|
||||
spec = match do_control_plane_request(&cp_uri, &jwt) {
|
||||
Ok(spec_resp) => match spec_resp.status {
|
||||
ControlPlaneComputeStatus::Empty => Ok(None),
|
||||
ControlPlaneComputeStatus::Attached => {
|
||||
if let Some(spec) = spec_resp.spec {
|
||||
Ok(Some(spec))
|
||||
} else {
|
||||
bail!("compute is attached, but spec is empty")
|
||||
}
|
||||
}
|
||||
},
|
||||
Err((retry, msg)) => {
|
||||
if retry {
|
||||
Err(anyhow!(msg))
|
||||
} else {
|
||||
bail!(msg);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(spec) = resp.spec {
|
||||
Ok(spec)
|
||||
} else {
|
||||
bail!("could not get compute spec from control plane")
|
||||
if let Err(e) = &spec {
|
||||
error!("attempt {} to get spec failed with: {}", attempt, e);
|
||||
} else {
|
||||
return spec;
|
||||
}
|
||||
|
||||
attempt += 1;
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
}
|
||||
|
||||
// All attempts failed, return error.
|
||||
spec
|
||||
}
|
||||
|
||||
/// It takes cluster specification and does the following:
|
||||
|
||||
@@ -359,8 +359,8 @@ impl PageServerNode {
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
eviction_policy: settings
|
||||
.get("eviction_policy")
|
||||
.map(|x| serde_json::from_str(x))
|
||||
.remove("eviction_policy")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("Failed to parse 'eviction_policy' json")?,
|
||||
min_resident_size_override: settings
|
||||
|
||||
@@ -14,6 +14,7 @@ pub struct GenericAPIError {
|
||||
#[derive(Serialize, Debug)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub struct ComputeStatusResponse {
|
||||
pub start_time: DateTime<Utc>,
|
||||
pub tenant: Option<String>,
|
||||
pub timeline: Option<String>,
|
||||
pub status: ComputeStatus,
|
||||
@@ -63,6 +64,7 @@ where
|
||||
/// Response of the /metrics.json API
|
||||
#[derive(Clone, Debug, Default, Serialize)]
|
||||
pub struct ComputeMetrics {
|
||||
pub wait_for_spec_ms: u64,
|
||||
pub sync_safekeepers_ms: u64,
|
||||
pub basebackup_ms: u64,
|
||||
pub config_ms: u64,
|
||||
@@ -75,4 +77,16 @@ pub struct ComputeMetrics {
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct ControlPlaneSpecResponse {
|
||||
pub spec: Option<ComputeSpec>,
|
||||
pub status: ControlPlaneComputeStatus,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ControlPlaneComputeStatus {
|
||||
// Compute is known to control-plane, but it's not
|
||||
// yet attached to any timeline / endpoint.
|
||||
Empty,
|
||||
// Compute is attached to some timeline / endpoint and
|
||||
// should be able to start with provided spec.
|
||||
Attached,
|
||||
}
|
||||
|
||||
@@ -76,6 +76,7 @@ where
|
||||
|
||||
let log_quietly = method == Method::GET;
|
||||
async move {
|
||||
let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding();
|
||||
if log_quietly {
|
||||
debug!("Handling request");
|
||||
} else {
|
||||
@@ -87,7 +88,11 @@ where
|
||||
// Usage of the error handler also means that we expect only the `ApiError` errors to be raised in this call.
|
||||
//
|
||||
// Panics are not handled separately, there's a `tracing_panic_hook` from another module to do that globally.
|
||||
match (self.0)(request).await {
|
||||
let res = (self.0)(request).await;
|
||||
|
||||
cancellation_guard.disarm();
|
||||
|
||||
match res {
|
||||
Ok(response) => {
|
||||
let response_status = response.status();
|
||||
if log_quietly && response_status.is_success() {
|
||||
@@ -105,6 +110,38 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Drop guard to WARN in case the request was dropped before completion.
|
||||
struct RequestCancelled {
|
||||
warn: Option<tracing::Span>,
|
||||
}
|
||||
|
||||
impl RequestCancelled {
|
||||
/// Create the drop guard using the [`tracing::Span::current`] as the span.
|
||||
fn warn_when_dropped_without_responding() -> Self {
|
||||
RequestCancelled {
|
||||
warn: Some(tracing::Span::current()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Consume the drop guard without logging anything.
|
||||
fn disarm(mut self) {
|
||||
self.warn = None;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RequestCancelled {
|
||||
fn drop(&mut self) {
|
||||
if let Some(span) = self.warn.take() {
|
||||
// the span has all of the info already, but the outer `.instrument(span)` has already
|
||||
// been dropped, so we need to manually re-enter it for this message.
|
||||
//
|
||||
// this is what the instrument would do before polling so it is fine.
|
||||
let _g = span.entered();
|
||||
warn!("request was dropped before completing");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
SERVE_METRICS_COUNT.inc();
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::Lazy;
|
||||
use strum_macros::{EnumString, EnumVariantNames};
|
||||
|
||||
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
|
||||
@@ -23,25 +24,64 @@ impl LogFormat {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
|
||||
let default_filter_str = "info";
|
||||
static TRACING_EVENT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
|
||||
metrics::register_int_counter_vec!(
|
||||
"libmetrics_tracing_event_count",
|
||||
"Number of tracing events, by level",
|
||||
&["level"]
|
||||
)
|
||||
.expect("failed to define metric")
|
||||
});
|
||||
|
||||
struct TracingEventCountLayer(&'static metrics::IntCounterVec);
|
||||
|
||||
impl<S> tracing_subscriber::layer::Layer<S> for TracingEventCountLayer
|
||||
where
|
||||
S: tracing::Subscriber,
|
||||
{
|
||||
fn on_event(
|
||||
&self,
|
||||
event: &tracing::Event<'_>,
|
||||
_ctx: tracing_subscriber::layer::Context<'_, S>,
|
||||
) {
|
||||
let level = event.metadata().level();
|
||||
let level = match *level {
|
||||
tracing::Level::ERROR => "error",
|
||||
tracing::Level::WARN => "warn",
|
||||
tracing::Level::INFO => "info",
|
||||
tracing::Level::DEBUG => "debug",
|
||||
tracing::Level::TRACE => "trace",
|
||||
};
|
||||
self.0.with_label_values(&[level]).inc();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
|
||||
// We fall back to printing all spans at info-level or above if
|
||||
// the RUST_LOG environment variable is not set.
|
||||
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_filter_str));
|
||||
let rust_log_env_filter = || {
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
|
||||
};
|
||||
|
||||
let base_logger = tracing_subscriber::fmt()
|
||||
.with_env_filter(env_filter)
|
||||
.with_target(false)
|
||||
.with_ansi(atty::is(atty::Stream::Stdout))
|
||||
.with_writer(std::io::stdout);
|
||||
|
||||
match log_format {
|
||||
LogFormat::Json => base_logger.json().init(),
|
||||
LogFormat::Plain => base_logger.init(),
|
||||
LogFormat::Test => base_logger.with_test_writer().init(),
|
||||
}
|
||||
// NB: the order of the with() calls does not matter.
|
||||
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
|
||||
use tracing_subscriber::prelude::*;
|
||||
tracing_subscriber::registry()
|
||||
.with({
|
||||
let log_layer = tracing_subscriber::fmt::layer()
|
||||
.with_target(false)
|
||||
.with_ansi(atty::is(atty::Stream::Stdout))
|
||||
.with_writer(std::io::stdout);
|
||||
let log_layer = match log_format {
|
||||
LogFormat::Json => log_layer.json().boxed(),
|
||||
LogFormat::Plain => log_layer.boxed(),
|
||||
LogFormat::Test => log_layer.with_test_writer().boxed(),
|
||||
};
|
||||
log_layer.with_filter(rust_log_env_filter())
|
||||
})
|
||||
.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()))
|
||||
.init();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -157,3 +197,33 @@ impl std::fmt::Debug for PrettyLocation<'_, '_> {
|
||||
<Self as std::fmt::Display>::fmt(self, f)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use metrics::{core::Opts, IntCounterVec};
|
||||
|
||||
use super::TracingEventCountLayer;
|
||||
|
||||
#[test]
|
||||
fn tracing_event_count_metric() {
|
||||
let counter_vec =
|
||||
IntCounterVec::new(Opts::new("testmetric", "testhelp"), &["level"]).unwrap();
|
||||
let counter_vec = Box::leak(Box::new(counter_vec)); // make it 'static
|
||||
let layer = TracingEventCountLayer(counter_vec);
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
tracing::subscriber::with_default(tracing_subscriber::registry().with(layer), || {
|
||||
tracing::trace!("foo");
|
||||
tracing::debug!("foo");
|
||||
tracing::info!("foo");
|
||||
tracing::warn!("foo");
|
||||
tracing::error!("foo");
|
||||
});
|
||||
|
||||
assert_eq!(counter_vec.with_label_values(&["trace"]).get(), 1);
|
||||
assert_eq!(counter_vec.with_label_values(&["debug"]).get(), 1);
|
||||
assert_eq!(counter_vec.with_label_values(&["info"]).get(), 1);
|
||||
assert_eq!(counter_vec.with_label_values(&["warn"]).get(), 1);
|
||||
assert_eq!(counter_vec.with_label_values(&["error"]).get(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ use std::time::Instant;
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
||||
|
||||
fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
|
||||
let mut layer_map = LayerMap::<LayerDescriptor>::default();
|
||||
@@ -114,7 +114,7 @@ fn bench_from_captest_env(c: &mut Criterion) {
|
||||
c.bench_function("captest_uniform_queries", |b| {
|
||||
b.iter(|| {
|
||||
for q in queries.clone().into_iter() {
|
||||
layer_map.search(q.0, q.1);
|
||||
black_box(layer_map.search(q.0, q.1));
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -122,11 +122,11 @@ fn bench_from_captest_env(c: &mut Criterion) {
|
||||
// test with a key that corresponds to the RelDir entry. See pgdatadir_mapping.rs.
|
||||
c.bench_function("captest_rel_dir_query", |b| {
|
||||
b.iter(|| {
|
||||
let result = layer_map.search(
|
||||
let result = black_box(layer_map.search(
|
||||
Key::from_hex("000000067F00008000000000000000000001").unwrap(),
|
||||
// This LSN is higher than any of the LSNs in the tree
|
||||
Lsn::from_str("D0/80208AE1").unwrap(),
|
||||
);
|
||||
));
|
||||
result.unwrap();
|
||||
});
|
||||
});
|
||||
@@ -183,7 +183,7 @@ fn bench_from_real_project(c: &mut Criterion) {
|
||||
group.bench_function("uniform_queries", |b| {
|
||||
b.iter(|| {
|
||||
for q in queries.clone().into_iter() {
|
||||
layer_map.search(q.0, q.1);
|
||||
black_box(layer_map.search(q.0, q.1));
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -232,7 +232,7 @@ fn bench_sequential(c: &mut Criterion) {
|
||||
group.bench_function("uniform_queries", |b| {
|
||||
b.iter(|| {
|
||||
for q in queries.clone().into_iter() {
|
||||
layer_map.search(q.0, q.1);
|
||||
black_box(layer_map.search(q.0, q.1));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -520,6 +520,43 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/{tenant_id}/synthetic_size:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
get:
|
||||
description: |
|
||||
Calculate tenant's synthetic size
|
||||
responses:
|
||||
"200":
|
||||
description: Tenant's synthetic size
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/SyntheticSizeResponse"
|
||||
"401":
|
||||
description: Unauthorized Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/UnauthorizedError"
|
||||
"403":
|
||||
description: Forbidden Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ForbiddenError"
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/{tenant_id}/size:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
@@ -948,6 +985,84 @@ components:
|
||||
latest_gc_cutoff_lsn:
|
||||
type: string
|
||||
format: hex
|
||||
|
||||
SyntheticSizeResponse:
|
||||
type: object
|
||||
required:
|
||||
- id
|
||||
- size
|
||||
- segment_sizes
|
||||
- inputs
|
||||
properties:
|
||||
id:
|
||||
type: string
|
||||
format: hex
|
||||
size:
|
||||
type: integer
|
||||
segment_sizes:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/SegmentSize"
|
||||
inputs:
|
||||
type: object
|
||||
properties:
|
||||
segments:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/SegmentData"
|
||||
timeline_inputs:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/TimelineInput"
|
||||
|
||||
SegmentSize:
|
||||
type: object
|
||||
required:
|
||||
- method
|
||||
- accum_size
|
||||
properties:
|
||||
method:
|
||||
type: string
|
||||
accum_size:
|
||||
type: integer
|
||||
|
||||
SegmentData:
|
||||
type: object
|
||||
required:
|
||||
- segment
|
||||
properties:
|
||||
segment:
|
||||
type: object
|
||||
required:
|
||||
- lsn
|
||||
properties:
|
||||
parent:
|
||||
type: integer
|
||||
lsn:
|
||||
type: integer
|
||||
size:
|
||||
type: integer
|
||||
needed:
|
||||
type: boolean
|
||||
timeline_id:
|
||||
type: string
|
||||
format: hex
|
||||
kind:
|
||||
type: string
|
||||
|
||||
TimelineInput:
|
||||
type: object
|
||||
required:
|
||||
- timeline_id
|
||||
properties:
|
||||
ancestor_id:
|
||||
type: string
|
||||
ancestor_lsn:
|
||||
type: string
|
||||
timeline_id:
|
||||
type: string
|
||||
format: hex
|
||||
|
||||
Error:
|
||||
type: object
|
||||
required:
|
||||
|
||||
@@ -1201,6 +1201,37 @@ async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
async fn post_tracing_event_handler(mut r: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
enum Level {
|
||||
Error,
|
||||
Warn,
|
||||
Info,
|
||||
Debug,
|
||||
Trace,
|
||||
}
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
struct Request {
|
||||
level: Level,
|
||||
message: String,
|
||||
}
|
||||
let body: Request = json_request(&mut r)
|
||||
.await
|
||||
.map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
|
||||
|
||||
match body.level {
|
||||
Level::Error => tracing::error!(?body.message),
|
||||
Level::Warn => tracing::warn!(?body.message),
|
||||
Level::Info => tracing::info!(?body.message),
|
||||
Level::Debug => tracing::debug!(?body.message),
|
||||
Level::Trace => tracing::trace!(?body.message),
|
||||
}
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
pub fn make_router(
|
||||
conf: &'static PageServerConf,
|
||||
launch_ts: &'static LaunchTimestamp,
|
||||
@@ -1341,5 +1372,9 @@ pub fn make_router(
|
||||
testing_api!("set tenant state to broken", handle_tenant_break),
|
||||
)
|
||||
.get("/v1/panic", |r| RequestSpan(always_panic_handler).handle(r))
|
||||
.post(
|
||||
"/v1/tracing/event",
|
||||
testing_api!("emit a tracing event", post_tracing_event_handler),
|
||||
)
|
||||
.any(handler_404))
|
||||
}
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use metrics::core::{AtomicU64, GenericCounter};
|
||||
use metrics::{
|
||||
register_counter_vec, register_histogram, register_histogram_vec, register_int_counter,
|
||||
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec,
|
||||
Counter, CounterVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
|
||||
UIntGauge, UIntGaugeVec,
|
||||
register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec, Counter, CounterVec,
|
||||
Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge,
|
||||
UIntGaugeVec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::models::TenantState;
|
||||
@@ -350,11 +350,6 @@ pub static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static NUM_ONDISK_LAYERS: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!("pageserver_ondisk_layers", "Number of layers on-disk")
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
// remote storage metrics
|
||||
|
||||
/// NB: increment _after_ recording the current value into [`REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST`].
|
||||
@@ -385,6 +380,26 @@ static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy<HistogramVec> = Lazy::new
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_remote_timeline_client_bytes_started",
|
||||
"Incremented by the number of bytes associated with a remote timeline client operation. \
|
||||
The increment happens when the operation is scheduled.",
|
||||
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_remote_timeline_client_bytes_finished",
|
||||
"Incremented by the number of bytes associated with a remote timeline client operation. \
|
||||
The increment happens when the operation finishes (regardless of success/failure/shutdown).",
|
||||
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub enum RemoteOpKind {
|
||||
Upload,
|
||||
@@ -739,6 +754,8 @@ pub struct RemoteTimelineClientMetrics {
|
||||
remote_operation_time: Mutex<HashMap<(&'static str, &'static str, &'static str), Histogram>>,
|
||||
calls_unfinished_gauge: Mutex<HashMap<(&'static str, &'static str), IntGauge>>,
|
||||
calls_started_hist: Mutex<HashMap<(&'static str, &'static str), Histogram>>,
|
||||
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
|
||||
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
|
||||
}
|
||||
|
||||
impl RemoteTimelineClientMetrics {
|
||||
@@ -749,6 +766,8 @@ impl RemoteTimelineClientMetrics {
|
||||
remote_operation_time: Mutex::new(HashMap::default()),
|
||||
calls_unfinished_gauge: Mutex::new(HashMap::default()),
|
||||
calls_started_hist: Mutex::new(HashMap::default()),
|
||||
bytes_started_counter: Mutex::new(HashMap::default()),
|
||||
bytes_finished_counter: Mutex::new(HashMap::default()),
|
||||
remote_physical_size_gauge: Mutex::new(None),
|
||||
}
|
||||
}
|
||||
@@ -787,6 +806,7 @@ impl RemoteTimelineClientMetrics {
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
|
||||
fn calls_unfinished_gauge(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
@@ -828,32 +848,125 @@ impl RemoteTimelineClientMetrics {
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
|
||||
fn bytes_started_counter(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> IntCounter {
|
||||
// XXX would be nice to have an upgradable RwLock
|
||||
let mut guard = self.bytes_started_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER
|
||||
.get_metric_with_label_values(&[
|
||||
&self.tenant_id.to_string(),
|
||||
&self.timeline_id.to_string(),
|
||||
key.0,
|
||||
key.1,
|
||||
])
|
||||
.unwrap()
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
|
||||
fn bytes_finished_counter(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> IntCounter {
|
||||
// XXX would be nice to have an upgradable RwLock
|
||||
let mut guard = self.bytes_finished_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER
|
||||
.get_metric_with_label_values(&[
|
||||
&self.tenant_id.to_string(),
|
||||
&self.timeline_id.to_string(),
|
||||
key.0,
|
||||
key.1,
|
||||
])
|
||||
.unwrap()
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl RemoteTimelineClientMetrics {
|
||||
pub fn get_bytes_started_counter_value(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> Option<u64> {
|
||||
let guard = self.bytes_started_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
guard.get(&key).map(|counter| counter.get())
|
||||
}
|
||||
|
||||
pub fn get_bytes_finished_counter_value(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> Option<u64> {
|
||||
let guard = self.bytes_finished_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
guard.get(&key).map(|counter| counter.get())
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`RemoteTimelineClientMetrics::call_begin`].
|
||||
#[must_use]
|
||||
pub(crate) struct RemoteTimelineClientCallMetricGuard(Option<IntGauge>);
|
||||
pub(crate) struct RemoteTimelineClientCallMetricGuard {
|
||||
/// Decremented on drop.
|
||||
calls_unfinished_metric: Option<IntGauge>,
|
||||
/// If Some(), this references the bytes_finished metric, and we increment it by the given `u64` on drop.
|
||||
bytes_finished: Option<(IntCounter, u64)>,
|
||||
}
|
||||
|
||||
impl RemoteTimelineClientCallMetricGuard {
|
||||
/// Consume this guard object without decrementing the metric.
|
||||
/// The caller vouches to do this manually, so that the prior increment of the gauge will cancel out.
|
||||
/// Consume this guard object without performing the metric updates it would do on `drop()`.
|
||||
/// The caller vouches to do the metric updates manually.
|
||||
pub fn will_decrement_manually(mut self) {
|
||||
self.0 = None; // prevent drop() from decrementing
|
||||
let RemoteTimelineClientCallMetricGuard {
|
||||
calls_unfinished_metric,
|
||||
bytes_finished,
|
||||
} = &mut self;
|
||||
calls_unfinished_metric.take();
|
||||
bytes_finished.take();
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RemoteTimelineClientCallMetricGuard {
|
||||
fn drop(&mut self) {
|
||||
if let RemoteTimelineClientCallMetricGuard(Some(guard)) = self {
|
||||
let RemoteTimelineClientCallMetricGuard {
|
||||
calls_unfinished_metric,
|
||||
bytes_finished,
|
||||
} = self;
|
||||
if let Some(guard) = calls_unfinished_metric.take() {
|
||||
guard.dec();
|
||||
}
|
||||
if let Some((bytes_finished_metric, value)) = bytes_finished {
|
||||
bytes_finished_metric.inc_by(*value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The enum variants communicate to the [`RemoteTimelineClientMetrics`] whether to
|
||||
/// track the byte size of this call in applicable metric(s).
|
||||
pub(crate) enum RemoteTimelineClientMetricsCallTrackSize {
|
||||
/// Do not account for this call's byte size in any metrics.
|
||||
/// The `reason` field is there to make the call sites self-documenting
|
||||
/// about why they don't need the metric.
|
||||
DontTrackSize { reason: &'static str },
|
||||
/// Track the byte size of the call in applicable metric(s).
|
||||
Bytes(u64),
|
||||
}
|
||||
|
||||
impl RemoteTimelineClientMetrics {
|
||||
/// Increment the metrics that track ongoing calls to the remote timeline client instance.
|
||||
/// Update the metrics that change when a call to the remote timeline client instance starts.
|
||||
///
|
||||
/// Drop the returned guard object once the operation is finished to decrement the values.
|
||||
/// Drop the returned guard object once the operation is finished to updates corresponding metrics that track completions.
|
||||
/// Or, use [`RemoteTimelineClientCallMetricGuard::will_decrement_manually`] and [`call_end`] if that
|
||||
/// is more suitable.
|
||||
/// Never do both.
|
||||
@@ -861,24 +974,51 @@ impl RemoteTimelineClientMetrics {
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
size: RemoteTimelineClientMetricsCallTrackSize,
|
||||
) -> RemoteTimelineClientCallMetricGuard {
|
||||
let unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
|
||||
let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
|
||||
self.calls_started_hist(file_kind, op_kind)
|
||||
.observe(unfinished_metric.get() as f64);
|
||||
unfinished_metric.inc();
|
||||
RemoteTimelineClientCallMetricGuard(Some(unfinished_metric))
|
||||
.observe(calls_unfinished_metric.get() as f64);
|
||||
calls_unfinished_metric.inc(); // NB: inc after the histogram, see comment on underlying metric
|
||||
|
||||
let bytes_finished = match size {
|
||||
RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {
|
||||
// nothing to do
|
||||
None
|
||||
}
|
||||
RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => {
|
||||
self.bytes_started_counter(file_kind, op_kind).inc_by(size);
|
||||
let finished_counter = self.bytes_finished_counter(file_kind, op_kind);
|
||||
Some((finished_counter, size))
|
||||
}
|
||||
};
|
||||
RemoteTimelineClientCallMetricGuard {
|
||||
calls_unfinished_metric: Some(calls_unfinished_metric),
|
||||
bytes_finished,
|
||||
}
|
||||
}
|
||||
|
||||
/// Manually decrement the metric instead of using the guard object.
|
||||
/// Manually udpate the metrics that track completions, instead of using the guard object.
|
||||
/// Using the guard object is generally preferable.
|
||||
/// See [`call_begin`] for more context.
|
||||
pub(crate) fn call_end(&self, file_kind: &RemoteOpFileKind, op_kind: &RemoteOpKind) {
|
||||
let unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
|
||||
pub(crate) fn call_end(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
size: RemoteTimelineClientMetricsCallTrackSize,
|
||||
) {
|
||||
let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
|
||||
debug_assert!(
|
||||
unfinished_metric.get() > 0,
|
||||
calls_unfinished_metric.get() > 0,
|
||||
"begin and end should cancel out"
|
||||
);
|
||||
unfinished_metric.dec();
|
||||
calls_unfinished_metric.dec();
|
||||
match size {
|
||||
RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {}
|
||||
RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => {
|
||||
self.bytes_finished_counter(file_kind, op_kind).inc_by(size);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -891,6 +1031,8 @@ impl Drop for RemoteTimelineClientMetrics {
|
||||
remote_operation_time,
|
||||
calls_unfinished_gauge,
|
||||
calls_started_hist,
|
||||
bytes_started_counter,
|
||||
bytes_finished_counter,
|
||||
} = self;
|
||||
for ((a, b, c), _) in remote_operation_time.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_OPERATION_TIME.remove_label_values(&[tenant_id, timeline_id, a, b, c]);
|
||||
@@ -911,6 +1053,22 @@ impl Drop for RemoteTimelineClientMetrics {
|
||||
b,
|
||||
]);
|
||||
}
|
||||
for ((a, b), _) in bytes_started_counter.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER.remove_label_values(&[
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
a,
|
||||
b,
|
||||
]);
|
||||
}
|
||||
for ((a, b), _) in bytes_finished_counter.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER.remove_label_values(&[
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
a,
|
||||
b,
|
||||
]);
|
||||
}
|
||||
{
|
||||
let _ = remote_physical_size_gauge; // use to avoid 'unused' warning in desctructuring above
|
||||
let _ = REMOTE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
|
||||
|
||||
@@ -700,6 +700,8 @@ impl PageServerHandler {
|
||||
full_backup: bool,
|
||||
ctx: RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let started = std::time::Instant::now();
|
||||
|
||||
// check that the timeline exists
|
||||
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
@@ -712,6 +714,8 @@ impl PageServerHandler {
|
||||
.context("invalid basebackup lsn")?;
|
||||
}
|
||||
|
||||
let lsn_awaited_after = started.elapsed();
|
||||
|
||||
// switch client to COPYOUT
|
||||
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
|
||||
pgb.flush().await?;
|
||||
@@ -732,7 +736,17 @@ impl PageServerHandler {
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CopyDone)?;
|
||||
pgb.flush().await?;
|
||||
info!("basebackup complete");
|
||||
|
||||
let basebackup_after = started
|
||||
.elapsed()
|
||||
.checked_sub(lsn_awaited_after)
|
||||
.unwrap_or(Duration::ZERO);
|
||||
|
||||
info!(
|
||||
lsn_await_millis = lsn_awaited_after.as_millis(),
|
||||
basebackup_millis = basebackup_after.as_millis(),
|
||||
"basebackup complete"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -118,6 +118,10 @@ pub struct Tenant {
|
||||
// Global pageserver config parameters
|
||||
pub conf: &'static PageServerConf,
|
||||
|
||||
/// The value creation timestamp, used to measure activation delay, see:
|
||||
/// <https://github.com/neondatabase/neon/issues/4025>
|
||||
loading_started_at: Instant,
|
||||
|
||||
state: watch::Sender<TenantState>,
|
||||
|
||||
// Overridden tenant-specific config parameters.
|
||||
@@ -1476,7 +1480,7 @@ impl Tenant {
|
||||
TenantState::Loading | TenantState::Attaching => {
|
||||
*current_state = TenantState::Active;
|
||||
|
||||
info!("Activating tenant {}", self.tenant_id);
|
||||
debug!(tenant_id = %self.tenant_id, "Activating tenant");
|
||||
|
||||
let timelines_accessor = self.timelines.lock().unwrap();
|
||||
let not_broken_timelines = timelines_accessor
|
||||
@@ -1487,12 +1491,17 @@ impl Tenant {
|
||||
// down when they notice that the tenant is inactive.
|
||||
tasks::start_background_loops(self.tenant_id);
|
||||
|
||||
let mut activated_timelines = 0;
|
||||
let mut timelines_broken_during_activation = 0;
|
||||
|
||||
for timeline in not_broken_timelines {
|
||||
match timeline
|
||||
.activate(ctx)
|
||||
.context("timeline activation for activating tenant")
|
||||
{
|
||||
Ok(()) => {}
|
||||
Ok(()) => {
|
||||
activated_timelines += 1;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to activate timeline {}: {:#}",
|
||||
@@ -1503,9 +1512,26 @@ impl Tenant {
|
||||
"failed to activate timeline {}: {}",
|
||||
timeline.timeline_id, e
|
||||
));
|
||||
|
||||
timelines_broken_during_activation += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let elapsed = self.loading_started_at.elapsed();
|
||||
let total_timelines = timelines_accessor.len();
|
||||
|
||||
// log a lot of stuff, because some tenants sometimes suffer from user-visible
|
||||
// times to activate. see https://github.com/neondatabase/neon/issues/4025
|
||||
info!(
|
||||
since_creation_millis = elapsed.as_millis(),
|
||||
tenant_id = %self.tenant_id,
|
||||
activated_timelines,
|
||||
timelines_broken_during_activation,
|
||||
total_timelines,
|
||||
post_state = <&'static str>::from(&*current_state),
|
||||
"activation attempt finished"
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -1812,6 +1838,9 @@ impl Tenant {
|
||||
Tenant {
|
||||
tenant_id,
|
||||
conf,
|
||||
// using now here is good enough approximation to catch tenants with really long
|
||||
// activation times.
|
||||
loading_started_at: Instant::now(),
|
||||
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
|
||||
timelines: Mutex::new(HashMap::new()),
|
||||
gc_cs: tokio::sync::Mutex::new(()),
|
||||
|
||||
@@ -48,7 +48,6 @@ mod layer_coverage;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::keyspace::KeyPartitioning;
|
||||
use crate::metrics::NUM_ONDISK_LAYERS;
|
||||
use crate::repository::Key;
|
||||
use crate::tenant::storage_layer::InMemoryLayer;
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
@@ -56,6 +55,7 @@ use anyhow::{bail, Result};
|
||||
use std::collections::VecDeque;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use tracing::*;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use historic_layer_coverage::BufferedHistoricLayerCoverage;
|
||||
@@ -276,19 +276,24 @@ where
|
||||
///
|
||||
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
|
||||
let key = historic_layer_coverage::LayerKey::from(&*layer);
|
||||
if self.historic.contains(&key) {
|
||||
bail!(
|
||||
"Attempt to insert duplicate layer {} in layer map",
|
||||
layer.short_id()
|
||||
);
|
||||
}
|
||||
self.historic.insert(key, Arc::clone(&layer));
|
||||
match self.historic.replace(&key, Arc::clone(&layer), |existing| {
|
||||
!Self::compare_arced_layers(existing, &layer)
|
||||
}) {
|
||||
Replacement::Replaced { .. } => {
|
||||
if Self::is_l0(&layer) {
|
||||
bail!("Duplicate L0 layer {}", layer.short_id());
|
||||
}
|
||||
warn!("Replace duplicate layer {} in layer map", layer.short_id());
|
||||
}
|
||||
Replacement::Unexpected(_) => bail!("Replace layer with itself is prohibited"),
|
||||
Replacement::NotFound | Replacement::RemovalBuffered => {
|
||||
self.historic.insert(key, Arc::clone(&layer));
|
||||
|
||||
if Self::is_l0(&layer) {
|
||||
self.l0_delta_layers.push(layer);
|
||||
if Self::is_l0(&layer) {
|
||||
self.l0_delta_layers.push(layer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
NUM_ONDISK_LAYERS.inc();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -314,8 +319,6 @@ where
|
||||
"failed to locate removed historic layer from l0_delta_layers"
|
||||
);
|
||||
}
|
||||
|
||||
NUM_ONDISK_LAYERS.dec();
|
||||
}
|
||||
|
||||
pub(self) fn replace_historic_noflush(
|
||||
|
||||
@@ -417,14 +417,6 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn contains(&self, layer_key: &LayerKey) -> bool {
|
||||
match self.buffer.get(layer_key) {
|
||||
Some(None) => false, // layer remove was buffered
|
||||
Some(_) => true, // layer insert was buffered
|
||||
None => self.layers.contains_key(layer_key), // no buffered ops for this layer
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
|
||||
self.buffer.insert(layer_key, Some(value));
|
||||
}
|
||||
|
||||
@@ -219,7 +219,8 @@ use utils::lsn::Lsn;
|
||||
|
||||
use crate::metrics::{
|
||||
MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
|
||||
REMOTE_ONDEMAND_DOWNLOADED_BYTES, REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
|
||||
RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
|
||||
REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
|
||||
};
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::{
|
||||
@@ -367,9 +368,13 @@ impl RemoteTimelineClient {
|
||||
|
||||
/// Download index file
|
||||
pub async fn download_index_file(&self) -> Result<IndexPart, DownloadError> {
|
||||
let _unfinished_gauge_guard = self
|
||||
.metrics
|
||||
.call_begin(&RemoteOpFileKind::Index, &RemoteOpKind::Download);
|
||||
let _unfinished_gauge_guard = self.metrics.call_begin(
|
||||
&RemoteOpFileKind::Index,
|
||||
&RemoteOpKind::Download,
|
||||
crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
|
||||
reason: "no need for a downloads gauge",
|
||||
},
|
||||
);
|
||||
|
||||
download::download_index_part(
|
||||
self.conf,
|
||||
@@ -398,9 +403,13 @@ impl RemoteTimelineClient {
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
) -> anyhow::Result<u64> {
|
||||
let downloaded_size = {
|
||||
let _unfinished_gauge_guard = self
|
||||
.metrics
|
||||
.call_begin(&RemoteOpFileKind::Layer, &RemoteOpKind::Download);
|
||||
let _unfinished_gauge_guard = self.metrics.call_begin(
|
||||
&RemoteOpFileKind::Layer,
|
||||
&RemoteOpKind::Download,
|
||||
crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
|
||||
reason: "no need for a downloads gauge",
|
||||
},
|
||||
);
|
||||
download::download_layer_file(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
@@ -886,11 +895,32 @@ impl RemoteTimelineClient {
|
||||
fn calls_unfinished_metric_impl(
|
||||
&self,
|
||||
op: &UploadOp,
|
||||
) -> Option<(RemoteOpFileKind, RemoteOpKind)> {
|
||||
) -> Option<(
|
||||
RemoteOpFileKind,
|
||||
RemoteOpKind,
|
||||
RemoteTimelineClientMetricsCallTrackSize,
|
||||
)> {
|
||||
use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
|
||||
let res = match op {
|
||||
UploadOp::UploadLayer(_, _) => (RemoteOpFileKind::Layer, RemoteOpKind::Upload),
|
||||
UploadOp::UploadMetadata(_, _) => (RemoteOpFileKind::Index, RemoteOpKind::Upload),
|
||||
UploadOp::Delete(file_kind, _) => (*file_kind, RemoteOpKind::Delete),
|
||||
UploadOp::UploadLayer(_, m) => (
|
||||
RemoteOpFileKind::Layer,
|
||||
RemoteOpKind::Upload,
|
||||
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
|
||||
),
|
||||
UploadOp::UploadMetadata(_, _) => (
|
||||
RemoteOpFileKind::Index,
|
||||
RemoteOpKind::Upload,
|
||||
DontTrackSize {
|
||||
reason: "metadata uploads are tiny",
|
||||
},
|
||||
),
|
||||
UploadOp::Delete(file_kind, _) => (
|
||||
*file_kind,
|
||||
RemoteOpKind::Delete,
|
||||
DontTrackSize {
|
||||
reason: "should we track deletes? positive or negative sign?",
|
||||
},
|
||||
),
|
||||
UploadOp::Barrier(_) => {
|
||||
// we do not account these
|
||||
return None;
|
||||
@@ -900,20 +930,20 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
|
||||
fn calls_unfinished_metric_begin(&self, op: &UploadOp) {
|
||||
let (file_kind, op_kind) = match self.calls_unfinished_metric_impl(op) {
|
||||
let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
|
||||
Some(x) => x,
|
||||
None => return,
|
||||
};
|
||||
let guard = self.metrics.call_begin(&file_kind, &op_kind);
|
||||
let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
|
||||
guard.will_decrement_manually(); // in unfinished_ops_metric_end()
|
||||
}
|
||||
|
||||
fn calls_unfinished_metric_end(&self, op: &UploadOp) {
|
||||
let (file_kind, op_kind) = match self.calls_unfinished_metric_impl(op) {
|
||||
let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
|
||||
Some(x) => x,
|
||||
None => return,
|
||||
};
|
||||
self.metrics.call_end(&file_kind, &op_kind);
|
||||
self.metrics.call_end(&file_kind, &op_kind, track_bytes);
|
||||
}
|
||||
|
||||
fn stop(&self) {
|
||||
@@ -981,11 +1011,19 @@ impl RemoteTimelineClient {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{
|
||||
tenant::harness::{TenantHarness, TIMELINE_ID},
|
||||
context::RequestContext,
|
||||
tenant::{
|
||||
harness::{TenantHarness, TIMELINE_ID},
|
||||
Tenant,
|
||||
},
|
||||
DEFAULT_PG_VERSION,
|
||||
};
|
||||
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
|
||||
use std::{collections::HashSet, path::Path};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use tokio::runtime::EnterGuard;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
|
||||
@@ -1034,39 +1072,80 @@ mod tests {
|
||||
assert_eq!(found, expected);
|
||||
}
|
||||
|
||||
struct TestSetup {
|
||||
runtime: &'static tokio::runtime::Runtime,
|
||||
entered_runtime: EnterGuard<'static>,
|
||||
harness: TenantHarness<'static>,
|
||||
tenant: Arc<Tenant>,
|
||||
tenant_ctx: RequestContext,
|
||||
remote_fs_dir: PathBuf,
|
||||
client: Arc<RemoteTimelineClient>,
|
||||
}
|
||||
|
||||
impl TestSetup {
|
||||
fn new(test_name: &str) -> anyhow::Result<Self> {
|
||||
// Use a current-thread runtime in the test
|
||||
let runtime = Box::leak(Box::new(
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?,
|
||||
));
|
||||
let entered_runtime = runtime.enter();
|
||||
|
||||
let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
|
||||
let harness = TenantHarness::create(test_name)?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
// create an empty timeline directory
|
||||
let timeline =
|
||||
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let _ = timeline.initialize(&ctx).unwrap();
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
|
||||
|
||||
let storage_config = RemoteStorageConfig {
|
||||
max_concurrent_syncs: std::num::NonZeroUsize::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
|
||||
)
|
||||
.unwrap(),
|
||||
max_sync_errors: std::num::NonZeroU32::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
|
||||
)
|
||||
.unwrap(),
|
||||
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
|
||||
};
|
||||
|
||||
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
|
||||
|
||||
let client = Arc::new(RemoteTimelineClient {
|
||||
conf: harness.conf,
|
||||
runtime,
|
||||
tenant_id: harness.tenant_id,
|
||||
timeline_id: TIMELINE_ID,
|
||||
storage_impl: storage,
|
||||
upload_queue: Mutex::new(UploadQueue::Uninitialized),
|
||||
metrics: Arc::new(RemoteTimelineClientMetrics::new(
|
||||
&harness.tenant_id,
|
||||
&TIMELINE_ID,
|
||||
)),
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
runtime,
|
||||
entered_runtime,
|
||||
harness,
|
||||
tenant,
|
||||
tenant_ctx: ctx,
|
||||
remote_fs_dir,
|
||||
client,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Test scheduling
|
||||
#[test]
|
||||
fn upload_scheduling() -> anyhow::Result<()> {
|
||||
// Use a current-thread runtime in the test
|
||||
let runtime = Box::leak(Box::new(
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?,
|
||||
));
|
||||
let _entered = runtime.enter();
|
||||
|
||||
let harness = TenantHarness::create("upload_scheduling")?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
let _timeline =
|
||||
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
|
||||
|
||||
let storage_config = RemoteStorageConfig {
|
||||
max_concurrent_syncs: std::num::NonZeroUsize::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
|
||||
)
|
||||
.unwrap(),
|
||||
max_sync_errors: std::num::NonZeroU32::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
|
||||
)
|
||||
.unwrap(),
|
||||
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
|
||||
};
|
||||
|
||||
// Test outline:
|
||||
//
|
||||
// Schedule upload of a bunch of layers. Check that they are started immediately, not queued
|
||||
@@ -1081,21 +1160,19 @@ mod tests {
|
||||
// Schedule another deletion. Check that it's launched immediately.
|
||||
// Schedule index upload. Check that it's queued
|
||||
|
||||
println!("workdir: {}", harness.conf.workdir.display());
|
||||
|
||||
let storage_impl = GenericRemoteStorage::from_config(&storage_config)?;
|
||||
let client = Arc::new(RemoteTimelineClient {
|
||||
conf: harness.conf,
|
||||
let TestSetup {
|
||||
runtime,
|
||||
tenant_id: harness.tenant_id,
|
||||
timeline_id: TIMELINE_ID,
|
||||
storage_impl,
|
||||
upload_queue: Mutex::new(UploadQueue::Uninitialized),
|
||||
metrics: Arc::new(RemoteTimelineClientMetrics::new(
|
||||
&harness.tenant_id,
|
||||
&TIMELINE_ID,
|
||||
)),
|
||||
});
|
||||
entered_runtime: _entered_runtime,
|
||||
harness,
|
||||
tenant: _tenant,
|
||||
tenant_ctx: _tenant_ctx,
|
||||
remote_fs_dir,
|
||||
client,
|
||||
} = TestSetup::new("upload_scheduling").unwrap();
|
||||
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
println!("workdir: {}", harness.conf.workdir.display());
|
||||
|
||||
let remote_timeline_dir =
|
||||
remote_fs_dir.join(timeline_path.strip_prefix(&harness.conf.workdir)?);
|
||||
@@ -1216,4 +1293,90 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bytes_unfinished_gauge_for_layer_file_uploads() -> anyhow::Result<()> {
|
||||
// Setup
|
||||
|
||||
let TestSetup {
|
||||
runtime,
|
||||
harness,
|
||||
client,
|
||||
..
|
||||
} = TestSetup::new("metrics")?;
|
||||
|
||||
let metadata = dummy_metadata(Lsn(0x10));
|
||||
client.init_upload_queue_for_empty_remote(&metadata)?;
|
||||
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
|
||||
let content_1 = dummy_contents("foo");
|
||||
std::fs::write(
|
||||
timeline_path.join(layer_file_name_1.file_name()),
|
||||
&content_1,
|
||||
)?;
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
struct BytesStartedFinished {
|
||||
started: Option<usize>,
|
||||
finished: Option<usize>,
|
||||
}
|
||||
let get_bytes_started_stopped = || {
|
||||
let started = client
|
||||
.metrics
|
||||
.get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
|
||||
.map(|v| v.try_into().unwrap());
|
||||
let stopped = client
|
||||
.metrics
|
||||
.get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
|
||||
.map(|v| v.try_into().unwrap());
|
||||
BytesStartedFinished {
|
||||
started,
|
||||
finished: stopped,
|
||||
}
|
||||
};
|
||||
|
||||
// Test
|
||||
|
||||
let init = get_bytes_started_stopped();
|
||||
|
||||
client.schedule_layer_file_upload(
|
||||
&layer_file_name_1,
|
||||
&LayerFileMetadata::new(content_1.len() as u64),
|
||||
)?;
|
||||
|
||||
let pre = get_bytes_started_stopped();
|
||||
|
||||
runtime.block_on(client.wait_completion())?;
|
||||
|
||||
let post = get_bytes_started_stopped();
|
||||
|
||||
// Validate
|
||||
|
||||
assert_eq!(
|
||||
init,
|
||||
BytesStartedFinished {
|
||||
started: None,
|
||||
finished: None
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
pre,
|
||||
BytesStartedFinished {
|
||||
started: Some(content_1.len()),
|
||||
// assert that the _finished metric is created eagerly so that subtractions work on first sample
|
||||
finished: Some(0),
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
post,
|
||||
BytesStartedFinished {
|
||||
started: Some(content_1.len()),
|
||||
finished: Some(content_1.len())
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3300,6 +3300,10 @@ impl Timeline {
|
||||
|
||||
drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed
|
||||
|
||||
fail_point!("compact-level0-phase1-finish", |_| {
|
||||
Err(anyhow::anyhow!("failpoint compact-level0-phase1-finish").into())
|
||||
});
|
||||
|
||||
Ok(CompactLevel0Phase1Result {
|
||||
new_layers,
|
||||
deltas_to_compact,
|
||||
|
||||
@@ -23,7 +23,7 @@ hmac.workspace = true
|
||||
hostname.workspace = true
|
||||
humantime.workspace = true
|
||||
hyper-tungstenite.workspace = true
|
||||
hyper = { workspace = true, features = ["http2", "http1", "tcp", "runtime"] }
|
||||
hyper.workspace = true
|
||||
itertools.workspace = true
|
||||
md5.workspace = true
|
||||
metrics.workspace = true
|
||||
@@ -65,7 +65,6 @@ x509-parser.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
tokio-util.workspace = true
|
||||
percent-encoding.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
rcgen.workspace = true
|
||||
|
||||
@@ -1,21 +1,15 @@
|
||||
use crate::{
|
||||
auth, cancellation::CancelMap, config::ProxyConfig, console, error::io_error,
|
||||
proxy::handle_ws_client,
|
||||
cancellation::CancelMap, config::ProxyConfig, error::io_error, proxy::handle_ws_client,
|
||||
};
|
||||
use bytes::{Buf, Bytes};
|
||||
use futures::{Sink, Stream, StreamExt};
|
||||
use hyper::{
|
||||
server::{accept, conn::AddrIncoming},
|
||||
upgrade::Upgraded,
|
||||
Body, Method, Request, Response, StatusCode,
|
||||
Body, Request, Response, StatusCode,
|
||||
};
|
||||
use hyper_tungstenite::{tungstenite::Message, HyperWebsocket, WebSocketStream};
|
||||
use pin_project_lite::pin_project;
|
||||
use pq_proto::StartupMessageParams;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use percent_encoding::percent_decode;
|
||||
use std::collections::HashMap;
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
future::ready,
|
||||
@@ -30,7 +24,6 @@ use tokio::{
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
use url::form_urlencoded;
|
||||
use utils::http::{error::ApiError, json::json_response};
|
||||
|
||||
// TODO: use `std::sync::Exclusive` once it's stabilized.
|
||||
@@ -166,7 +159,6 @@ async fn ws_handler(
|
||||
config: &'static ProxyConfig,
|
||||
cancel_map: Arc<CancelMap>,
|
||||
session_id: uuid::Uuid,
|
||||
cache: Arc<Mutex<ConnectionCache>>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let host = request
|
||||
.headers()
|
||||
@@ -189,170 +181,13 @@ async fn ws_handler(
|
||||
|
||||
// Return the response so the spawned future can continue.
|
||||
Ok(response)
|
||||
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
|
||||
match handle_sql(config, request, cache).await {
|
||||
Ok(resp) => json_response(StatusCode::OK, resp).map(|mut r| {
|
||||
r.headers_mut().insert(
|
||||
"Access-Control-Allow-Origin",
|
||||
hyper::http::HeaderValue::from_static("*"),
|
||||
);
|
||||
r
|
||||
}),
|
||||
Err(e) => json_response(StatusCode::BAD_REQUEST, format!("error: {e:?}")),
|
||||
}
|
||||
} else if request.uri().path() == "/sleep" {
|
||||
// sleep 15ms
|
||||
tokio::time::sleep(std::time::Duration::from_millis(15)).await;
|
||||
json_response(StatusCode::OK, "done")
|
||||
} else {
|
||||
json_response(StatusCode::BAD_REQUEST, "query is not supported")
|
||||
}
|
||||
}
|
||||
|
||||
// XXX: return different error codes
|
||||
async fn handle_sql(
|
||||
config: &'static ProxyConfig,
|
||||
request: Request<Body>,
|
||||
cache: Arc<Mutex<ConnectionCache>>,
|
||||
) -> anyhow::Result<String> {
|
||||
let get_params = request
|
||||
.uri()
|
||||
.query()
|
||||
.ok_or(anyhow::anyhow!("missing query string"))?;
|
||||
|
||||
let parsed_params: HashMap<String, String> = form_urlencoded::parse(get_params.as_bytes())
|
||||
.into_owned()
|
||||
.collect();
|
||||
|
||||
let sql = parsed_params
|
||||
.get("query")
|
||||
.ok_or(anyhow::anyhow!("missing query"))?;
|
||||
let dbname = parsed_params
|
||||
.get("dbname")
|
||||
.ok_or(anyhow::anyhow!("missing dbname"))?;
|
||||
let username = parsed_params
|
||||
.get("username")
|
||||
.ok_or(anyhow::anyhow!("missing username"))?;
|
||||
let password = parsed_params
|
||||
.get("password")
|
||||
.ok_or(anyhow::anyhow!("missing password"))?;
|
||||
// XXX: does URI includes host too? then Url::parse() should work for both host_str and params
|
||||
let hostname = request
|
||||
.headers()
|
||||
.get("host")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.and_then(|h| h.split(':').next())
|
||||
.map(|s| s.to_string())
|
||||
.ok_or(anyhow::anyhow!("missing host header"))?;
|
||||
|
||||
let params = StartupMessageParams::new([
|
||||
("user", username.as_str()),
|
||||
("database", dbname.as_str()),
|
||||
("application_name", "proxy_http_sql"),
|
||||
]);
|
||||
let tls = config.tls_config.as_ref();
|
||||
let common_names = tls.and_then(|tls| tls.common_names.clone());
|
||||
let creds = config
|
||||
.auth_backend
|
||||
.as_ref()
|
||||
.map(|_| auth::ClientCredentials::parse(¶ms, Some(hostname.as_str()), common_names))
|
||||
.transpose()?;
|
||||
|
||||
let extra = console::ConsoleReqExtra {
|
||||
session_id: uuid::Uuid::new_v4(),
|
||||
application_name: Some("proxy_http_sql"),
|
||||
};
|
||||
|
||||
let node = creds.wake_compute(&extra).await?.expect("msg");
|
||||
let conf = node.value.config;
|
||||
|
||||
let host = match conf.get_hosts().first().expect("no host") {
|
||||
tokio_postgres::config::Host::Tcp(host) => host,
|
||||
tokio_postgres::config::Host::Unix(_) => {
|
||||
return Err(anyhow::anyhow!("unix socket is not supported"));
|
||||
}
|
||||
};
|
||||
|
||||
let conn_string = &format!(
|
||||
"host={} port={} user={} password={} dbname={}",
|
||||
host,
|
||||
conf.get_ports().first().expect("no port"),
|
||||
username,
|
||||
password,
|
||||
dbname
|
||||
);
|
||||
|
||||
ConnectionCache::execute(&cache, conn_string, &hostname, sql).await
|
||||
}
|
||||
|
||||
pub struct ConnectionCache {
|
||||
connections: HashMap<String, tokio_postgres::Client>,
|
||||
}
|
||||
|
||||
impl ConnectionCache {
|
||||
pub fn new() -> Arc<Mutex<Self>> {
|
||||
Arc::new(Mutex::new(Self {
|
||||
connections: HashMap::new(),
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn execute(
|
||||
cache: &Arc<Mutex<ConnectionCache>>,
|
||||
conn_string: &str,
|
||||
hostname: &str,
|
||||
sql: &str,
|
||||
) -> anyhow::Result<String> {
|
||||
// TODO: let go mutex when establishing connection
|
||||
let mut cache = cache.lock().await;
|
||||
let cache_key = format!("connstr={}, hostname={}", conn_string, hostname);
|
||||
let client = if let Some(client) = cache.connections.get(&cache_key) {
|
||||
info!("using cached connection {}", conn_string);
|
||||
client
|
||||
} else {
|
||||
info!("!!!! connecting to: {}", conn_string);
|
||||
|
||||
let (client, connection) =
|
||||
tokio_postgres::connect(conn_string, tokio_postgres::NoTls).await?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
// TODO: remove connection from cache
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
cache.connections.insert(cache_key.clone(), client);
|
||||
cache.connections.get(&cache_key).unwrap()
|
||||
};
|
||||
|
||||
let sql = percent_decode(sql.as_bytes()).decode_utf8()?.to_string();
|
||||
|
||||
let rows: Vec<HashMap<_, _>> = client
|
||||
.simple_query(&sql)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter_map(|el| {
|
||||
if let tokio_postgres::SimpleQueryMessage::Row(row) = el {
|
||||
let mut serilaized_row: HashMap<String, String> = HashMap::new();
|
||||
for i in 0..row.len() {
|
||||
let col = row.columns().get(i).map_or("?", |c| c.name());
|
||||
let val = row.get(i).unwrap_or("?");
|
||||
serilaized_row.insert(col.into(), val.into());
|
||||
}
|
||||
Some(serilaized_row)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(serde_json::to_string(&rows)?)
|
||||
json_response(StatusCode::OK, "Connect with a websocket client")
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
cache: &'static Arc<Mutex<ConnectionCache>>,
|
||||
ws_listener: TcpListener,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -386,7 +221,7 @@ pub async fn task_main(
|
||||
move |req: Request<Body>| async move {
|
||||
let cancel_map = Arc::new(CancelMap::default());
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
ws_handler(req, config, cancel_map, session_id, cache.clone())
|
||||
ws_handler(req, config, cancel_map, session_id)
|
||||
.instrument(info_span!(
|
||||
"ws-client",
|
||||
session = format_args!("{session_id}")
|
||||
|
||||
@@ -32,8 +32,6 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info, warn};
|
||||
use utils::{project_git_version, sentry_init::init_sentry};
|
||||
|
||||
use crate::http::websocket::ConnectionCache;
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
/// Flattens `Result<Result<T>>` into `Result<T>`.
|
||||
@@ -55,8 +53,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
let args = cli().get_matches();
|
||||
let config = build_config(&args)?;
|
||||
|
||||
let wsconn_cache = Box::leak(Box::new(ConnectionCache::new()));
|
||||
|
||||
info!("Authentication backend: {}", config.auth_backend);
|
||||
|
||||
// Check that we can bind to address before further initialization
|
||||
@@ -86,7 +82,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
client_tasks.push(tokio::spawn(http::websocket::task_main(
|
||||
config,
|
||||
wsconn_cache,
|
||||
wss_listener,
|
||||
cancellation_token.clone(),
|
||||
)));
|
||||
|
||||
@@ -27,6 +27,8 @@ use tokio::sync::mpsc::error::TryRecvError;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::task::spawn_blocking;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
use tracing::*;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -206,6 +208,10 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
}
|
||||
}
|
||||
|
||||
// Send keepalive messages to walproposer, to make sure it receives updates
|
||||
// even when it writes a steady stream of messages.
|
||||
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
|
||||
|
||||
/// Takes messages from msg_rx, processes and pushes replies to reply_tx.
|
||||
struct WalAcceptor {
|
||||
tli: Arc<Timeline>,
|
||||
@@ -253,18 +259,25 @@ impl WalAcceptor {
|
||||
timeline: Arc::clone(&self.tli),
|
||||
};
|
||||
|
||||
let mut next_msg: ProposerAcceptorMessage;
|
||||
// After this timestamp we will stop processing AppendRequests and send a response
|
||||
// to the walproposer. walproposer sends at least one AppendRequest per second,
|
||||
// we will send keepalives by replying to these requests once per second.
|
||||
let mut next_keepalive = Instant::now();
|
||||
|
||||
loop {
|
||||
let opt_msg = self.msg_rx.recv().await;
|
||||
if opt_msg.is_none() {
|
||||
return Ok(()); // chan closed, streaming terminated
|
||||
}
|
||||
next_msg = opt_msg.unwrap();
|
||||
let mut next_msg = opt_msg.unwrap();
|
||||
|
||||
if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
|
||||
let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
|
||||
// loop through AppendRequest's while it's readily available to
|
||||
// write as many WAL as possible without fsyncing
|
||||
//
|
||||
// Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
|
||||
// Otherwise, we might end up in a situation where we read a message, but don't
|
||||
// process it.
|
||||
while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
|
||||
let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
|
||||
|
||||
@@ -274,6 +287,11 @@ impl WalAcceptor {
|
||||
}
|
||||
}
|
||||
|
||||
// get out of this loop if keepalive time is reached
|
||||
if Instant::now() >= next_keepalive {
|
||||
break;
|
||||
}
|
||||
|
||||
match self.msg_rx.try_recv() {
|
||||
Ok(msg) => next_msg = msg,
|
||||
Err(TryRecvError::Empty) => break,
|
||||
@@ -282,18 +300,18 @@ impl WalAcceptor {
|
||||
}
|
||||
|
||||
// flush all written WAL to the disk
|
||||
if let Some(reply) = self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)? {
|
||||
if self.reply_tx.send(reply).await.is_err() {
|
||||
return Ok(()); // chan closed, streaming terminated
|
||||
}
|
||||
}
|
||||
self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)?
|
||||
} else {
|
||||
// process message other than AppendRequest
|
||||
if let Some(reply) = self.tli.process_msg(&next_msg)? {
|
||||
if self.reply_tx.send(reply).await.is_err() {
|
||||
return Ok(()); // chan closed, streaming terminated
|
||||
}
|
||||
self.tli.process_msg(&next_msg)?
|
||||
};
|
||||
|
||||
if let Some(reply) = reply_msg {
|
||||
if self.reply_tx.send(reply).await.is_err() {
|
||||
return Ok(()); // chan closed, streaming terminated
|
||||
}
|
||||
// reset keepalive time
|
||||
next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,6 +45,8 @@ PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = (
|
||||
*[f"pageserver_remote_timeline_client_calls_started_{x}" for x in ["bucket", "count", "sum"]],
|
||||
*[f"pageserver_remote_operation_seconds_{x}" for x in ["bucket", "count", "sum"]],
|
||||
"pageserver_remote_physical_size",
|
||||
"pageserver_remote_timeline_client_bytes_started_total",
|
||||
"pageserver_remote_timeline_client_bytes_finished_total",
|
||||
)
|
||||
|
||||
PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
|
||||
@@ -53,6 +55,7 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_storage_operations_seconds_global_bucket",
|
||||
"libmetrics_launch_timestamp",
|
||||
"libmetrics_build_info",
|
||||
"libmetrics_tracing_event_count_total",
|
||||
)
|
||||
|
||||
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
|
||||
|
||||
@@ -2928,32 +2928,18 @@ def fork_at_current_lsn(
|
||||
return env.neon_cli.create_branch(new_branch_name, ancestor_branch_name, tenant_id, current_lsn)
|
||||
|
||||
|
||||
def wait_for_sk_commit_lsn_to_arrive_at_pageserver_last_record_lsn(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
safekeepers: List[Safekeeper],
|
||||
pageserver: NeonPageserver,
|
||||
):
|
||||
sk_commit_lsns = [
|
||||
sk.http_client().timeline_status(tenant_id, timeline_id).commit_lsn for sk in safekeepers
|
||||
]
|
||||
lsn = max(sk_commit_lsns)
|
||||
ps_http = pageserver.http_client()
|
||||
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, lsn)
|
||||
return lsn
|
||||
|
||||
|
||||
def wait_for_sk_commit_lsn_to_reach_remote_storage(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
safekeepers: List[Safekeeper],
|
||||
pageserver: NeonPageserver,
|
||||
):
|
||||
lsn = wait_for_sk_commit_lsn_to_arrive_at_pageserver_last_record_lsn(
|
||||
tenant_id, timeline_id, safekeepers, pageserver
|
||||
)
|
||||
ps_http = pageserver.http_client()
|
||||
def last_flush_lsn_upload(
|
||||
env: NeonEnv, endpoint: Endpoint, tenant_id: TenantId, timeline_id: TimelineId
|
||||
) -> Lsn:
|
||||
"""
|
||||
Wait for pageserver to catch to the latest flush LSN of given endpoint,
|
||||
checkpoint pageserver, and wait for it to be uploaded (remote_consistent_lsn
|
||||
reaching flush LSN).
|
||||
"""
|
||||
last_flush_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
ps_http = env.pageserver.http_client()
|
||||
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_flush_lsn)
|
||||
# force a checkpoint to trigger upload
|
||||
ps_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
wait_for_upload(ps_http, tenant_id, timeline_id, lsn)
|
||||
return lsn
|
||||
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
|
||||
return last_flush_lsn
|
||||
|
||||
@@ -550,3 +550,13 @@ class PageserverHttpClient(requests.Session):
|
||||
def tenant_break(self, tenant_id: TenantId):
|
||||
res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break")
|
||||
self.verbose_error(res)
|
||||
|
||||
def post_tracing_event(self, level: str, message: str):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tracing/event",
|
||||
json={
|
||||
"level": level,
|
||||
"message": message,
|
||||
},
|
||||
)
|
||||
self.verbose_error(res)
|
||||
|
||||
@@ -54,10 +54,9 @@ def wait_for_upload(
|
||||
if current_lsn >= lsn:
|
||||
log.info("wait finished")
|
||||
return
|
||||
lr_lsn = last_record_lsn(pageserver_http, tenant, timeline)
|
||||
log.info(
|
||||
"waiting for remote_consistent_lsn to reach {}, now {}, iteration {}".format(
|
||||
lsn, current_lsn, i + 1
|
||||
)
|
||||
f"waiting for remote_consistent_lsn to reach {lsn}, now {current_lsn}, last_record_lsn={lr_lsn}, iteration {i + 1}"
|
||||
)
|
||||
time.sleep(1)
|
||||
raise Exception(
|
||||
|
||||
42
test_runner/regress/test_duplicate_layers.py
Normal file
42
test_runner/regress/test_duplicate_layers.py
Normal file
@@ -0,0 +1,42 @@
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
|
||||
|
||||
|
||||
# Test duplicate layer detection
|
||||
#
|
||||
# This test sets fail point at the end of first compaction phase:
|
||||
# after flushing new L1 layers but before deletion of L0 layes
|
||||
# It should cause generation of duplicate L1 layer by compaction after restart
|
||||
@pytest.mark.timeout(600)
|
||||
def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# These warnings are expected, when the pageserver is restarted abruptly
|
||||
env.pageserver.allowed_errors.append(".*found future image layer.*")
|
||||
env.pageserver.allowed_errors.append(".*found future delta layer.*")
|
||||
env.pageserver.allowed_errors.append(".*duplicate layer.*")
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
# Use aggressive compaction and checkpoint settings
|
||||
tenant_id, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
"checkpoint_distance": f"{1024 ** 2}",
|
||||
"compaction_target_size": f"{1024 ** 2}",
|
||||
"compaction_period": "1 s",
|
||||
"compaction_threshold": "3",
|
||||
}
|
||||
)
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
connstr = endpoint.connstr(options="-csynchronous_commit=off")
|
||||
pg_bin.run_capture(["pgbench", "-i", "-s10", connstr])
|
||||
|
||||
pageserver_http.configure_failpoints(("compact-level0-phase1-finish", "exit"))
|
||||
|
||||
with pytest.raises(Exception):
|
||||
pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T500", "-Mprepared", connstr])
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
time.sleep(10) # let compaction to be performed
|
||||
@@ -6,7 +6,6 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
RemoteStorageKind,
|
||||
wait_for_last_flush_lsn,
|
||||
wait_for_sk_commit_lsn_to_reach_remote_storage,
|
||||
)
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
@@ -199,7 +198,7 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
|
||||
# with image_creation_threshold=1 which we will use on the last compaction
|
||||
cur.execute("vacuum")
|
||||
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
last_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
if i == 1 and j == 2 and k == 1:
|
||||
# last iteration; stop before checkpoint to avoid leaving an inmemory layer
|
||||
@@ -222,10 +221,8 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
|
||||
tenant_update_config({"image_creation_threshold": "1"})
|
||||
ps_http.timeline_compact(tenant_id, timeline_id)
|
||||
|
||||
# wait for all uploads to finish
|
||||
wait_for_sk_commit_lsn_to_reach_remote_storage(
|
||||
tenant_id, timeline_id, env.safekeepers, env.pageserver
|
||||
)
|
||||
# wait for all uploads to finish (checkpoint has been done above)
|
||||
wait_for_upload(ps_http, tenant_id, timeline_id, last_lsn)
|
||||
|
||||
# shutdown safekeepers to avoid on-demand downloads from walreceiver
|
||||
for sk in env.safekeepers:
|
||||
|
||||
49
test_runner/regress/test_logging.py
Normal file
49
test_runner/regress/test_logging.py
Normal file
@@ -0,0 +1,49 @@
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
|
||||
@pytest.mark.parametrize("level", ["trace", "debug", "info", "warn", "error"])
|
||||
def test_logging_event_count(neon_env_builder: NeonEnvBuilder, level: str):
|
||||
# self-test: make sure the event is logged (i.e., our testing endpoint works)
|
||||
log_expected = {
|
||||
"trace": False,
|
||||
"debug": False,
|
||||
"info": True,
|
||||
"warn": True,
|
||||
"error": True,
|
||||
}[level]
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
ps_http = env.pageserver.http_client()
|
||||
msg_id = uuid.uuid4().hex
|
||||
|
||||
# NB: the _total suffix is added by our prometheus client
|
||||
before = ps_http.get_metric_value("libmetrics_tracing_event_count_total", {"level": level})
|
||||
|
||||
# post the event
|
||||
ps_http.post_tracing_event(level, msg_id)
|
||||
if log_expected:
|
||||
env.pageserver.allowed_errors.append(f".*{msg_id}.*")
|
||||
|
||||
def assert_logged():
|
||||
if not log_expected:
|
||||
return
|
||||
assert env.pageserver.log_contains(f".*{msg_id}.*")
|
||||
|
||||
wait_until(10, 0.5, assert_logged)
|
||||
|
||||
# make sure it's counted
|
||||
def assert_metric_value():
|
||||
if not log_expected:
|
||||
return
|
||||
# NB: the _total suffix is added by our prometheus client
|
||||
val = ps_http.get_metric_value("libmetrics_tracing_event_count_total", {"level": level})
|
||||
val = val or 0.0
|
||||
log.info("libmetrics_tracing_event_count: %s", val)
|
||||
assert val > (before or 0.0)
|
||||
|
||||
wait_until(10, 1, assert_metric_value)
|
||||
@@ -12,8 +12,8 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
RemoteStorageKind,
|
||||
available_remote_storages,
|
||||
last_flush_lsn_upload,
|
||||
wait_for_last_flush_lsn,
|
||||
wait_for_sk_commit_lsn_to_reach_remote_storage,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
|
||||
from fixtures.pageserver.utils import (
|
||||
@@ -207,9 +207,7 @@ def test_ondemand_download_timetravel(
|
||||
env.endpoints.stop_all()
|
||||
|
||||
# wait until pageserver has successfully uploaded all the data to remote storage
|
||||
wait_for_sk_commit_lsn_to_reach_remote_storage(
|
||||
tenant_id, timeline_id, env.safekeepers, env.pageserver
|
||||
)
|
||||
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
|
||||
|
||||
def get_api_current_physical_size():
|
||||
d = client.timeline_detail(tenant_id, timeline_id)
|
||||
@@ -347,12 +345,9 @@ def test_download_remote_layers_api(
|
||||
"""
|
||||
)
|
||||
|
||||
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
|
||||
env.endpoints.stop_all()
|
||||
|
||||
wait_for_sk_commit_lsn_to_reach_remote_storage(
|
||||
tenant_id, timeline_id, env.safekeepers, env.pageserver
|
||||
)
|
||||
|
||||
def get_api_current_physical_size():
|
||||
d = client.timeline_detail(tenant_id, timeline_id)
|
||||
return d["current_physical_size"]
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import json
|
||||
from contextlib import closing
|
||||
|
||||
import psycopg2.extras
|
||||
@@ -22,9 +23,12 @@ wait_lsn_timeout='111 s';
|
||||
checkpoint_distance = 10000
|
||||
compaction_target_size = 1048576
|
||||
evictions_low_residence_duration_metric_threshold = "2 days"
|
||||
eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold = "23 hours" }
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
# we configure eviction but no remote storage, there might be error lines
|
||||
env.pageserver.allowed_errors.append(".* no remote storage configured, cannot evict layers .*")
|
||||
http_client = env.pageserver.http_client()
|
||||
|
||||
# Check that we raise on misspelled configs
|
||||
@@ -44,6 +48,7 @@ evictions_low_residence_duration_metric_threshold = "2 days"
|
||||
"checkpoint_distance": "20000",
|
||||
"gc_period": "30sec",
|
||||
"evictions_low_residence_duration_metric_threshold": "42s",
|
||||
"eviction_policy": json.dumps({"kind": "NoEviction"}),
|
||||
}
|
||||
tenant, _ = env.neon_cli.create_tenant(conf=new_conf)
|
||||
|
||||
@@ -84,6 +89,11 @@ evictions_low_residence_duration_metric_threshold = "2 days"
|
||||
assert effective_config["image_creation_threshold"] == 3
|
||||
assert effective_config["pitr_interval"] == "7days"
|
||||
assert effective_config["evictions_low_residence_duration_metric_threshold"] == "2days"
|
||||
assert effective_config["eviction_policy"] == {
|
||||
"kind": "LayerAccessThreshold",
|
||||
"period": "20s",
|
||||
"threshold": "23h",
|
||||
}
|
||||
|
||||
# check the configuration of the new tenant
|
||||
with closing(env.pageserver.connect()) as psconn:
|
||||
@@ -121,6 +131,9 @@ evictions_low_residence_duration_metric_threshold = "2 days"
|
||||
assert (
|
||||
new_effective_config["evictions_low_residence_duration_metric_threshold"] == "42s"
|
||||
), "Should override default value"
|
||||
assert new_effective_config["eviction_policy"] == {
|
||||
"kind": "NoEviction"
|
||||
}, "Specific 'eviction_policy' config should override the default value"
|
||||
assert new_effective_config["compaction_target_size"] == 1048576
|
||||
assert new_effective_config["compaction_period"] == "20s"
|
||||
assert new_effective_config["compaction_threshold"] == 10
|
||||
@@ -135,6 +148,9 @@ evictions_low_residence_duration_metric_threshold = "2 days"
|
||||
"compaction_period": "80sec",
|
||||
"image_creation_threshold": "2",
|
||||
"evictions_low_residence_duration_metric_threshold": "23h",
|
||||
"eviction_policy": json.dumps(
|
||||
{"kind": "LayerAccessThreshold", "period": "80s", "threshold": "42h"}
|
||||
),
|
||||
}
|
||||
env.neon_cli.config_tenant(
|
||||
tenant_id=tenant,
|
||||
@@ -180,6 +196,11 @@ evictions_low_residence_duration_metric_threshold = "2 days"
|
||||
assert (
|
||||
updated_effective_config["evictions_low_residence_duration_metric_threshold"] == "23h"
|
||||
), "Should override default value"
|
||||
assert updated_effective_config["eviction_policy"] == {
|
||||
"kind": "LayerAccessThreshold",
|
||||
"period": "1m 20s",
|
||||
"threshold": "1day 18h",
|
||||
}, "Specific 'eviction_policy' config should override the default value"
|
||||
assert updated_effective_config["compaction_target_size"] == 1048576
|
||||
assert updated_effective_config["compaction_threshold"] == 10
|
||||
assert updated_effective_config["gc_horizon"] == 67108864
|
||||
@@ -239,6 +260,11 @@ evictions_low_residence_duration_metric_threshold = "2 days"
|
||||
assert final_effective_config["gc_period"] == "1h"
|
||||
assert final_effective_config["image_creation_threshold"] == 3
|
||||
assert final_effective_config["evictions_low_residence_duration_metric_threshold"] == "2days"
|
||||
assert final_effective_config["eviction_policy"] == {
|
||||
"kind": "LayerAccessThreshold",
|
||||
"period": "20s",
|
||||
"threshold": "23h",
|
||||
}
|
||||
|
||||
# restart the pageserver and ensure that the config is still correct
|
||||
env.pageserver.stop()
|
||||
|
||||
@@ -21,7 +21,7 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
RemoteStorageKind,
|
||||
available_remote_storages,
|
||||
wait_for_sk_commit_lsn_to_reach_remote_storage,
|
||||
last_flush_lsn_upload,
|
||||
)
|
||||
from fixtures.pageserver.utils import (
|
||||
assert_tenant_state,
|
||||
@@ -174,12 +174,9 @@ def test_tenants_attached_after_download(
|
||||
)
|
||||
|
||||
##### Stop the pageserver, erase its layer file to force it being downloaded from S3
|
||||
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
|
||||
env.endpoints.stop_all()
|
||||
|
||||
wait_for_sk_commit_lsn_to_reach_remote_storage(
|
||||
tenant_id, timeline_id, env.safekeepers, env.pageserver
|
||||
)
|
||||
|
||||
env.pageserver.stop()
|
||||
|
||||
timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
|
||||
Reference in New Issue
Block a user