Compare commits

..

1 Commits

Author SHA1 Message Date
Konstantin Knizhnik
44dfe405cf Add view to inspect postgres log through SQL 2023-04-25 17:37:22 +03:00
33 changed files with 419 additions and 1143 deletions

View File

@@ -1,47 +0,0 @@
storage:
vars:
bucket_name: neon-dev-storage-eu-central-1
bucket_region: eu-central-1
# We only register/update storage in one preview console and manually copy to other instances
console_mgmt_base_url: http://neon-internal-api.helium.aws.neon.build
broker_endpoint: http://storage-broker-lb.alpha.eu-central-1.internal.aws.neon.build:50051
pageserver_config_stub:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.helium.aws.neon.build/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "20m"
threshold: &default_eviction_threshold "20m"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "pageserver/v1"
safekeeper_s3_prefix: safekeeper/v1/wal
hostname_suffix: ""
remote_user: ssm-user
ansible_aws_ssm_region: eu-central-1
ansible_aws_ssm_bucket_name: neon-dev-storage-eu-central-1
console_region_id: aws-eu-central-1
sentry_environment: staging
children:
pageservers:
hosts:
pageserver-0.eu-central-1.aws.neon.build:
ansible_host: i-011f93ec26cfba2d4
safekeepers:
hosts:
safekeeper-0.eu-central-1.aws.neon.build:
ansible_host: i-0ff026d27babf8ddd
safekeeper-1.eu-central-1.aws.neon.build:
ansible_host: i-03983a49ee54725d9
safekeeper-2.eu-central-1.aws.neon.build:
ansible_host: i-0bd025ecdb61b0db3

View File

@@ -48,6 +48,8 @@ storage:
hosts:
safekeeper-0.us-east-2.aws.neon.build:
ansible_host: i-027662bd552bf5db0
safekeeper-1.us-east-2.aws.neon.build:
ansible_host: i-0171efc3604a7b907
safekeeper-2.us-east-2.aws.neon.build:
ansible_host: i-0de0b03a51676a6ce
safekeeper-99.us-east-2.aws.neon.build:

View File

@@ -1,52 +0,0 @@
# Helm chart values for neon-storage-broker
podLabels:
neon_env: staging
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.alpha.eu-central-1.internal.aws.neon.build
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
metrics:
enabled: false
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-storage-broker.fullname\" . }}"
labels:
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
app.kubernetes.io/name: neon-storage-broker
app.kubernetes.io/instance: neon-storage-broker
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-storage-broker"
endpoints:
- port: broker
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "staging"

View File

@@ -23,7 +23,6 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
domain: "*.eu-west-1.aws.neon.build"
otelExporterOtlpEndpoint: "https://otel-collector.zeta.eu-west-1.internal.aws.neon.build"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"

View File

@@ -9,7 +9,6 @@ settings:
authEndpoint: "https://console.stage.neon.tech/authenticate_proxy_request/"
uri: "https://console.stage.neon.tech/psql_session/"
domain: "pg.neon.build"
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
sentryEnvironment: "staging"
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"
metricCollectionInterval: "1min"

View File

@@ -24,7 +24,6 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
domain: "*.cloud.stage.neon.tech"
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"

View File

@@ -25,7 +25,6 @@ settings:
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
domain: "*.us-east-2.aws.neon.build"
extraDomains: ["*.us-east-2.postgres.zenith.tech", "*.us-east-2.retooldb-staging.com"]
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"

View File

@@ -1,67 +0,0 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.${PREVIEW_NAME}.aws.neon.build/management/api/v2"
domain: "*.cloud.${PREVIEW_NAME}.aws.neon.build"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.${PREVIEW_NAME}.aws.neon.build/billing/api/v1/usage_events"
metricCollectionInterval: "1min"
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: test
neon_region: ${PREVIEW_NAME}.eu-central-1
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: cloud.${PREVIEW_NAME}.aws.neon.build
httpsPort: 443
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -23,8 +23,8 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.us-east-1.aws.neon.tech"
# *.us-east-1.retooldb.com hasn't been delegated yet.
extraDomains: ["*.us-east-1.postgres.vercel-storage.com"]
# These domains haven't been delegated yet.
# extraDomains: ["*.us-east-1.retooldb.com", "*.us-east-1.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"

View File

@@ -541,7 +541,7 @@ jobs:
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
options: --init
needs: [ promote-images, tag ]
needs: [ push-docker-hub, tag ]
steps:
- name: Set PR's status to pending and request a remote CI test
run: |
@@ -584,7 +584,8 @@ jobs:
neon-image:
runs-on: [ self-hosted, gen3, large ]
needs: [ tag ]
container: gcr.io/kaniko-project/executor:v1.9.2-debug
# https://github.com/GoogleContainerTools/kaniko/issues/2005
container: gcr.io/kaniko-project/executor:v1.7.0-debug
defaults:
run:
shell: sh -eu {0}
@@ -596,32 +597,11 @@ jobs:
submodules: true
fetch-depth: 0
- name: Configure ECR and Docker Hub login
run: |
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
echo "::add-mask::${DOCKERHUB_AUTH}"
cat <<-EOF > /kaniko/.docker/config.json
{
"auths": {
"https://index.docker.io/v1/": {
"auth": "${DOCKERHUB_AUTH}"
}
},
"credHelpers": {
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
}
}
EOF
- name: Configure ECR login
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build neon
run:
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
--destination neondatabase/neon:${{needs.tag.outputs.build-tag}}
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
- name: Cleanup ECR folder
@@ -672,7 +652,7 @@ jobs:
compute-tools-image:
runs-on: [ self-hosted, gen3, large ]
needs: [ tag ]
container: gcr.io/kaniko-project/executor:v1.9.2-debug
container: gcr.io/kaniko-project/executor:v1.7.0-debug
defaults:
run:
shell: sh -eu {0}
@@ -681,41 +661,18 @@ jobs:
- name: Checkout
uses: actions/checkout@v1 # v3 won't work with kaniko
- name: Configure ECR and Docker Hub login
run: |
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
echo "::add-mask::${DOCKERHUB_AUTH}"
cat <<-EOF > /kaniko/.docker/config.json
{
"auths": {
"https://index.docker.io/v1/": {
"auth": "${DOCKERHUB_AUTH}"
}
},
"credHelpers": {
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
}
}
EOF
- name: Configure ECR login
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build compute tools
run:
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--dockerfile Dockerfile.compute-tools
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
--destination neondatabase/compute-tools:${{needs.tag.outputs.build-tag}}
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
compute-node-image:
runs-on: [ self-hosted, gen3, large ]
container: gcr.io/kaniko-project/executor:v1.9.2-debug
container: gcr.io/kaniko-project/executor:v1.7.0-debug
needs: [ tag ]
strategy:
fail-fast: false
@@ -732,36 +689,12 @@ jobs:
submodules: true
fetch-depth: 0
- name: Configure ECR and Docker Hub login
run: |
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
echo "::add-mask::${DOCKERHUB_AUTH}"
cat <<-EOF > /kaniko/.docker/config.json
{
"auths": {
"https://index.docker.io/v1/": {
"auth": "${DOCKERHUB_AUTH}"
}
},
"credHelpers": {
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
}
}
EOF
- name: Configure ECR login
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build compute node with extensions
run:
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.sha }}
--build-arg PG_VERSION=${{ matrix.version }}
--dockerfile Dockerfile.compute-node
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
--destination neondatabase/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --build-arg PG_VERSION=${{ matrix.version }} --dockerfile Dockerfile.compute-node --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
@@ -853,8 +786,41 @@ jobs:
runs-on: [ self-hosted, gen3, small ]
needs: [ tag, test-images, vm-compute-node-image ]
container: golang:1.19-bullseye
# Don't add if-condition here.
# The job should always be run because we have dependant other jobs that shouldn't be skipped
if: github.event_name != 'workflow_dispatch'
steps:
- name: Install Crane & ECR helper
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
run: |
go install github.com/google/go-containerregistry/cmd/crane@31786c6cbb82d6ec4fb8eb79cd9387905130534e # v0.11.0
go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0
- name: Configure ECR login
run: |
mkdir /github/home/.docker/
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
- name: Add latest tag to images
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
run: |
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} latest
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
push-docker-hub:
runs-on: [ self-hosted, dev, x64 ]
needs: [ promote-images, tag ]
container: golang:1.19-bullseye
steps:
- name: Install Crane & ECR helper
@@ -867,27 +833,31 @@ jobs:
mkdir /github/home/.docker/
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
- name: Copy vm-compute-node images to Docker Hub
run: |
crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} vm-compute-node-v14
crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} vm-compute-node-v15
- name: Pull neon image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} neon
- name: Add latest tag to images
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
run: |
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} latest
- name: Pull compute tools image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} compute-tools
- name: Pull compute node v14 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} compute-node-v14
- name: Pull vm compute node v14 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} vm-compute-node-v14
- name: Pull compute node v15 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} compute-node-v15
- name: Pull vm compute node v15 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} vm-compute-node-v15
- name: Pull rust image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
- name: Push images to production ECR
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
github.event_name != 'workflow_dispatch'
run: |
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/neon:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:latest
@@ -902,12 +872,28 @@ jobs:
echo "" > /github/home/.docker/config.json
crane auth login -u ${{ secrets.NEON_DOCKERHUB_USERNAME }} -p ${{ secrets.NEON_DOCKERHUB_PASSWORD }} index.docker.io
- name: Push vm-compute-node to Docker Hub
run: |
crane push vm-compute-node-v14 neondatabase/vm-compute-node-v14:${{needs.tag.outputs.build-tag}}
crane push vm-compute-node-v15 neondatabase/vm-compute-node-v15:${{needs.tag.outputs.build-tag}}
- name: Push neon image to Docker Hub
run: crane push neon neondatabase/neon:${{needs.tag.outputs.build-tag}}
- name: Push latest tags to Docker Hub
- name: Push compute tools image to Docker Hub
run: crane push compute-tools neondatabase/compute-tools:${{needs.tag.outputs.build-tag}}
- name: Push compute node v14 image to Docker Hub
run: crane push compute-node-v14 neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}}
- name: Push vm compute node v14 image to Docker Hub
run: crane push vm-compute-node-v14 neondatabase/vm-compute-node-v14:${{needs.tag.outputs.build-tag}}
- name: Push compute node v15 image to Docker Hub
run: crane push compute-node-v15 neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}}
- name: Push vm compute node v15 image to Docker Hub
run: crane push vm-compute-node-v15 neondatabase/vm-compute-node-v15:${{needs.tag.outputs.build-tag}}
- name: Push rust image to Docker Hub
run: crane push rust neondatabase/rust:pinned
- name: Add latest tag to images in Docker Hub
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
@@ -927,7 +913,7 @@ jobs:
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
# We need both storage **and** compute images for deploy, because control plane picks the compute version based on the storage version.
# If it notices a fresh storage it may bump the compute version. And if compute image failed to build it may break things badly
needs: [ promote-images, tag, regress-tests ]
needs: [ push-docker-hub, tag, regress-tests ]
if: |
contains(github.event.pull_request.labels.*.name, 'deploy-test-storage') &&
github.event_name != 'workflow_dispatch'
@@ -961,7 +947,7 @@ jobs:
deploy:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
needs: [ promote-images, tag, regress-tests ]
needs: [ push-docker-hub, tag, regress-tests ]
if: ( github.ref_name == 'main' || github.ref_name == 'release' ) && github.event_name != 'workflow_dispatch'
steps:
- name: Fix git ownership
@@ -998,7 +984,7 @@ jobs:
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
needs: [ promote-images, tag, regress-tests ]
needs: [ push-docker-hub, tag, regress-tests ]
if: github.ref_name == 'release' && github.event_name != 'workflow_dispatch'
steps:
- name: Promote compatibility snapshot for the release

View File

@@ -48,8 +48,7 @@ jobs:
shell: bash
strategy:
matrix:
# TODO(sergey): Fix storage deploy in eu-central-1
target_region: [ eu-west-1, us-east-2]
target_region: [ eu-west-1, us-east-2 ]
environment:
name: dev-${{ matrix.target_region }}
steps:
@@ -134,53 +133,6 @@ jobs:
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-preview-proxy-new:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
if: inputs.deployProxy
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: eu-central-1
target_cluster: dev-eu-central-1-alpha
environment:
name: dev-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1-node16
with:
role-to-assume: arn:aws:iam::369495373322:role/github-runner
aws-region: eu-central-1
role-skip-session-tagging: true
role-duration-seconds: 1800
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Re-deploy preview proxies
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
for PREVIEW_NAME in helium argon krypton xenon radon oganesson hydrogen nitrogen oxygen fluorine chlorine; do
export PREVIEW_NAME
envsubst <.github/helm-values/preview-template.neon-proxy-scram.yaml >preview-${PREVIEW_NAME}.neon-proxy-scram.yaml
helm upgrade neon-proxy-scram-${PREVIEW_NAME} neondatabase/neon-proxy --namespace neon-proxy-${PREVIEW_NAME} --create-namespace --install --atomic -f preview-${PREVIEW_NAME}.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
done
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-storage-broker-new:
runs-on: [ self-hosted, gen3, small ]
@@ -196,8 +148,6 @@ jobs:
target_cluster: dev-us-east-2-beta
- target_region: eu-west-1
target_cluster: dev-eu-west-1-zeta
- target_region: eu-central-1
target_cluster: dev-eu-central-1-alpha
environment:
name: dev-${{ matrix.target_region }}
steps:

View File

@@ -359,8 +359,8 @@ impl PageServerNode {
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
eviction_policy: settings
.remove("eviction_policy")
.map(serde_json::from_str)
.get("eviction_policy")
.map(|x| serde_json::from_str(x))
.transpose()
.context("Failed to parse 'eviction_policy' json")?,
min_resident_size_override: settings

View File

@@ -76,7 +76,6 @@ where
let log_quietly = method == Method::GET;
async move {
let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding();
if log_quietly {
debug!("Handling request");
} else {
@@ -88,11 +87,7 @@ where
// Usage of the error handler also means that we expect only the `ApiError` errors to be raised in this call.
//
// Panics are not handled separately, there's a `tracing_panic_hook` from another module to do that globally.
let res = (self.0)(request).await;
cancellation_guard.disarm();
match res {
match (self.0)(request).await {
Ok(response) => {
let response_status = response.status();
if log_quietly && response_status.is_success() {
@@ -110,38 +105,6 @@ where
}
}
/// Drop guard to WARN in case the request was dropped before completion.
struct RequestCancelled {
warn: Option<tracing::Span>,
}
impl RequestCancelled {
/// Create the drop guard using the [`tracing::Span::current`] as the span.
fn warn_when_dropped_without_responding() -> Self {
RequestCancelled {
warn: Some(tracing::Span::current()),
}
}
/// Consume the drop guard without logging anything.
fn disarm(mut self) {
self.warn = None;
}
}
impl Drop for RequestCancelled {
fn drop(&mut self) {
if let Some(span) = self.warn.take() {
// the span has all of the info already, but the outer `.instrument(span)` has already
// been dropped, so we need to manually re-enter it for this message.
//
// this is what the instrument would do before polling so it is fine.
let _g = span.entered();
warn!("request was dropped before completing");
}
}
}
async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
SERVE_METRICS_COUNT.inc();

View File

@@ -1,7 +1,6 @@
use std::str::FromStr;
use anyhow::Context;
use once_cell::sync::Lazy;
use strum_macros::{EnumString, EnumVariantNames};
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
@@ -24,64 +23,25 @@ impl LogFormat {
}
}
static TRACING_EVENT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
metrics::register_int_counter_vec!(
"libmetrics_tracing_event_count",
"Number of tracing events, by level",
&["level"]
)
.expect("failed to define metric")
});
struct TracingEventCountLayer(&'static metrics::IntCounterVec);
impl<S> tracing_subscriber::layer::Layer<S> for TracingEventCountLayer
where
S: tracing::Subscriber,
{
fn on_event(
&self,
event: &tracing::Event<'_>,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
let level = event.metadata().level();
let level = match *level {
tracing::Level::ERROR => "error",
tracing::Level::WARN => "warn",
tracing::Level::INFO => "info",
tracing::Level::DEBUG => "debug",
tracing::Level::TRACE => "trace",
};
self.0.with_label_values(&[level]).inc();
}
}
pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
let default_filter_str = "info";
// We fall back to printing all spans at info-level or above if
// the RUST_LOG environment variable is not set.
let rust_log_env_filter = || {
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
};
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_filter_str));
// NB: the order of the with() calls does not matter.
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
use tracing_subscriber::prelude::*;
tracing_subscriber::registry()
.with({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout);
let log_layer = match log_format {
LogFormat::Json => log_layer.json().boxed(),
LogFormat::Plain => log_layer.boxed(),
LogFormat::Test => log_layer.with_test_writer().boxed(),
};
log_layer.with_filter(rust_log_env_filter())
})
.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()))
.init();
let base_logger = tracing_subscriber::fmt()
.with_env_filter(env_filter)
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout);
match log_format {
LogFormat::Json => base_logger.json().init(),
LogFormat::Plain => base_logger.init(),
LogFormat::Test => base_logger.with_test_writer().init(),
}
Ok(())
}
@@ -197,33 +157,3 @@ impl std::fmt::Debug for PrettyLocation<'_, '_> {
<Self as std::fmt::Display>::fmt(self, f)
}
}
#[cfg(test)]
mod tests {
use metrics::{core::Opts, IntCounterVec};
use super::TracingEventCountLayer;
#[test]
fn tracing_event_count_metric() {
let counter_vec =
IntCounterVec::new(Opts::new("testmetric", "testhelp"), &["level"]).unwrap();
let counter_vec = Box::leak(Box::new(counter_vec)); // make it 'static
let layer = TracingEventCountLayer(counter_vec);
use tracing_subscriber::prelude::*;
tracing::subscriber::with_default(tracing_subscriber::registry().with(layer), || {
tracing::trace!("foo");
tracing::debug!("foo");
tracing::info!("foo");
tracing::warn!("foo");
tracing::error!("foo");
});
assert_eq!(counter_vec.with_label_values(&["trace"]).get(), 1);
assert_eq!(counter_vec.with_label_values(&["debug"]).get(), 1);
assert_eq!(counter_vec.with_label_values(&["info"]).get(), 1);
assert_eq!(counter_vec.with_label_values(&["warn"]).get(), 1);
assert_eq!(counter_vec.with_label_values(&["error"]).get(), 1);
}
}

View File

@@ -1201,37 +1201,6 @@ async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
)
}
#[cfg(feature = "testing")]
async fn post_tracing_event_handler(mut r: Request<Body>) -> Result<Response<Body>, ApiError> {
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
enum Level {
Error,
Warn,
Info,
Debug,
Trace,
}
#[derive(Debug, serde::Deserialize)]
struct Request {
level: Level,
message: String,
}
let body: Request = json_request(&mut r)
.await
.map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
match body.level {
Level::Error => tracing::error!(?body.message),
Level::Warn => tracing::warn!(?body.message),
Level::Info => tracing::info!(?body.message),
Level::Debug => tracing::debug!(?body.message),
Level::Trace => tracing::trace!(?body.message),
}
json_response(StatusCode::OK, ())
}
pub fn make_router(
conf: &'static PageServerConf,
launch_ts: &'static LaunchTimestamp,
@@ -1372,9 +1341,5 @@ pub fn make_router(
testing_api!("set tenant state to broken", handle_tenant_break),
)
.get("/v1/panic", |r| RequestSpan(always_panic_handler).handle(r))
.post(
"/v1/tracing/event",
testing_api!("emit a tracing event", post_tracing_event_handler),
)
.any(handler_404))
}

View File

@@ -1,9 +1,9 @@
use metrics::core::{AtomicU64, GenericCounter};
use metrics::{
register_counter_vec, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec, Counter, CounterVec,
Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge,
UIntGaugeVec,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec,
Counter, CounterVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
UIntGauge, UIntGaugeVec,
};
use once_cell::sync::Lazy;
use pageserver_api::models::TenantState;
@@ -350,6 +350,11 @@ pub static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub static NUM_ONDISK_LAYERS: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!("pageserver_ondisk_layers", "Number of layers on-disk")
.expect("failed to define a metric")
});
// remote storage metrics
/// NB: increment _after_ recording the current value into [`REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST`].
@@ -380,26 +385,6 @@ static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy<HistogramVec> = Lazy::new
.expect("failed to define a metric")
});
static REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_remote_timeline_client_bytes_started",
"Incremented by the number of bytes associated with a remote timeline client operation. \
The increment happens when the operation is scheduled.",
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
)
.expect("failed to define a metric")
});
static REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_remote_timeline_client_bytes_finished",
"Incremented by the number of bytes associated with a remote timeline client operation. \
The increment happens when the operation finishes (regardless of success/failure/shutdown).",
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
)
.expect("failed to define a metric")
});
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RemoteOpKind {
Upload,
@@ -754,8 +739,6 @@ pub struct RemoteTimelineClientMetrics {
remote_operation_time: Mutex<HashMap<(&'static str, &'static str, &'static str), Histogram>>,
calls_unfinished_gauge: Mutex<HashMap<(&'static str, &'static str), IntGauge>>,
calls_started_hist: Mutex<HashMap<(&'static str, &'static str), Histogram>>,
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
}
impl RemoteTimelineClientMetrics {
@@ -766,8 +749,6 @@ impl RemoteTimelineClientMetrics {
remote_operation_time: Mutex::new(HashMap::default()),
calls_unfinished_gauge: Mutex::new(HashMap::default()),
calls_started_hist: Mutex::new(HashMap::default()),
bytes_started_counter: Mutex::new(HashMap::default()),
bytes_finished_counter: Mutex::new(HashMap::default()),
remote_physical_size_gauge: Mutex::new(None),
}
}
@@ -806,7 +787,6 @@ impl RemoteTimelineClientMetrics {
});
metric.clone()
}
fn calls_unfinished_gauge(
&self,
file_kind: &RemoteOpFileKind,
@@ -848,125 +828,32 @@ impl RemoteTimelineClientMetrics {
});
metric.clone()
}
fn bytes_started_counter(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> IntCounter {
// XXX would be nice to have an upgradable RwLock
let mut guard = self.bytes_started_counter.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
let metric = guard.entry(key).or_insert_with(move || {
REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER
.get_metric_with_label_values(&[
&self.tenant_id.to_string(),
&self.timeline_id.to_string(),
key.0,
key.1,
])
.unwrap()
});
metric.clone()
}
fn bytes_finished_counter(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> IntCounter {
// XXX would be nice to have an upgradable RwLock
let mut guard = self.bytes_finished_counter.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
let metric = guard.entry(key).or_insert_with(move || {
REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER
.get_metric_with_label_values(&[
&self.tenant_id.to_string(),
&self.timeline_id.to_string(),
key.0,
key.1,
])
.unwrap()
});
metric.clone()
}
}
#[cfg(test)]
impl RemoteTimelineClientMetrics {
pub fn get_bytes_started_counter_value(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> Option<u64> {
let guard = self.bytes_started_counter.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
guard.get(&key).map(|counter| counter.get())
}
pub fn get_bytes_finished_counter_value(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> Option<u64> {
let guard = self.bytes_finished_counter.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
guard.get(&key).map(|counter| counter.get())
}
}
/// See [`RemoteTimelineClientMetrics::call_begin`].
#[must_use]
pub(crate) struct RemoteTimelineClientCallMetricGuard {
/// Decremented on drop.
calls_unfinished_metric: Option<IntGauge>,
/// If Some(), this references the bytes_finished metric, and we increment it by the given `u64` on drop.
bytes_finished: Option<(IntCounter, u64)>,
}
pub(crate) struct RemoteTimelineClientCallMetricGuard(Option<IntGauge>);
impl RemoteTimelineClientCallMetricGuard {
/// Consume this guard object without performing the metric updates it would do on `drop()`.
/// The caller vouches to do the metric updates manually.
/// Consume this guard object without decrementing the metric.
/// The caller vouches to do this manually, so that the prior increment of the gauge will cancel out.
pub fn will_decrement_manually(mut self) {
let RemoteTimelineClientCallMetricGuard {
calls_unfinished_metric,
bytes_finished,
} = &mut self;
calls_unfinished_metric.take();
bytes_finished.take();
self.0 = None; // prevent drop() from decrementing
}
}
impl Drop for RemoteTimelineClientCallMetricGuard {
fn drop(&mut self) {
let RemoteTimelineClientCallMetricGuard {
calls_unfinished_metric,
bytes_finished,
} = self;
if let Some(guard) = calls_unfinished_metric.take() {
if let RemoteTimelineClientCallMetricGuard(Some(guard)) = self {
guard.dec();
}
if let Some((bytes_finished_metric, value)) = bytes_finished {
bytes_finished_metric.inc_by(*value);
}
}
}
/// The enum variants communicate to the [`RemoteTimelineClientMetrics`] whether to
/// track the byte size of this call in applicable metric(s).
pub(crate) enum RemoteTimelineClientMetricsCallTrackSize {
/// Do not account for this call's byte size in any metrics.
/// The `reason` field is there to make the call sites self-documenting
/// about why they don't need the metric.
DontTrackSize { reason: &'static str },
/// Track the byte size of the call in applicable metric(s).
Bytes(u64),
}
impl RemoteTimelineClientMetrics {
/// Update the metrics that change when a call to the remote timeline client instance starts.
/// Increment the metrics that track ongoing calls to the remote timeline client instance.
///
/// Drop the returned guard object once the operation is finished to updates corresponding metrics that track completions.
/// Drop the returned guard object once the operation is finished to decrement the values.
/// Or, use [`RemoteTimelineClientCallMetricGuard::will_decrement_manually`] and [`call_end`] if that
/// is more suitable.
/// Never do both.
@@ -974,51 +861,24 @@ impl RemoteTimelineClientMetrics {
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
size: RemoteTimelineClientMetricsCallTrackSize,
) -> RemoteTimelineClientCallMetricGuard {
let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
let unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
self.calls_started_hist(file_kind, op_kind)
.observe(calls_unfinished_metric.get() as f64);
calls_unfinished_metric.inc(); // NB: inc after the histogram, see comment on underlying metric
let bytes_finished = match size {
RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {
// nothing to do
None
}
RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => {
self.bytes_started_counter(file_kind, op_kind).inc_by(size);
let finished_counter = self.bytes_finished_counter(file_kind, op_kind);
Some((finished_counter, size))
}
};
RemoteTimelineClientCallMetricGuard {
calls_unfinished_metric: Some(calls_unfinished_metric),
bytes_finished,
}
.observe(unfinished_metric.get() as f64);
unfinished_metric.inc();
RemoteTimelineClientCallMetricGuard(Some(unfinished_metric))
}
/// Manually udpate the metrics that track completions, instead of using the guard object.
/// Manually decrement the metric instead of using the guard object.
/// Using the guard object is generally preferable.
/// See [`call_begin`] for more context.
pub(crate) fn call_end(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
size: RemoteTimelineClientMetricsCallTrackSize,
) {
let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
pub(crate) fn call_end(&self, file_kind: &RemoteOpFileKind, op_kind: &RemoteOpKind) {
let unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
debug_assert!(
calls_unfinished_metric.get() > 0,
unfinished_metric.get() > 0,
"begin and end should cancel out"
);
calls_unfinished_metric.dec();
match size {
RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {}
RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => {
self.bytes_finished_counter(file_kind, op_kind).inc_by(size);
}
}
unfinished_metric.dec();
}
}
@@ -1031,8 +891,6 @@ impl Drop for RemoteTimelineClientMetrics {
remote_operation_time,
calls_unfinished_gauge,
calls_started_hist,
bytes_started_counter,
bytes_finished_counter,
} = self;
for ((a, b, c), _) in remote_operation_time.get_mut().unwrap().drain() {
let _ = REMOTE_OPERATION_TIME.remove_label_values(&[tenant_id, timeline_id, a, b, c]);
@@ -1053,22 +911,6 @@ impl Drop for RemoteTimelineClientMetrics {
b,
]);
}
for ((a, b), _) in bytes_started_counter.get_mut().unwrap().drain() {
let _ = REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER.remove_label_values(&[
tenant_id,
timeline_id,
a,
b,
]);
}
for ((a, b), _) in bytes_finished_counter.get_mut().unwrap().drain() {
let _ = REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER.remove_label_values(&[
tenant_id,
timeline_id,
a,
b,
]);
}
{
let _ = remote_physical_size_gauge; // use to avoid 'unused' warning in desctructuring above
let _ = REMOTE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);

View File

@@ -700,8 +700,6 @@ impl PageServerHandler {
full_backup: bool,
ctx: RequestContext,
) -> anyhow::Result<()> {
let started = std::time::Instant::now();
// check that the timeline exists
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
@@ -714,8 +712,6 @@ impl PageServerHandler {
.context("invalid basebackup lsn")?;
}
let lsn_awaited_after = started.elapsed();
// switch client to COPYOUT
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
pgb.flush().await?;
@@ -736,17 +732,7 @@ impl PageServerHandler {
pgb.write_message_noflush(&BeMessage::CopyDone)?;
pgb.flush().await?;
let basebackup_after = started
.elapsed()
.checked_sub(lsn_awaited_after)
.unwrap_or(Duration::ZERO);
info!(
lsn_await_millis = lsn_awaited_after.as_millis(),
basebackup_millis = basebackup_after.as_millis(),
"basebackup complete"
);
info!("basebackup complete");
Ok(())
}

View File

@@ -118,10 +118,6 @@ pub struct Tenant {
// Global pageserver config parameters
pub conf: &'static PageServerConf,
/// The value creation timestamp, used to measure activation delay, see:
/// <https://github.com/neondatabase/neon/issues/4025>
loading_started_at: Instant,
state: watch::Sender<TenantState>,
// Overridden tenant-specific config parameters.
@@ -1480,7 +1476,7 @@ impl Tenant {
TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Active;
debug!(tenant_id = %self.tenant_id, "Activating tenant");
info!("Activating tenant {}", self.tenant_id);
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
@@ -1491,17 +1487,12 @@ impl Tenant {
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self.tenant_id);
let mut activated_timelines = 0;
let mut timelines_broken_during_activation = 0;
for timeline in not_broken_timelines {
match timeline
.activate(ctx)
.context("timeline activation for activating tenant")
{
Ok(()) => {
activated_timelines += 1;
}
Ok(()) => {}
Err(e) => {
error!(
"Failed to activate timeline {}: {:#}",
@@ -1512,26 +1503,9 @@ impl Tenant {
"failed to activate timeline {}: {}",
timeline.timeline_id, e
));
timelines_broken_during_activation += 1;
}
}
}
let elapsed = self.loading_started_at.elapsed();
let total_timelines = timelines_accessor.len();
// log a lot of stuff, because some tenants sometimes suffer from user-visible
// times to activate. see https://github.com/neondatabase/neon/issues/4025
info!(
since_creation_millis = elapsed.as_millis(),
tenant_id = %self.tenant_id,
activated_timelines,
timelines_broken_during_activation,
total_timelines,
post_state = <&'static str>::from(&*current_state),
"activation attempt finished"
);
}
}
});
@@ -1838,9 +1812,6 @@ impl Tenant {
Tenant {
tenant_id,
conf,
// using now here is good enough approximation to catch tenants with really long
// activation times.
loading_started_at: Instant::now(),
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
timelines: Mutex::new(HashMap::new()),
gc_cs: tokio::sync::Mutex::new(()),

View File

@@ -48,6 +48,7 @@ mod layer_coverage;
use crate::context::RequestContext;
use crate::keyspace::KeyPartitioning;
use crate::metrics::NUM_ONDISK_LAYERS;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
@@ -287,6 +288,7 @@ where
self.l0_delta_layers.push(layer);
}
NUM_ONDISK_LAYERS.inc();
Ok(())
}
@@ -312,6 +314,8 @@ where
"failed to locate removed historic layer from l0_delta_layers"
);
}
NUM_ONDISK_LAYERS.dec();
}
pub(self) fn replace_historic_noflush(

View File

@@ -219,8 +219,7 @@ use utils::lsn::Lsn;
use crate::metrics::{
MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
REMOTE_ONDEMAND_DOWNLOADED_BYTES, REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
};
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::{
@@ -368,13 +367,9 @@ impl RemoteTimelineClient {
/// Download index file
pub async fn download_index_file(&self) -> Result<IndexPart, DownloadError> {
let _unfinished_gauge_guard = self.metrics.call_begin(
&RemoteOpFileKind::Index,
&RemoteOpKind::Download,
crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
reason: "no need for a downloads gauge",
},
);
let _unfinished_gauge_guard = self
.metrics
.call_begin(&RemoteOpFileKind::Index, &RemoteOpKind::Download);
download::download_index_part(
self.conf,
@@ -403,13 +398,9 @@ impl RemoteTimelineClient {
layer_metadata: &LayerFileMetadata,
) -> anyhow::Result<u64> {
let downloaded_size = {
let _unfinished_gauge_guard = self.metrics.call_begin(
&RemoteOpFileKind::Layer,
&RemoteOpKind::Download,
crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
reason: "no need for a downloads gauge",
},
);
let _unfinished_gauge_guard = self
.metrics
.call_begin(&RemoteOpFileKind::Layer, &RemoteOpKind::Download);
download::download_layer_file(
self.conf,
&self.storage_impl,
@@ -895,32 +886,11 @@ impl RemoteTimelineClient {
fn calls_unfinished_metric_impl(
&self,
op: &UploadOp,
) -> Option<(
RemoteOpFileKind,
RemoteOpKind,
RemoteTimelineClientMetricsCallTrackSize,
)> {
use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
) -> Option<(RemoteOpFileKind, RemoteOpKind)> {
let res = match op {
UploadOp::UploadLayer(_, m) => (
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
),
UploadOp::UploadMetadata(_, _) => (
RemoteOpFileKind::Index,
RemoteOpKind::Upload,
DontTrackSize {
reason: "metadata uploads are tiny",
},
),
UploadOp::Delete(file_kind, _) => (
*file_kind,
RemoteOpKind::Delete,
DontTrackSize {
reason: "should we track deletes? positive or negative sign?",
},
),
UploadOp::UploadLayer(_, _) => (RemoteOpFileKind::Layer, RemoteOpKind::Upload),
UploadOp::UploadMetadata(_, _) => (RemoteOpFileKind::Index, RemoteOpKind::Upload),
UploadOp::Delete(file_kind, _) => (*file_kind, RemoteOpKind::Delete),
UploadOp::Barrier(_) => {
// we do not account these
return None;
@@ -930,20 +900,20 @@ impl RemoteTimelineClient {
}
fn calls_unfinished_metric_begin(&self, op: &UploadOp) {
let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
let (file_kind, op_kind) = match self.calls_unfinished_metric_impl(op) {
Some(x) => x,
None => return,
};
let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
let guard = self.metrics.call_begin(&file_kind, &op_kind);
guard.will_decrement_manually(); // in unfinished_ops_metric_end()
}
fn calls_unfinished_metric_end(&self, op: &UploadOp) {
let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
let (file_kind, op_kind) = match self.calls_unfinished_metric_impl(op) {
Some(x) => x,
None => return,
};
self.metrics.call_end(&file_kind, &op_kind, track_bytes);
self.metrics.call_end(&file_kind, &op_kind);
}
fn stop(&self) {
@@ -1011,19 +981,11 @@ impl RemoteTimelineClient {
mod tests {
use super::*;
use crate::{
context::RequestContext,
tenant::{
harness::{TenantHarness, TIMELINE_ID},
Tenant,
},
tenant::harness::{TenantHarness, TIMELINE_ID},
DEFAULT_PG_VERSION,
};
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
use std::{
collections::HashSet,
path::{Path, PathBuf},
};
use tokio::runtime::EnterGuard;
use std::{collections::HashSet, path::Path};
use utils::lsn::Lsn;
pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
@@ -1072,80 +1034,39 @@ mod tests {
assert_eq!(found, expected);
}
struct TestSetup {
runtime: &'static tokio::runtime::Runtime,
entered_runtime: EnterGuard<'static>,
harness: TenantHarness<'static>,
tenant: Arc<Tenant>,
tenant_ctx: RequestContext,
remote_fs_dir: PathBuf,
client: Arc<RemoteTimelineClient>,
}
impl TestSetup {
fn new(test_name: &str) -> anyhow::Result<Self> {
// Use a current-thread runtime in the test
let runtime = Box::leak(Box::new(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?,
));
let entered_runtime = runtime.enter();
let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
let harness = TenantHarness::create(test_name)?;
let (tenant, ctx) = runtime.block_on(harness.load());
// create an empty timeline directory
let timeline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let _ = timeline.initialize(&ctx).unwrap();
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
let storage_config = RemoteStorageConfig {
max_concurrent_syncs: std::num::NonZeroUsize::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
)
.unwrap(),
max_sync_errors: std::num::NonZeroU32::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
)
.unwrap(),
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
};
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
let client = Arc::new(RemoteTimelineClient {
conf: harness.conf,
runtime,
tenant_id: harness.tenant_id,
timeline_id: TIMELINE_ID,
storage_impl: storage,
upload_queue: Mutex::new(UploadQueue::Uninitialized),
metrics: Arc::new(RemoteTimelineClientMetrics::new(
&harness.tenant_id,
&TIMELINE_ID,
)),
});
Ok(Self {
runtime,
entered_runtime,
harness,
tenant,
tenant_ctx: ctx,
remote_fs_dir,
client,
})
}
}
// Test scheduling
#[test]
fn upload_scheduling() -> anyhow::Result<()> {
// Use a current-thread runtime in the test
let runtime = Box::leak(Box::new(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?,
));
let _entered = runtime.enter();
let harness = TenantHarness::create("upload_scheduling")?;
let (tenant, ctx) = runtime.block_on(harness.load());
let _timeline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
let storage_config = RemoteStorageConfig {
max_concurrent_syncs: std::num::NonZeroUsize::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
)
.unwrap(),
max_sync_errors: std::num::NonZeroU32::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
)
.unwrap(),
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
};
// Test outline:
//
// Schedule upload of a bunch of layers. Check that they are started immediately, not queued
@@ -1160,20 +1081,22 @@ mod tests {
// Schedule another deletion. Check that it's launched immediately.
// Schedule index upload. Check that it's queued
let TestSetup {
runtime,
entered_runtime: _entered_runtime,
harness,
tenant: _tenant,
tenant_ctx: _tenant_ctx,
remote_fs_dir,
client,
} = TestSetup::new("upload_scheduling").unwrap();
let timeline_path = harness.timeline_path(&TIMELINE_ID);
println!("workdir: {}", harness.conf.workdir.display());
let storage_impl = GenericRemoteStorage::from_config(&storage_config)?;
let client = Arc::new(RemoteTimelineClient {
conf: harness.conf,
runtime,
tenant_id: harness.tenant_id,
timeline_id: TIMELINE_ID,
storage_impl,
upload_queue: Mutex::new(UploadQueue::Uninitialized),
metrics: Arc::new(RemoteTimelineClientMetrics::new(
&harness.tenant_id,
&TIMELINE_ID,
)),
});
let remote_timeline_dir =
remote_fs_dir.join(timeline_path.strip_prefix(&harness.conf.workdir)?);
println!("remote_timeline_dir: {}", remote_timeline_dir.display());
@@ -1293,90 +1216,4 @@ mod tests {
Ok(())
}
#[test]
fn bytes_unfinished_gauge_for_layer_file_uploads() -> anyhow::Result<()> {
// Setup
let TestSetup {
runtime,
harness,
client,
..
} = TestSetup::new("metrics")?;
let metadata = dummy_metadata(Lsn(0x10));
client.init_upload_queue_for_empty_remote(&metadata)?;
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let content_1 = dummy_contents("foo");
std::fs::write(
timeline_path.join(layer_file_name_1.file_name()),
&content_1,
)?;
#[derive(Debug, PartialEq)]
struct BytesStartedFinished {
started: Option<usize>,
finished: Option<usize>,
}
let get_bytes_started_stopped = || {
let started = client
.metrics
.get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
.map(|v| v.try_into().unwrap());
let stopped = client
.metrics
.get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
.map(|v| v.try_into().unwrap());
BytesStartedFinished {
started,
finished: stopped,
}
};
// Test
let init = get_bytes_started_stopped();
client.schedule_layer_file_upload(
&layer_file_name_1,
&LayerFileMetadata::new(content_1.len() as u64),
)?;
let pre = get_bytes_started_stopped();
runtime.block_on(client.wait_completion())?;
let post = get_bytes_started_stopped();
// Validate
assert_eq!(
init,
BytesStartedFinished {
started: None,
finished: None
}
);
assert_eq!(
pre,
BytesStartedFinished {
started: Some(content_1.len()),
// assert that the _finished metric is created eagerly so that subtractions work on first sample
finished: Some(0),
}
);
assert_eq!(
post,
BytesStartedFinished {
started: Some(content_1.len()),
finished: Some(content_1.len())
}
);
Ok(())
}
}

View File

@@ -33,10 +33,39 @@ CREATE VIEW local_cache AS
(pageoffs int8, relfilenode oid, reltablespace oid, reldatabase oid,
relforknumber int2, relblocknumber int8, accesscount int4);
create table neon_prepared_statements(
client_id text not null,
stmt_name text not null,
stmt_body text not null,
from_sql boolean not null,
primary key(client_id, stmt_name)
create table postgres_log (
log_time timestamp(3) with time zone,
user_name text,
database_name text,
process_id integer,
connection_from text,
session_id text,
session_line_num bigint,
command_tag text,
session_start_time timestamp with time zone,
virtual_transaction_id text,
transaction_id bigint,
error_severity text,
sql_state_code text,
message text,
detail text,
hint text,
internal_query text,
internal_query_pos integer,
context text,
query text,
query_pos integer,
location text,
application_name text,
backend_type text,
leader_pid integer,
query_id bigint
);
CREATE FUNCTION read_postgres_log()
RETURNS setof postgres_log
AS 'MODULE_PATHNAME', 'read_postgres_log'
LANGUAGE C PARALLEL SAFE;
CREATE VIEW pg_log AS
SELECT * from read_postgres_log();

View File

@@ -11,18 +11,22 @@
#include "postgres.h"
#include "fmgr.h"
#include <sys/stat.h>
#include "access/table.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "commands/prepare.h"
#include "executor/spi.h"
#include "commands/copy.h"
#include "nodes/makefuncs.h"
#include "nodes/value.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "catalog/pg_type.h"
#include "replication/walsender.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/guc.h"
#include "neon.h"
@@ -31,37 +35,19 @@
PG_MODULE_MAGIC;
void _PG_init(void);
static char* neon_load_prepared_statement(char const* stmt_name, bool* from_sql);
static void neon_save_prepared_statement(char const* stmt_name, char const* stmt_body, bool from_sql);
static bool neon_drop_prepared_statement(char const* stmt_name);
static bool save_parepared_statememts;
void
_PG_init(void)
{
pg_init_libpagestore();
pg_init_walproposer();
DefineCustomBoolVariable("neon.save_prepared_statements",
"Support prepared statements in case of using connetion pooler",
NULL,
&save_parepared_statememts,
false, /* disabled by default */
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
save_prepared_statement_hook = neon_save_prepared_statement;
load_prepared_statement_hook = neon_load_prepared_statement;
drop_prepared_statement_hook = neon_drop_prepared_statement;
EmitWarningsOnPlaceholders("neon");
}
PG_FUNCTION_INFO_V1(pg_cluster_size);
PG_FUNCTION_INFO_V1(backpressure_lsns);
PG_FUNCTION_INFO_V1(backpressure_throttling_time);
PG_FUNCTION_INFO_V1(read_postgres_log);
Datum
pg_cluster_size(PG_FUNCTION_ARGS)
@@ -108,71 +94,128 @@ backpressure_throttling_time(PG_FUNCTION_ARGS)
PG_RETURN_UINT64(BackpressureThrottlingTime());
}
static char*
neon_load_prepared_statement(char const* stmt_name, bool* from_sql)
{
char* stmt_body = NULL;
if (save_parepared_statememts)
{
int rc;
Oid param_types[2] = {TEXTOID, TEXTOID};
Datum param_values[2] = {CStringGetTextDatum(application_name), CStringGetTextDatum(stmt_name)};
bool is_null;
MemoryContext call_ctx = CurrentMemoryContext;
SPI_connect();
rc = SPI_execute_with_args("select stmt_body,from_sql from neon_prepared_statements where client_id=$1 and stmt_name=$2",
2, param_types, param_values, NULL, true, 1);
if (rc != SPI_OK_SELECT || SPI_processed != 1) {
SPI_finish();
elog(LOG, "Prepared statement %s not found for client %s", stmt_name, application_name);
return NULL;
#define PG_LOG_DIR "log"
#define POSTGRES_LOG "postgres_log"
#define LOG_TABLE_N_COLUMS 26
typedef struct {
char* path;
time_t ctime;
} LogFile;
typedef struct
{
Relation log_table;
List* log_files;
CopyFromState copy_state;
ListCell* curr_log;
} LogfileContext;
static int cmp_log_ctime(const ListCell *a, const ListCell *b)
{
LogFile* la = (LogFile*)lfirst(a);
LogFile* lb = (LogFile*)lfirst(b);
return la->ctime < lb->ctime ? -1 : la->ctime == lb->ctime ? 0 : 1;
}
Datum
read_postgres_log(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
MemoryContext oldcontext;
LogfileContext *fctx; /* User function context. */
List* log_files = NULL;
if (SRF_IS_FIRSTCALL())
{
struct dirent *dent;
DIR* dir;
struct stat statbuf;
char* path;
funcctx = SRF_FIRSTCALL_INIT();
/* Switch context when allocating stuff to be used in later calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* Create a user function context for cross-call persistence */
fctx = (LogfileContext *) palloc(sizeof(LogfileContext));
fctx->log_files = NULL;
fctx->copy_state = NULL;
if ((dir = AllocateDir(PG_LOG_DIR)) != NULL)
{
while ((dent = ReadDirExtended(dir, PG_LOG_DIR, LOG)) != NULL)
{
/* Ignore non-csv files */
if (strcmp(dent->d_name + strlen(dent->d_name) - 4, ".csv") != 0)
continue;
path = psprintf("%s/%s", PG_LOG_DIR, dent->d_name);
if (stat(path, &statbuf) == 0)
{
LogFile* log = (LogFile*)palloc(sizeof(LogFile));
log->ctime = statbuf.st_ctime;
log->path = path;
fctx->log_files = lappend(fctx->log_files, log);
}
else if (errno != ENOENT) /* file can be concurrently removed */
{
elog(LOG, "Failed to access log file %s", path);
pfree(path);
}
}
FreeDir(dir);
}
stmt_body = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1);
stmt_body = MemoryContextStrdup(call_ctx, stmt_body);
*from_sql = DatumGetBool(SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 2, &is_null));
SPI_finish();
list_sort(fctx->log_files, cmp_log_ctime);
fctx->log_table = table_openrv(makeRangeVar(NULL, POSTGRES_LOG, -1), AccessShareLock);
fctx->curr_log = list_head(fctx->log_files);
/* Remember the user function context. */
funcctx->user_fctx = fctx;
/* Return to original context when allocating transient memory */
MemoryContextSwitchTo(oldcontext);
}
return stmt_body;
}
static void
neon_save_prepared_statement(char const* stmt_name, char const* stmt_body, bool from_sql)
{
if (save_parepared_statememts)
funcctx = SRF_PERCALL_SETUP();
/* Get the saved state */
fctx = funcctx->user_fctx;
while (fctx->curr_log != NULL)
{
int rc;
Oid param_types[4] = {TEXTOID, TEXTOID, TEXTOID, BOOLOID};
Datum param_values[4] = {CStringGetTextDatum(application_name), CStringGetTextDatum(stmt_name), CStringGetTextDatum(stmt_body), BoolGetDatum(from_sql)};
SPI_connect();
rc = SPI_execute_with_args("insert into neon_prepared_statements values($1,$2,$3,$4) on conflict (client_id,stmt_name) do update set stmt_body=EXCLUDED.stmt_body, from_sql=EXCLUDED.from_sql",
4, param_types, param_values, NULL, false, 1);
if (rc != SPI_OK_INSERT && rc != SPI_OK_UPDATE)
elog(LOG, "Failed to persist prepared statement %s for client %s", stmt_name, application_name);
SPI_finish();
}
}
static bool
neon_drop_prepared_statement(char const* stmt_name)
{
if (save_parepared_statememts)
{
int rc;
Oid param_types[2] = {TEXTOID, TEXTOID};
Datum param_values[2] = {CStringGetTextDatum(application_name), CStringGetTextDatum(stmt_name)};
SPI_connect();
rc = SPI_execute_with_args("delete from neon_prepared_statements where client_id=$1 and stmt_name=$2",
2, param_types, param_values, NULL, false, 1);
if (rc != SPI_OK_DELETE || SPI_processed != 1) {
SPI_finish();
elog(LOG, "Prepared statement %s not found for client %s", stmt_name, application_name);
return false;
if (fctx->copy_state == NULL)
{
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
fctx->copy_state = BeginCopyFrom(NULL,
fctx->log_table,
NULL,
((LogFile*)lfirst(fctx->curr_log))->path,
false,
NULL,
NIL,
list_make1(makeDefElem("format", (Node *) makeString("csv"), -1)));
MemoryContextSwitchTo(oldcontext);
}
SPI_finish();
return true;
if (fctx->copy_state != NULL)
{
Datum values[LOG_TABLE_N_COLUMS];
bool nulls[LOG_TABLE_N_COLUMS];
if (NextCopyFrom(fctx->copy_state, NULL,
values, nulls))
{
HeapTuple tuple = heap_form_tuple(RelationGetDescr(fctx->log_table), values, nulls);
Datum result = HeapTupleGetDatum(tuple);
SRF_RETURN_NEXT(funcctx, result);
}
EndCopyFrom(fctx->copy_state);
fctx->copy_state = NULL;
}
fctx->curr_log = lnext(fctx->log_files, fctx->curr_log);
}
return false;
table_close(fctx->log_table, AccessShareLock);
SRF_RETURN_DONE(funcctx);
}

View File

@@ -45,8 +45,6 @@ PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = (
*[f"pageserver_remote_timeline_client_calls_started_{x}" for x in ["bucket", "count", "sum"]],
*[f"pageserver_remote_operation_seconds_{x}" for x in ["bucket", "count", "sum"]],
"pageserver_remote_physical_size",
"pageserver_remote_timeline_client_bytes_started_total",
"pageserver_remote_timeline_client_bytes_finished_total",
)
PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
@@ -55,7 +53,6 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
"pageserver_storage_operations_seconds_global_bucket",
"libmetrics_launch_timestamp",
"libmetrics_build_info",
"libmetrics_tracing_event_count_total",
)
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (

View File

@@ -2928,18 +2928,32 @@ def fork_at_current_lsn(
return env.neon_cli.create_branch(new_branch_name, ancestor_branch_name, tenant_id, current_lsn)
def last_flush_lsn_upload(
env: NeonEnv, endpoint: Endpoint, tenant_id: TenantId, timeline_id: TimelineId
) -> Lsn:
"""
Wait for pageserver to catch to the latest flush LSN of given endpoint,
checkpoint pageserver, and wait for it to be uploaded (remote_consistent_lsn
reaching flush LSN).
"""
last_flush_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
ps_http = env.pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_flush_lsn)
def wait_for_sk_commit_lsn_to_arrive_at_pageserver_last_record_lsn(
tenant_id: TenantId,
timeline_id: TimelineId,
safekeepers: List[Safekeeper],
pageserver: NeonPageserver,
):
sk_commit_lsns = [
sk.http_client().timeline_status(tenant_id, timeline_id).commit_lsn for sk in safekeepers
]
lsn = max(sk_commit_lsns)
ps_http = pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, lsn)
return lsn
def wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id: TenantId,
timeline_id: TimelineId,
safekeepers: List[Safekeeper],
pageserver: NeonPageserver,
):
lsn = wait_for_sk_commit_lsn_to_arrive_at_pageserver_last_record_lsn(
tenant_id, timeline_id, safekeepers, pageserver
)
ps_http = pageserver.http_client()
# force a checkpoint to trigger upload
ps_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
return last_flush_lsn
wait_for_upload(ps_http, tenant_id, timeline_id, lsn)
return lsn

View File

@@ -550,13 +550,3 @@ class PageserverHttpClient(requests.Session):
def tenant_break(self, tenant_id: TenantId):
res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break")
self.verbose_error(res)
def post_tracing_event(self, level: str, message: str):
res = self.post(
f"http://localhost:{self.port}/v1/tracing/event",
json={
"level": level,
"message": message,
},
)
self.verbose_error(res)

View File

@@ -54,9 +54,10 @@ def wait_for_upload(
if current_lsn >= lsn:
log.info("wait finished")
return
lr_lsn = last_record_lsn(pageserver_http, tenant, timeline)
log.info(
f"waiting for remote_consistent_lsn to reach {lsn}, now {current_lsn}, last_record_lsn={lr_lsn}, iteration {i + 1}"
"waiting for remote_consistent_lsn to reach {}, now {}, iteration {}".format(
lsn, current_lsn, i + 1
)
)
time.sleep(1)
raise Exception(

View File

@@ -6,6 +6,7 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
RemoteStorageKind,
wait_for_last_flush_lsn,
wait_for_sk_commit_lsn_to_reach_remote_storage,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import Lsn, TenantId, TimelineId
@@ -198,7 +199,7 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
# with image_creation_threshold=1 which we will use on the last compaction
cur.execute("vacuum")
last_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
if i == 1 and j == 2 and k == 1:
# last iteration; stop before checkpoint to avoid leaving an inmemory layer
@@ -221,8 +222,10 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
tenant_update_config({"image_creation_threshold": "1"})
ps_http.timeline_compact(tenant_id, timeline_id)
# wait for all uploads to finish (checkpoint has been done above)
wait_for_upload(ps_http, tenant_id, timeline_id, last_lsn)
# wait for all uploads to finish
wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id, timeline_id, env.safekeepers, env.pageserver
)
# shutdown safekeepers to avoid on-demand downloads from walreceiver
for sk in env.safekeepers:

View File

@@ -1,49 +0,0 @@
import uuid
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.utils import wait_until
@pytest.mark.parametrize("level", ["trace", "debug", "info", "warn", "error"])
def test_logging_event_count(neon_env_builder: NeonEnvBuilder, level: str):
# self-test: make sure the event is logged (i.e., our testing endpoint works)
log_expected = {
"trace": False,
"debug": False,
"info": True,
"warn": True,
"error": True,
}[level]
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
msg_id = uuid.uuid4().hex
# NB: the _total suffix is added by our prometheus client
before = ps_http.get_metric_value("libmetrics_tracing_event_count_total", {"level": level})
# post the event
ps_http.post_tracing_event(level, msg_id)
if log_expected:
env.pageserver.allowed_errors.append(f".*{msg_id}.*")
def assert_logged():
if not log_expected:
return
assert env.pageserver.log_contains(f".*{msg_id}.*")
wait_until(10, 0.5, assert_logged)
# make sure it's counted
def assert_metric_value():
if not log_expected:
return
# NB: the _total suffix is added by our prometheus client
val = ps_http.get_metric_value("libmetrics_tracing_event_count_total", {"level": level})
val = val or 0.0
log.info("libmetrics_tracing_event_count: %s", val)
assert val > (before or 0.0)
wait_until(10, 1, assert_metric_value)

View File

@@ -12,8 +12,8 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
RemoteStorageKind,
available_remote_storages,
last_flush_lsn_upload,
wait_for_last_flush_lsn,
wait_for_sk_commit_lsn_to_reach_remote_storage,
)
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pageserver.utils import (
@@ -207,7 +207,9 @@ def test_ondemand_download_timetravel(
env.endpoints.stop_all()
# wait until pageserver has successfully uploaded all the data to remote storage
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id, timeline_id, env.safekeepers, env.pageserver
)
def get_api_current_physical_size():
d = client.timeline_detail(tenant_id, timeline_id)
@@ -345,9 +347,12 @@ def test_download_remote_layers_api(
"""
)
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
env.endpoints.stop_all()
wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id, timeline_id, env.safekeepers, env.pageserver
)
def get_api_current_physical_size():
d = client.timeline_detail(tenant_id, timeline_id)
return d["current_physical_size"]

View File

@@ -1,4 +1,3 @@
import json
from contextlib import closing
import psycopg2.extras
@@ -23,12 +22,9 @@ wait_lsn_timeout='111 s';
checkpoint_distance = 10000
compaction_target_size = 1048576
evictions_low_residence_duration_metric_threshold = "2 days"
eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold = "23 hours" }
"""
env = neon_env_builder.init_start()
# we configure eviction but no remote storage, there might be error lines
env.pageserver.allowed_errors.append(".* no remote storage configured, cannot evict layers .*")
http_client = env.pageserver.http_client()
# Check that we raise on misspelled configs
@@ -48,7 +44,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
"checkpoint_distance": "20000",
"gc_period": "30sec",
"evictions_low_residence_duration_metric_threshold": "42s",
"eviction_policy": json.dumps({"kind": "NoEviction"}),
}
tenant, _ = env.neon_cli.create_tenant(conf=new_conf)
@@ -89,11 +84,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
assert effective_config["image_creation_threshold"] == 3
assert effective_config["pitr_interval"] == "7days"
assert effective_config["evictions_low_residence_duration_metric_threshold"] == "2days"
assert effective_config["eviction_policy"] == {
"kind": "LayerAccessThreshold",
"period": "20s",
"threshold": "23h",
}
# check the configuration of the new tenant
with closing(env.pageserver.connect()) as psconn:
@@ -131,9 +121,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
assert (
new_effective_config["evictions_low_residence_duration_metric_threshold"] == "42s"
), "Should override default value"
assert new_effective_config["eviction_policy"] == {
"kind": "NoEviction"
}, "Specific 'eviction_policy' config should override the default value"
assert new_effective_config["compaction_target_size"] == 1048576
assert new_effective_config["compaction_period"] == "20s"
assert new_effective_config["compaction_threshold"] == 10
@@ -148,9 +135,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
"compaction_period": "80sec",
"image_creation_threshold": "2",
"evictions_low_residence_duration_metric_threshold": "23h",
"eviction_policy": json.dumps(
{"kind": "LayerAccessThreshold", "period": "80s", "threshold": "42h"}
),
}
env.neon_cli.config_tenant(
tenant_id=tenant,
@@ -196,11 +180,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
assert (
updated_effective_config["evictions_low_residence_duration_metric_threshold"] == "23h"
), "Should override default value"
assert updated_effective_config["eviction_policy"] == {
"kind": "LayerAccessThreshold",
"period": "1m 20s",
"threshold": "1day 18h",
}, "Specific 'eviction_policy' config should override the default value"
assert updated_effective_config["compaction_target_size"] == 1048576
assert updated_effective_config["compaction_threshold"] == 10
assert updated_effective_config["gc_horizon"] == 67108864
@@ -260,11 +239,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
assert final_effective_config["gc_period"] == "1h"
assert final_effective_config["image_creation_threshold"] == 3
assert final_effective_config["evictions_low_residence_duration_metric_threshold"] == "2days"
assert final_effective_config["eviction_policy"] == {
"kind": "LayerAccessThreshold",
"period": "20s",
"threshold": "23h",
}
# restart the pageserver and ensure that the config is still correct
env.pageserver.stop()

View File

@@ -21,7 +21,7 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
RemoteStorageKind,
available_remote_storages,
last_flush_lsn_upload,
wait_for_sk_commit_lsn_to_reach_remote_storage,
)
from fixtures.pageserver.utils import (
assert_tenant_state,
@@ -174,9 +174,12 @@ def test_tenants_attached_after_download(
)
##### Stop the pageserver, erase its layer file to force it being downloaded from S3
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
env.endpoints.stop_all()
wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id, timeline_id, env.safekeepers, env.pageserver
)
env.pageserver.stop()
timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)