Compare commits


1 Commit

Author           SHA1        Message             Date
Bojan Serafimov  b7803f8df7  Test prefetch miss  2023-01-30 16:24:51 -05:00
51 changed files with 2656 additions and 3643 deletions

View File

@@ -1,4 +1,4 @@
name: Build and Test
name: Test and Deploy
on:
push:
@@ -54,7 +54,7 @@ jobs:
check-codestyle-python:
runs-on: [ self-hosted, gen3, small ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cloud:pinned
options: --init
steps:
@@ -813,6 +813,121 @@ jobs:
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
calculate-deploy-targets:
runs-on: [ self-hosted, gen3, small ]
if: |
github.ref_name == 'release' &&
github.event_name != 'workflow_dispatch'
outputs:
matrix-include: ${{ steps.set-matrix.outputs.include }}
steps:
- id: set-matrix
run: |
if [[ "$GITHUB_REF_NAME" == "release" ]]; then
PRODUCTION='{"env_name": "production", "proxy_job": "neon-proxy", "proxy_config": "production.proxy", "storage_broker_ns": "neon-storage-broker", "storage_broker_config": "production.neon-storage-broker", "kubeconfig_secret": "PRODUCTION_KUBECONFIG_DATA", "console_api_key_secret": "NEON_PRODUCTION_API_KEY"}'
echo "include=[$PRODUCTION]" >> $GITHUB_OUTPUT
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to 'release'"
exit 1
fi
deploy:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
# We need both storage **and** compute images for deploy, because control plane picks the compute version based on the storage version.
# If it notices a fresh storage it may bump the compute version. And if compute image failed to build it may break things badly
needs: [ push-docker-hub, calculate-deploy-targets, tag, regress-tests ]
if: |
github.ref_name == 'release' &&
github.event_name != 'workflow_dispatch'
defaults:
run:
shell: bash
strategy:
matrix:
include: ${{fromJSON(needs.calculate-deploy-targets.outputs.matrix-include)}}
environment:
name: prod-old
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
- name: Redeploy
run: |
export DOCKER_TAG=${{needs.tag.outputs.build-tag}}
cd "$(pwd)/.github/ansible"
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
./get_binaries.sh
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
RELEASE=true ./get_binaries.sh
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
exit 1
fi
eval $(ssh-agent)
echo "${{ secrets.TELEPORT_SSH_KEY }}" | tr -d '\n'| base64 --decode >ssh-key
echo "${{ secrets.TELEPORT_SSH_CERT }}" | tr -d '\n'| base64 --decode >ssh-key-cert.pub
chmod 0600 ssh-key
ssh-add ssh-key
rm -f ssh-key ssh-key-cert.pub
ANSIBLE_CONFIG=./ansible.cfg ansible-galaxy collection install sivel.toiletwater
ANSIBLE_CONFIG=./ansible.cfg ansible-playbook deploy.yaml -i ${{ matrix.env_name }}.hosts.yaml -e CONSOLE_API_TOKEN=${{ secrets[matrix.console_api_key_secret] }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ansible/collections': Permission denied
- name: Cleanup ansible folder
run: rm -rf ~/.ansible
deploy-new:
runs-on: [ self-hosted, gen3, small ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
options: --user root --privileged
# We need both storage **and** compute images for deploy, because control plane picks the compute version based on the storage version.
# If it notices a fresh storage it may bump the compute version. And if compute image failed to build it may break things badly
needs: [ push-docker-hub, tag, regress-tests ]
if: |
(github.ref_name == 'main') &&
github.event_name != 'workflow_dispatch'
defaults:
run:
shell: bash
strategy:
matrix:
target_region: [ eu-west-1, us-east-2 ]
environment:
name: dev-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
- name: Redeploy
run: |
export DOCKER_TAG=${{needs.tag.outputs.build-tag}}
cd "$(pwd)/.github/ansible"
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
./get_binaries.sh
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
RELEASE=true ./get_binaries.sh
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
exit 1
fi
ansible-galaxy collection install sivel.toiletwater
ansible-playbook deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
- name: Cleanup ansible folder
run: rm -rf ~/.ansible
deploy-pr-test-new:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
@@ -849,37 +964,348 @@ jobs:
- name: Cleanup ansible folder
run: rm -rf ~/.ansible
deploy:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
deploy-prod-new:
runs-on: prod
container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
# We need both storage **and** compute images for deploy, because control plane picks the compute version based on the storage version.
# If it notices a fresh storage it may bump the compute version. And if compute image failed to build it may break things badly
needs: [ push-docker-hub, tag, regress-tests ]
if: ( github.ref_name == 'main' || github.ref_name == 'release' ) && github.event_name != 'workflow_dispatch'
if: |
(github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
defaults:
run:
shell: bash
strategy:
matrix:
target_region: [ us-east-2, us-west-2, eu-central-1, ap-southeast-1 ]
environment:
name: prod-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: false
submodules: true
fetch-depth: 0
- name: Trigger deploy workflow
env:
GH_TOKEN: ${{ github.token }}
- name: Redeploy
run: |
export DOCKER_TAG=${{needs.tag.outputs.build-tag}}
cd "$(pwd)/.github/ansible"
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
gh workflow run deploy-dev.yml --ref main -f branch=${{ github.sha }} -f dockerTag=${{needs.tag.outputs.build-tag}}
./get_binaries.sh
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
gh workflow run deploy-prod.yml --ref release -f branch=${{ github.sha }} -f dockerTag=${{needs.tag.outputs.build-tag}} -f disclamerAcknowledged=true
RELEASE=true ./get_binaries.sh
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
exit 1
fi
ansible-galaxy collection install sivel.toiletwater
ansible-playbook deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_PRODUCTION_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
deploy-proxy:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
# Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
needs: [ push-docker-hub, calculate-deploy-targets, tag, regress-tests ]
if: |
github.ref_name == 'release' &&
github.event_name != 'workflow_dispatch'
defaults:
run:
shell: bash
strategy:
matrix:
include: ${{fromJSON(needs.calculate-deploy-targets.outputs.matrix-include)}}
environment:
name: prod-old
env:
KUBECONFIG: .kubeconfig
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
- name: Store kubeconfig file
run: |
echo "${{ secrets[matrix.kubeconfig_secret] }}" | base64 --decode > ${KUBECONFIG}
chmod 0600 ${KUBECONFIG}
- name: Add neon helm chart
run: helm repo add neondatabase https://neondatabase.github.io/helm-charts
- name: Re-deploy proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install --atomic -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-storage-broker:
name: deploy storage broker on old staging and old prod
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
# Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
needs: [ push-docker-hub, calculate-deploy-targets, tag, regress-tests ]
if: |
github.ref_name == 'release' &&
github.event_name != 'workflow_dispatch'
defaults:
run:
shell: bash
strategy:
matrix:
include: ${{fromJSON(needs.calculate-deploy-targets.outputs.matrix-include)}}
environment:
name: prod-old
env:
KUBECONFIG: .kubeconfig
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
- name: Store kubeconfig file
run: |
echo "${{ secrets[matrix.kubeconfig_secret] }}" | base64 --decode > ${KUBECONFIG}
chmod 0600 ${KUBECONFIG}
- name: Add neon helm chart
run: helm repo add neondatabase https://neondatabase.github.io/helm-charts
- name: Deploy storage-broker
run:
helm upgrade neon-storage-broker neondatabase/neon-storage-broker --namespace ${{ matrix.storage_broker_ns }} --create-namespace --install --atomic -f .github/helm-values/${{ matrix.storage_broker_config }}.yaml --set image.tag=${{ needs.tag.outputs.build-tag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-proxy-new:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
# Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
needs: [ push-docker-hub, tag, regress-tests ]
if: |
(github.ref_name == 'main') &&
github.event_name != 'workflow_dispatch'
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: us-east-2
target_cluster: dev-us-east-2-beta
deploy_link_proxy: true
deploy_legacy_scram_proxy: true
- target_region: eu-west-1
target_cluster: dev-eu-west-1-zeta
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
environment:
name: dev-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1-node16
with:
role-to-assume: arn:aws:iam::369495373322:role/github-runner
aws-region: eu-central-1
role-skip-session-tagging: true
role-duration-seconds: 1800
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Re-deploy scram proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade neon-proxy-scram neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Re-deploy link proxy
if: matrix.deploy_link_proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade neon-proxy-link neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-link.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Re-deploy legacy scram proxy
if: matrix.deploy_legacy_scram_proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade neon-proxy-scram-legacy neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram-legacy.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-storage-broker-dev-new:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
# Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
needs: [ push-docker-hub, tag, regress-tests ]
if: |
(github.ref_name == 'main') &&
github.event_name != 'workflow_dispatch'
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: us-east-2
target_cluster: dev-us-east-2-beta
- target_region: eu-west-1
target_cluster: dev-eu-west-1-zeta
environment:
name: dev-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1-node16
with:
role-to-assume: arn:aws:iam::369495373322:role/github-runner
aws-region: eu-central-1
role-skip-session-tagging: true
role-duration-seconds: 1800
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Deploy storage-broker
run:
helm upgrade neon-storage-broker-lb neondatabase/neon-storage-broker --namespace neon-storage-broker-lb --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ needs.tag.outputs.build-tag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-proxy-prod-new:
runs-on: prod
container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
# Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
needs: [ push-docker-hub, tag, regress-tests ]
if: |
(github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: us-east-2
target_cluster: prod-us-east-2-delta
deploy_link_proxy: true
deploy_legacy_scram_proxy: false
- target_region: us-west-2
target_cluster: prod-us-west-2-eta
deploy_link_proxy: false
deploy_legacy_scram_proxy: true
- target_region: eu-central-1
target_cluster: prod-eu-central-1-gamma
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
- target_region: ap-southeast-1
target_cluster: prod-ap-southeast-1-epsilon
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
environment:
name: prod-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Re-deploy scram proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade neon-proxy-scram neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Re-deploy link proxy
if: matrix.deploy_link_proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade neon-proxy-link neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-link.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Re-deploy legacy scram proxy
if: matrix.deploy_legacy_scram_proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade neon-proxy-scram-legacy neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram-legacy.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
deploy-storage-broker-prod-new:
runs-on: prod
container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
# Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
needs: [ push-docker-hub, tag, regress-tests ]
if: |
(github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: us-east-2
target_cluster: prod-us-east-2-delta
- target_region: us-west-2
target_cluster: prod-us-west-2-eta
- target_region: eu-central-1
target_cluster: prod-eu-central-1-gamma
- target_region: ap-southeast-1
target_cluster: prod-ap-southeast-1-epsilon
environment:
name: prod-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Deploy storage-broker
run:
helm upgrade neon-storage-broker-lb neondatabase/neon-storage-broker --namespace neon-storage-broker-lb --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ needs.tag.outputs.build-tag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
promote-compatibility-data:
runs-on: [ self-hosted, gen3, small ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
needs: [ push-docker-hub, tag, regress-tests ]
needs: [ deploy, deploy-proxy ]
if: github.ref_name == 'release' && github.event_name != 'workflow_dispatch'
steps:
- name: Promote compatibility snapshot for the release

View File

@@ -1,179 +0,0 @@
name: Neon Deploy dev
on:
workflow_dispatch:
inputs:
dockerTag:
description: 'Docker tag to deploy'
required: true
type: string
branch:
description: 'Branch or commit used for deploy scripts and configs'
required: true
type: string
default: 'main'
deployStorage:
description: 'Deploy storage'
required: true
type: boolean
default: true
deployProxy:
description: 'Deploy proxy'
required: true
type: boolean
default: true
deployStorageBroker:
description: 'Deploy storage-broker'
required: true
type: boolean
default: true
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
concurrency:
group: deploy-dev
cancel-in-progress: false
jobs:
deploy-storage-new:
runs-on: [ self-hosted, gen3, small ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
options: --user root --privileged
if: inputs.deployStorage
defaults:
run:
shell: bash
strategy:
matrix:
target_region: [ eu-west-1, us-east-2 ]
environment:
name: dev-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Redeploy
run: |
export DOCKER_TAG=${{ inputs.dockerTag }}
cd "$(pwd)/.github/ansible"
./get_binaries.sh
ansible-galaxy collection install sivel.toiletwater
ansible-playbook deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
- name: Cleanup ansible folder
run: rm -rf ~/.ansible
deploy-proxy-new:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
if: inputs.deployProxy
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: us-east-2
target_cluster: dev-us-east-2-beta
deploy_link_proxy: true
deploy_legacy_scram_proxy: true
- target_region: eu-west-1
target_cluster: dev-eu-west-1-zeta
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
environment:
name: dev-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1-node16
with:
role-to-assume: arn:aws:iam::369495373322:role/github-runner
aws-region: eu-central-1
role-skip-session-tagging: true
role-duration-seconds: 1800
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Re-deploy scram proxy
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
helm upgrade neon-proxy-scram neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Re-deploy link proxy
if: matrix.deploy_link_proxy
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
helm upgrade neon-proxy-link neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-link.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Re-deploy legacy scram proxy
if: matrix.deploy_legacy_scram_proxy
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
helm upgrade neon-proxy-scram-legacy neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram-legacy.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-storage-broker-new:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
if: inputs.deployStorageBroker
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: us-east-2
target_cluster: dev-us-east-2-beta
- target_region: eu-west-1
target_cluster: dev-eu-west-1-zeta
environment:
name: dev-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1-node16
with:
role-to-assume: arn:aws:iam::369495373322:role/github-runner
aws-region: eu-central-1
role-skip-session-tagging: true
role-duration-seconds: 1800
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Deploy storage-broker
run:
helm upgrade neon-storage-broker-lb neondatabase/neon-storage-broker --namespace neon-storage-broker-lb --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
- name: Cleanup helm folder
run: rm -rf ~/.cache

View File

@@ -1,275 +0,0 @@
name: Neon Deploy prod
on:
workflow_dispatch:
inputs:
dockerTag:
description: 'Docker tag to deploy'
required: true
type: string
branch:
description: 'Branch or commit used for deploy scripts and configs'
required: true
type: string
default: 'release'
deployStorage:
description: 'Deploy storage'
required: true
type: boolean
default: true
deployProxy:
description: 'Deploy proxy'
required: true
type: boolean
default: true
deployStorageBroker:
description: 'Deploy storage-broker'
required: true
type: boolean
default: true
disclamerAcknowledged:
description: 'I confirm that there is an emergency and I can not use regular release workflow'
required: true
type: boolean
default: false
concurrency:
group: deploy-prod
cancel-in-progress: false
jobs:
deploy-prod-new:
runs-on: prod
container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
if: inputs.deployStorage && inputs.disclamerAcknowledged
defaults:
run:
shell: bash
strategy:
matrix:
target_region: [ us-east-2, us-west-2, eu-central-1, ap-southeast-1 ]
environment:
name: prod-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Redeploy
run: |
export DOCKER_TAG=${{ inputs.dockerTag }}
cd "$(pwd)/.github/ansible"
./get_binaries.sh
ansible-galaxy collection install sivel.toiletwater
ansible-playbook deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_PRODUCTION_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
deploy-proxy-prod-new:
runs-on: prod
container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
if: inputs.deployProxy && inputs.disclamerAcknowledged
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: us-east-2
target_cluster: prod-us-east-2-delta
deploy_link_proxy: true
deploy_legacy_scram_proxy: false
- target_region: us-west-2
target_cluster: prod-us-west-2-eta
deploy_link_proxy: false
deploy_legacy_scram_proxy: true
- target_region: eu-central-1
target_cluster: prod-eu-central-1-gamma
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
- target_region: ap-southeast-1
target_cluster: prod-ap-southeast-1-epsilon
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
environment:
name: prod-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Re-deploy scram proxy
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
helm upgrade neon-proxy-scram neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Re-deploy link proxy
if: matrix.deploy_link_proxy
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
helm upgrade neon-proxy-link neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-link.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Re-deploy legacy scram proxy
if: matrix.deploy_legacy_scram_proxy
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
helm upgrade neon-proxy-scram-legacy neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram-legacy.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
deploy-storage-broker-prod-new:
runs-on: prod
container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
if: inputs.deployStorageBroker && inputs.disclamerAcknowledged
defaults:
run:
shell: bash
strategy:
matrix:
include:
- target_region: us-east-2
target_cluster: prod-us-east-2-delta
- target_region: us-west-2
target_cluster: prod-us-west-2-eta
- target_region: eu-central-1
target_cluster: prod-eu-central-1-gamma
- target_region: ap-southeast-1
target_cluster: prod-ap-southeast-1-epsilon
environment:
name: prod-${{ matrix.target_region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
- name: Deploy storage-broker
run:
helm upgrade neon-storage-broker-lb neondatabase/neon-storage-broker --namespace neon-storage-broker-lb --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
# Deploy to old account below
deploy:
runs-on: prod
container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
if: inputs.deployStorage && inputs.disclamerAcknowledged
defaults:
run:
shell: bash
environment:
name: prod-old
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Redeploy
run: |
export DOCKER_TAG=${{ inputs.dockerTag }}
cd "$(pwd)/.github/ansible"
./get_binaries.sh
eval $(ssh-agent)
echo "${{ secrets.TELEPORT_SSH_KEY }}" | tr -d '\n'| base64 --decode >ssh-key
echo "${{ secrets.TELEPORT_SSH_CERT }}" | tr -d '\n'| base64 --decode >ssh-key-cert.pub
chmod 0600 ssh-key
ssh-add ssh-key
rm -f ssh-key ssh-key-cert.pub
ANSIBLE_CONFIG=./ansible.cfg ansible-galaxy collection install sivel.toiletwater
ANSIBLE_CONFIG=./ansible.cfg ansible-playbook deploy.yaml -i production.hosts.yaml -e CONSOLE_API_TOKEN=${{ secrets.NEON_PRODUCTION_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ansible/collections': Permission denied
- name: Cleanup ansible folder
run: rm -rf ~/.ansible
deploy-proxy:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
if: inputs.deployProxy && inputs.disclamerAcknowledged
defaults:
run:
shell: bash
environment:
name: prod-old
env:
KUBECONFIG: .kubeconfig
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Store kubeconfig file
run: |
echo "${{ secrets.PRODUCTION_KUBECONFIG_DATA }}" | base64 --decode > ${KUBECONFIG}
chmod 0600 ${KUBECONFIG}
- name: Add neon helm chart
run: helm repo add neondatabase https://neondatabase.github.io/helm-charts
- name: Re-deploy proxy
run: |
DOCKER_TAG=${{ inputs.dockerTag }}
helm upgrade neon-proxy-scram neondatabase/neon-proxy --namespace neon-proxy --install --atomic -f .github/helm-values/production.proxy-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
- name: Cleanup helm folder
run: rm -rf ~/.cache
deploy-storage-broker:
name: deploy storage broker on old staging and old prod
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
if: inputs.deployStorageBroker && inputs.disclamerAcknowledged
defaults:
run:
shell: bash
environment:
name: prod-old
env:
KUBECONFIG: .kubeconfig
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
ref: ${{ inputs.branch }}
- name: Store kubeconfig file
run: |
echo "${{ secrets.PRODUCTION_KUBECONFIG_DATA }}" | base64 --decode > ${KUBECONFIG}
chmod 0600 ${KUBECONFIG}
- name: Add neon helm chart
run: helm repo add neondatabase https://neondatabase.github.io/helm-charts
- name: Deploy storage-broker
run:
helm upgrade neon-storage-broker neondatabase/neon-storage-broker --namespace neon-storage-broker --create-namespace --install --atomic -f .github/helm-values/production.neon-storage-broker.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
- name: Cleanup helm folder
run: rm -rf ~/.cache

View File

@@ -16,18 +16,18 @@ jobs:
- name: Get current date
id: date
run: echo "date=$(date +'%Y-%m-%d')" >> $GITHUB_OUTPUT
run: echo "date=(date +'%Y-%m-%d')" >> $GITHUB_OUTPUT
- name: Create release branch
run: git checkout -b releases/${{ steps.date.outputs.date }}
run: git checkout -b release/${{ steps.date.outputs.date }}
- name: Push new branch
run: git push origin releases/${{ steps.date.outputs.date }}
run: git push origin release/${{ steps.date.outputs.date }}
- name: Create pull request into release
uses: thomaseizinger/create-pull-request@e3972219c86a56550fb70708d96800d8e24ba862 # 1.3.0
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
head: releases/${{ steps.date.outputs.date }}
head: release/${{ steps.date.outputs.date }}
base: release
title: Release ${{ steps.date.outputs.date }}

Cargo.lock generated
View File

@@ -17,17 +17,6 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "ahash"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
dependencies = [
"getrandom",
"once_cell",
"version_check",
]
[[package]]
name = "ahash"
version = "0.8.2"
@@ -1526,9 +1515,6 @@ name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
dependencies = [
"ahash 0.7.6",
]
[[package]]
name = "hashbrown"
@@ -1536,16 +1522,7 @@ version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
dependencies = [
"ahash 0.8.2",
]
[[package]]
name = "hashlink"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa"
dependencies = [
"hashbrown 0.12.3",
"ahash",
]
[[package]]
@@ -2787,7 +2764,6 @@ dependencies = [
"futures",
"git-version",
"hashbrown 0.13.2",
"hashlink",
"hex",
"hmac",
"hostname",
@@ -4361,7 +4337,6 @@ dependencies = [
"tokio-rustls",
"tracing",
"tracing-subscriber",
"url",
"workspace_hack",
]
@@ -4648,7 +4623,6 @@ dependencies = [
"futures-executor",
"futures-task",
"futures-util",
"hashbrown 0.12.3",
"indexmap",
"itertools",
"libc",

View File

@@ -44,7 +44,6 @@ futures-core = "0.3"
futures-util = "0.3"
git-version = "0.3"
hashbrown = "0.13"
hashlink = "0.8.1"
hex = "0.4"
hex-literal = "0.3"
hmac = "0.12.1"

View File

@@ -44,30 +44,20 @@ RUN cd postgres && \
FROM build-deps AS postgis-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y cmake gdal-bin libboost-dev libboost-thread-dev libboost-filesystem-dev \
libboost-system-dev libboost-iostreams-dev libboost-program-options-dev libboost-timer-dev \
libcgal-dev libgdal-dev libgmp-dev libmpfr-dev libopenscenegraph-dev libprotobuf-c-dev \
protobuf-c-compiler xsltproc
RUN wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar.gz && \
tar zxvf SFCGAL-v1.3.10.tar.gz && \
cd SFCGAL-v1.3.10 && cmake . && make -j $(getconf _NPROCESSORS_ONLN) && \
DESTDIR=/sfcgal make install -j $(getconf _NPROCESSORS_ONLN) && \
make clean && cp -R /sfcgal/* /
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.1.tar.gz && \
tar xvzf postgis-3.3.1.tar.gz && \
cd postgis-3.3.1 && \
./autogen.sh && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
./configure --with-sfcgal=/usr/local/bin/sfcgal-config && \
./configure && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
cd extensions/postgis && \
make clean && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/postgis.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/postgis_raster.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/postgis_sfcgal.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/postgis_tiger_geocoder.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/postgis_topology.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/address_standardizer.control && \
@@ -172,7 +162,6 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz &
#########################################################################################
FROM build-deps AS neon-pg-ext-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=postgis-build /sfcgal/* /
COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /h3/usr /
@@ -235,7 +224,7 @@ COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-deb
# Install:
# libreadline8 for psql
# libossp-uuid16 for extension ossp-uuid
# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS
# libgeos, libgdal, libproj and libprotobuf-c1 for PostGIS
RUN apt update && \
apt install --no-install-recommends -y \
libreadline8 \
@@ -244,7 +233,6 @@ RUN apt update && \
libgdal28 \
libproj19 \
libprotobuf-c1 \
libsfcgal1 \
gdb && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*

View File

@@ -108,7 +108,7 @@ make -j`sysctl -n hw.logicalcpu`
To run the `psql` client, install the `postgresql-client` package or modify `PATH` and `LD_LIBRARY_PATH` to include `pg_install/bin` and `pg_install/lib`, respectively.
To run the integration tests or Python scripts (not required to use the code), install
Python (3.9 or higher), and install python3 packages using `./scripts/pysync` (requires [poetry>=1.3](https://python-poetry.org/)) in the project directory.
Python (3.9 or higher), and install python3 packages using `./scripts/pysync` (requires [poetry](https://python-poetry.org/)) in the project directory.
#### Running neon database
@@ -254,6 +254,3 @@ To get more familiar with this aspect, refer to:
- Read `CONTRIBUTING.md` to learn about project code style and practices.
- To get familiar with a source tree layout, use [/docs/sourcetree.md](/docs/sourcetree.md).
- To learn more about PostgreSQL internals, check http://www.interdb.jp/pg/index.html
dd

View File

@@ -37,7 +37,6 @@ metrics.workspace = true
pq_proto.workspace = true
workspace_hack.workspace = true
url.workspace = true
[dev-dependencies]
byteorder.workspace = true

View File

@@ -1,5 +1,4 @@
use core::fmt;
use std::{borrow::Cow, str::FromStr};
use std::str::FromStr;
use super::error::ApiError;
use anyhow::anyhow;
@@ -30,50 +29,6 @@ pub fn parse_request_param<T: FromStr>(
}
}
fn get_query_param<'a>(
request: &'a Request<Body>,
param_name: &str,
) -> Result<Option<Cow<'a, str>>, ApiError> {
let query = match request.uri().query() {
Some(q) => q,
None => return Ok(None),
};
let mut values = url::form_urlencoded::parse(query.as_bytes())
.filter_map(|(k, v)| if k == param_name { Some(v) } else { None })
// we call .next() twice below. If it's None the first time, .fuse() ensures it's None afterwards
.fuse();
let value1 = values.next();
if values.next().is_some() {
return Err(ApiError::BadRequest(anyhow!(
"param {param_name} specified more than once"
)));
}
Ok(value1)
}
pub fn must_get_query_param<'a>(
request: &'a Request<Body>,
param_name: &str,
) -> Result<Cow<'a, str>, ApiError> {
get_query_param(request, param_name)?.ok_or_else(|| {
ApiError::BadRequest(anyhow!("no {param_name} specified in query parameters"))
})
}
pub fn parse_query_param<E: fmt::Display, T: FromStr<Err = E>>(
request: &Request<Body>,
param_name: &str,
) -> Result<Option<T>, ApiError> {
get_query_param(request, param_name)?
.map(|v| {
v.parse().map_err(|e| {
ApiError::BadRequest(anyhow!("cannot parse query param {param_name}: {e}"))
})
})
.transpose()
}
pub async fn ensure_no_body(request: &mut Request<Body>) -> Result<(), ApiError> {
match request.body_mut().data().await {
Some(_) => Err(ApiError::BadRequest(anyhow!("Unexpected request body"))),

View File

@@ -1,230 +0,0 @@
//! Tool for extracting content-dependent metadata about layers. Useful for scanning real project layer files and evaluating the effectiveness of different heuristics on them.
//!
//! Currently it only analyzes holes, which are regions within the layer range that the layer contains no updates for. In the future it might do more analysis (maybe key quantiles?) but it should never return sensitive data.
use anyhow::Result;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::Range;
use std::{env, fs, path::Path, path::PathBuf, str, str::FromStr};
use pageserver::page_cache::PAGE_SZ;
use pageserver::repository::{Key, KEY_SIZE};
use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection};
use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE};
use pageserver::tenant::storage_layer::range_overlaps;
use pageserver::virtual_file::VirtualFile;
use utils::{bin_ser::BeSer, lsn::Lsn};
const MIN_HOLE_LENGTH: i128 = (128 * 1024 * 1024 / PAGE_SZ) as i128;
const DEFAULT_MAX_HOLES: usize = 10;
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
#[derive(PartialEq, Eq)]
struct Hole(Range<Key>);
impl Ord for Hole {
fn cmp(&self, other: &Self) -> Ordering {
let other_len = other.0.end.to_i128() - other.0.start.to_i128();
let self_len = self.0.end.to_i128() - self.0.start.to_i128();
other_len.cmp(&self_len)
}
}
impl PartialOrd for Hole {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
struct LayerFile {
key_range: Range<Key>,
lsn_range: Range<Lsn>,
is_delta: bool,
holes: Vec<Hole>,
}
impl LayerFile {
fn skips(&self, key_range: &Range<Key>) -> bool {
if !range_overlaps(&self.key_range, key_range) {
return false;
}
let start = match self
.holes
.binary_search_by_key(&key_range.start, |hole| hole.0.start)
{
Ok(index) => index,
Err(index) => {
if index == 0 {
return false;
}
index - 1
}
};
self.holes[start].0.end >= key_range.end
}
}
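A minimal sketch of the skips() check above, not part of this commit: the same lookup with plain u32 ranges instead of Key, and without the range_overlaps precheck. A delta layer "skips" an image layer's key range when that range falls entirely inside one of the delta's holes, which are kept sorted by start key and disjoint.
use std::ops::Range;
// Find the hole starting at or just before key_range.start, then check whether
// it reaches at least to key_range.end.
fn skips(holes: &[Range<u32>], key_range: &Range<u32>) -> bool {
    let start = match holes.binary_search_by_key(&key_range.start, |h| h.start) {
        Ok(i) => i,
        Err(0) => return false, // every hole starts after key_range.start
        Err(i) => i - 1,
    };
    holes[start].end >= key_range.end
}
fn main() {
    let holes = vec![10..40, 100..200];
    assert!(skips(&holes, &(120..180))); // fully inside the 100..200 hole
    assert!(!skips(&holes, &(30..50))); // sticks out past the end of the 10..40 hole
    assert!(!skips(&holes, &(0..5))); // starts before the first hole
}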
fn parse_filename(name: &str) -> Option<LayerFile> {
let split: Vec<&str> = name.split("__").collect();
if split.len() != 2 {
return None;
}
let keys: Vec<&str> = split[0].split('-').collect();
let mut lsns: Vec<&str> = split[1].split('-').collect();
let is_delta = if lsns.len() == 1 {
lsns.push(lsns[0]);
false
} else {
true
};
let key_range = Key::from_hex(keys[0]).unwrap()..Key::from_hex(keys[1]).unwrap();
let lsn_range = Lsn::from_hex(lsns[0]).unwrap()..Lsn::from_hex(lsns[1]).unwrap();
let holes = Vec::new();
Some(LayerFile {
key_range,
lsn_range,
is_delta,
holes,
})
}
// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH
fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
let file = FileBlockReader::new(VirtualFile::open(path)?);
let summary_blk = file.read_blk(0)?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
actual_summary.index_start_blk,
actual_summary.index_root_blk,
file,
);
// min-heap (reserve space for one more element added before eviction)
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
let mut prev_key: Option<Key> = None;
tree_reader.visit(
&[0u8; DELTA_KEY_SIZE],
VisitDirection::Forwards,
|key, _value| {
let curr = Key::from_slice(&key[..KEY_SIZE]);
if let Some(prev) = prev_key {
if curr.to_i128() - prev.to_i128() >= MIN_HOLE_LENGTH {
heap.push(Hole(prev..curr));
if heap.len() > max_holes {
heap.pop(); // remove smallest hole
}
}
}
prev_key = Some(curr.next());
true
},
)?;
let mut holes = heap.into_vec();
holes.sort_by_key(|hole| hole.0.start);
Ok(holes)
}
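A minimal sketch of the bounded-heap pattern that get_holes() above relies on, not part of this commit. Rust's BinaryHeap is a max-heap, so reversing the ordering (the tool does it with a hand-written Ord on Hole; std::cmp::Reverse has the same effect) puts the smallest element on top, and popping whenever the heap exceeds max entries leaves exactly the max largest values. The function name largest_n and the integer inputs are illustrative only.
use std::cmp::Reverse;
use std::collections::BinaryHeap;
// Keep the `max` largest values seen: on overflow, the element popped is always
// the smallest one currently held.
fn largest_n(values: &[i128], max: usize) -> Vec<i128> {
    let mut heap: BinaryHeap<Reverse<i128>> = BinaryHeap::with_capacity(max + 1);
    for &v in values {
        heap.push(Reverse(v));
        if heap.len() > max {
            heap.pop();
        }
    }
    let mut out: Vec<i128> = heap.into_iter().map(|Reverse(v)| v).collect();
    out.sort();
    out
}
fn main() {
    assert_eq!(largest_n(&[5, 1, 9, 3, 7], 2), vec![7, 9]);
}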
fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
println!("Usage: layer_map_analyzer PAGESERVER_DATA_DIR [MAX_HOLES]");
return Ok(());
}
let storage_path = PathBuf::from_str(&args[1])?;
let max_holes = if args.len() > 2 {
args[2].parse::<usize>().unwrap()
} else {
DEFAULT_MAX_HOLES
};
// Initialize virtual_file (file descriptor cache) and page cache, which are needed to access the layers' persistent B-Tree.
pageserver::virtual_file::init(10);
pageserver::page_cache::init(100);
let mut total_delta_layers = 0usize;
let mut total_image_layers = 0usize;
let mut total_excess_layers = 0usize;
for tenant in fs::read_dir(storage_path.join("tenants"))? {
let tenant = tenant?;
if !tenant.file_type()?.is_dir() {
continue;
}
for timeline in fs::read_dir(tenant.path().join("timelines"))? {
let timeline = timeline?;
if !timeline.file_type()?.is_dir() {
continue;
}
// Collect sorted vec of layers and count deltas
let mut layers = Vec::new();
let mut n_deltas = 0usize;
for layer in fs::read_dir(timeline.path())? {
let layer = layer?;
if let Some(mut layer_file) =
parse_filename(&layer.file_name().into_string().unwrap())
{
if layer_file.is_delta {
layer_file.holes = get_holes(&layer.path(), max_holes)?;
n_deltas += 1;
}
layers.push(layer_file);
}
}
layers.sort_by_key(|layer| layer.lsn_range.end);
// Count the number of holes and number of excess layers.
// An excess layer is an image layer that gets generated when holes in delta layers are not considered.
let mut n_excess_layers = 0usize;
let mut n_holes = 0usize;
for i in 0..layers.len() {
if !layers[i].is_delta {
let mut n_deltas_since_last_image = 0usize;
let mut n_skipped = 0usize;
let img_key_range = &layers[i].key_range;
for j in (0..i).rev() {
if range_overlaps(img_key_range, &layers[j].key_range) {
if layers[j].is_delta {
n_deltas_since_last_image += 1;
if layers[j].skips(img_key_range) {
n_skipped += 1;
}
} else {
// An image layer is always dense, despite the fact that it doesn't contain all possible
// key values in the specified range: there can be no keys in the storage that belong
// to the image layer range but are not present in the image layer.
break;
}
}
}
if n_deltas_since_last_image >= 3 && n_deltas_since_last_image - n_skipped < 3 {
// This is just an approximation: it doesn't take into account all image coverage.
// Moreover, the new layer map doesn't count total deltas, but the maximum stack of overlapping deltas.
n_excess_layers += 1;
}
n_holes += n_skipped;
}
}
println!(
"Tenant {} timeline {} delta layers {} image layers {} excess layers {} holes {}",
tenant.file_name().into_string().unwrap(),
timeline.file_name().into_string().unwrap(),
n_deltas,
layers.len() - n_deltas,
n_excess_layers,
n_holes
);
total_delta_layers += n_deltas;
total_image_layers += layers.len() - n_deltas;
total_excess_layers += n_excess_layers;
}
}
println!(
"Total delta layers {} image layers {} excess layers {}",
total_delta_layers, total_image_layers, total_excess_layers
);
Ok(())
}
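A minimal restatement of the excess-layer predicate from the loop above, not part of this commit; only the standalone function name is invented. An image layer counts as excess when at least three delta layers overlap its key range since the previous image (so image creation would normally have been triggered), yet fewer than three remain once deltas whose holes cover the whole range are discounted.
// `overlapping` = delta layers overlapping the image layer's key range since the
// last image; `skipping` = how many of those have a hole spanning that range.
fn is_excess_image_layer(overlapping: usize, skipping: usize) -> bool {
    overlapping >= 3 && overlapping - skipping < 3
}
fn main() {
    // Four overlapping deltas, two of which skip the whole range: only two
    // "real" deltas remain, so the image layer would not have been needed.
    assert!(is_excess_image_layer(4, 2));
    // Four overlapping deltas, none skipping: the image layer is justified.
    assert!(!is_excess_image_layer(4, 0));
}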

View File

@@ -12,9 +12,7 @@ use anyhow::Context;
use clap::{value_parser, Arg, Command};
use pageserver::{
context::{DownloadBehavior, RequestContext},
page_cache,
task_mgr::TaskKind,
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
};
@@ -77,8 +75,7 @@ fn print_layerfile(path: &Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx)
dump_layerfile_from_path(path, true)
}
fn handle_metadata(path: &Path, arg_matches: &clap::ArgMatches) -> Result<(), anyhow::Error> {

View File

@@ -7,7 +7,6 @@ use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::http::request::{must_get_query_param, parse_query_param};
use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
@@ -236,8 +235,8 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let include_non_incremental_logical_size: Option<bool> =
parse_query_param(&request, "include-non-incremental-logical-size")?;
let include_non_incremental_logical_size =
query_param_present(&request, "include-non-incremental-logical-size");
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
@@ -250,14 +249,13 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
let mut response_data = Vec::with_capacity(timelines.len());
for timeline in timelines {
let timeline_info = build_timeline_info(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
&ctx,
)
.await
.context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
.map_err(ApiError::InternalServerError)?;
let timeline_info =
build_timeline_info(&timeline, include_non_incremental_logical_size, &ctx)
.await
.context(
"Failed to convert tenant timeline {timeline_id} into the local one: {e:?}",
)
.map_err(ApiError::InternalServerError)?;
response_data.push(timeline_info);
}
@@ -269,11 +267,36 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
json_response(StatusCode::OK, response_data)
}
/// Checks if a query param is present in the request's URL
fn query_param_present(request: &Request<Body>, param: &str) -> bool {
request
.uri()
.query()
.map(|v| url::form_urlencoded::parse(v.as_bytes()).any(|(p, _)| p == param))
.unwrap_or(false)
}
fn get_query_param(request: &Request<Body>, param_name: &str) -> Result<String, ApiError> {
request.uri().query().map_or(
Err(ApiError::BadRequest(anyhow!("empty query in request"))),
|v| {
url::form_urlencoded::parse(v.as_bytes())
.find(|(k, _)| k == param_name)
.map_or(
Err(ApiError::BadRequest(anyhow!(
"no {param_name} specified in query parameters"
))),
|(_, v)| Ok(v.into_owned()),
)
},
)
}
async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let include_non_incremental_logical_size: Option<bool> =
parse_query_param(&request, "include-non-incremental-logical-size")?;
let include_non_incremental_logical_size =
query_param_present(&request, "include-non-incremental-logical-size");
check_permission(&request, Some(tenant_id))?;
// Logical size calculation needs downloading.
@@ -288,14 +311,11 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
.get_timeline(timeline_id, false)
.map_err(ApiError::NotFound)?;
let timeline_info = build_timeline_info(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
&ctx,
)
.await
.context("get local timeline info")
.map_err(ApiError::InternalServerError)?;
let timeline_info =
build_timeline_info(&timeline, include_non_incremental_logical_size, &ctx)
.await
.context("get local timeline info")
.map_err(ApiError::InternalServerError)?;
Ok::<_, ApiError>(timeline_info)
}
@@ -310,8 +330,8 @@ async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response
check_permission(&request, Some(tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let timestamp_raw = must_get_query_param(&request, "timestamp")?;
let timestamp = humantime::parse_rfc3339(&timestamp_raw)
let timestamp_raw = get_query_param(&request, "timestamp")?;
let timestamp = humantime::parse_rfc3339(timestamp_raw.as_str())
.with_context(|| format!("Invalid time: {:?}", timestamp_raw))
.map_err(ApiError::BadRequest)?;
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
@@ -483,7 +503,13 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let inputs_only: Option<bool> = parse_query_param(&request, "inputs_only")?;
let inputs_only = if query_param_present(&request, "inputs_only") {
get_query_param(&request, "inputs_only")?
.parse()
.map_err(|_| ApiError::BadRequest(anyhow!("failed to parse inputs_only")))?
} else {
false
};
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let tenant = mgr::get_tenant(tenant_id, true)
@@ -496,7 +522,7 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
.await
.map_err(ApiError::InternalServerError)?;
let size = if !inputs_only.unwrap_or(false) {
let size = if !inputs_only {
Some(inputs.calculate().map_err(ApiError::InternalServerError)?)
} else {
None

View File

@@ -255,8 +255,6 @@ pub enum TaskKind {
// A request that comes in via the pageserver HTTP API.
MgmtRequest,
DebugTool,
#[cfg(test)]
UnitTest,
}

View File

@@ -77,7 +77,7 @@ use utils::{
mod blob_io;
pub mod block_io;
pub mod disk_btree;
mod disk_btree;
pub(crate) mod ephemeral_file;
pub mod layer_map;
@@ -2643,11 +2643,7 @@ impl Drop for Tenant {
}
}
/// Dump contents of a layer file to stdout.
pub fn dump_layerfile_from_path(
path: &Path,
verbose: bool,
ctx: &RequestContext,
) -> anyhow::Result<()> {
pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> anyhow::Result<()> {
use std::os::unix::fs::FileExt;
// All layer files start with a two-byte "magic" value, to identify the kind of
@@ -2657,8 +2653,8 @@ pub fn dump_layerfile_from_path(
file.read_exact_at(&mut header_buf, 0)?;
match u16::from_be_bytes(header_buf) {
crate::IMAGE_FILE_MAGIC => ImageLayer::new_for_path(path, file)?.dump(verbose, ctx)?,
crate::DELTA_FILE_MAGIC => DeltaLayer::new_for_path(path, file)?.dump(verbose, ctx)?,
crate::IMAGE_FILE_MAGIC => ImageLayer::new_for_path(path, file)?.dump(verbose)?,
crate::DELTA_FILE_MAGIC => DeltaLayer::new_for_path(path, file)?.dump(verbose)?,
magic => bail!("unrecognized magic identifier: {:?}", magic),
}

View File

@@ -46,7 +46,6 @@
mod historic_layer_coverage;
mod layer_coverage;
use crate::context::RequestContext;
use crate::keyspace::KeyPartitioning;
use crate::metrics::NUM_ONDISK_LAYERS;
use crate::repository::Key;
@@ -655,22 +654,22 @@ where
/// debugging function to print out the contents of the layer map
#[allow(unused)]
pub fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
pub fn dump(&self, verbose: bool) -> Result<()> {
println!("Begin dump LayerMap");
println!("open_layer:");
if let Some(open_layer) = &self.open_layer {
open_layer.dump(verbose, ctx)?;
open_layer.dump(verbose)?;
}
println!("frozen_layers:");
for frozen_layer in self.frozen_layers.iter() {
frozen_layer.dump(verbose, ctx)?;
frozen_layer.dump(verbose)?;
}
println!("historic_layers:");
for layer in self.iter_historic_layers() {
layer.dump(verbose, ctx)?;
layer.dump(verbose)?;
}
println!("End dump LayerMap");
Ok(())

View File

@@ -1,12 +1,11 @@
//! Common traits and structs for layers
pub mod delta_layer;
mod delta_layer;
mod filename;
mod image_layer;
mod inmemory_layer;
mod remote_layer;
use crate::context::RequestContext;
use crate::repository::{Key, Value};
use crate::walrecord::NeonWalRecord;
use anyhow::Result;
@@ -118,14 +117,13 @@ pub trait Layer: Send + Sync {
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<ValueReconstructResult>;
/// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
fn short_id(&self) -> String;
/// Dump summary of the contents of the layer to stdout
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>;
fn dump(&self, verbose: bool) -> Result<()>;
}
/// Returned by [`Layer::iter`]
@@ -163,11 +161,11 @@ pub trait PersistentLayer: Layer {
fn local_path(&self) -> Option<PathBuf>;
/// Iterate through all keys and values stored in the layer
fn iter(&self, ctx: &RequestContext) -> Result<LayerIter<'_>>;
fn iter(&self) -> Result<LayerIter<'_>>;
/// Iterate through all keys stored in the layer. Returns key, lsn and value size
/// It is used only for compaction and so is currently implemented only for DeltaLayer
fn key_iter(&self, _ctx: &RequestContext) -> Result<LayerKeyIter<'_>> {
fn key_iter(&self) -> Result<LayerKeyIter<'_>> {
panic!("Not implemented")
}
@@ -233,7 +231,6 @@ impl Layer for LayerDescriptor {
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}
@@ -242,7 +239,7 @@ impl Layer for LayerDescriptor {
self.short_id.clone()
}
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
fn dump(&self, _verbose: bool) -> Result<()> {
todo!()
}
}
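For illustration only, a stand-in trait with the post-change shape of `dump(&self, verbose: bool)` and a defaulted key-iteration method, mirroring how `PersistentLayer::key_iter` panics unless a layer kind implements it; this is not the real `Layer` trait:

    use anyhow::Result;

    trait ToyLayer {
        /// A short ID string identifying the layer.
        fn short_id(&self) -> String;

        /// Dump a summary of the layer's contents to stdout.
        fn dump(&self, verbose: bool) -> Result<()>;

        /// Only some layer kinds support key iteration, as in the trait above.
        fn key_iter(&self) -> Result<Vec<String>> {
            panic!("not implemented for this layer kind")
        }
    }

    struct ToyDeltaLayer {
        id: String,
        keys: Vec<String>,
    }

    impl ToyLayer for ToyDeltaLayer {
        fn short_id(&self) -> String {
            self.id.clone()
        }

        fn dump(&self, verbose: bool) -> Result<()> {
            println!("----- toy delta layer {} -----", self.short_id());
            if verbose {
                for key in &self.keys {
                    println!("{key}");
                }
            }
            Ok(())
        }

        fn key_iter(&self) -> Result<Vec<String>> {
            Ok(self.keys.clone())
        }
    }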

View File

@@ -24,7 +24,6 @@
//! "values" part.
//!
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::{PageReadGuard, PAGE_SZ};
use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
@@ -63,7 +62,7 @@ use super::{DeltaFileName, Layer, LayerFileName, LayerIter, LayerKeyIter, PathOr
/// the 'index' starts at the block indicated by 'index_start_blk'
///
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Summary {
struct Summary {
/// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
magic: u16,
format_version: u16,
@@ -74,9 +73,9 @@ pub struct Summary {
lsn_range: Range<Lsn>,
/// Block number where the 'index' part of the file begins.
pub index_start_blk: u32,
index_start_blk: u32,
/// Block within the 'index', where the B-tree root page is stored
pub index_root_blk: u32,
index_root_blk: u32,
}
impl From<&DeltaLayer> for Summary {
@@ -126,7 +125,7 @@ impl BlobRef {
}
}
pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
struct DeltaKey([u8; DELTA_KEY_SIZE]);
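`DELTA_KEY_SIZE` is the key width plus eight extra bytes; presumably those extra bytes carry the LSN next to the key, though that layout is an assumption here. A sketch under that assumption, with stand-in names and sizes:

    // Assumed sizes and layout, for illustration only.
    const KEY_SIZE: usize = 18;
    const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;

    struct ToyDeltaKey([u8; DELTA_KEY_SIZE]);

    impl ToyDeltaKey {
        // Pack a fixed-width key followed by a big-endian 8-byte LSN.
        fn new(key: [u8; KEY_SIZE], lsn: u64) -> Self {
            let mut buf = [0u8; DELTA_KEY_SIZE];
            buf[..KEY_SIZE].copy_from_slice(&key);
            buf[KEY_SIZE..].copy_from_slice(&lsn.to_be_bytes());
            ToyDeltaKey(buf)
        }

        fn lsn(&self) -> u64 {
            u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap())
        }
    }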
///
@@ -215,7 +214,7 @@ impl Layer for DeltaLayer {
self.filename().file_name()
}
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
fn dump(&self, verbose: bool) -> Result<()> {
println!(
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
self.tenant_id,
@@ -230,7 +229,7 @@ impl Layer for DeltaLayer {
return Ok(());
}
let inner = self.load(ctx)?;
let inner = self.load()?;
println!(
"index_start_blk: {}, root {}",
@@ -294,7 +293,6 @@ impl Layer for DeltaLayer {
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.lsn_range.start);
let mut need_image = true;
@@ -303,7 +301,7 @@ impl Layer for DeltaLayer {
{
// Open the file and lock the metadata in memory
let inner = self.load(ctx)?;
let inner = self.load()?;
// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
@@ -393,16 +391,16 @@ impl PersistentLayer for DeltaLayer {
Some(self.path())
}
fn iter(&self, ctx: &RequestContext) -> Result<LayerIter<'_>> {
let inner = self.load(ctx).context("load delta layer")?;
fn iter(&self) -> Result<LayerIter<'_>> {
let inner = self.load().context("load delta layer")?;
Ok(match DeltaValueIter::new(inner) {
Ok(iter) => Box::new(iter),
Err(err) => Box::new(std::iter::once(Err(err))),
})
}
fn key_iter(&self, ctx: &RequestContext) -> Result<LayerKeyIter<'_>> {
let inner = self.load(ctx)?;
fn key_iter(&self) -> Result<LayerKeyIter<'_>> {
let inner = self.load()?;
Ok(Box::new(
DeltaKeyIter::new(inner).context("Layer index is corrupted")?,
))
@@ -461,7 +459,7 @@ impl DeltaLayer {
/// Open the underlying file and read the metadata into memory, if it's
/// not loaded already.
///
fn load(&self, _ctx: &RequestContext) -> Result<RwLockReadGuard<DeltaLayerInner>> {
fn load(&self) -> Result<RwLockReadGuard<DeltaLayerInner>> {
loop {
// Quick exit if already loaded
let inner = self.inner.read().unwrap();

View File

@@ -20,7 +20,6 @@
//! mapping from Key to an offset in the "values" part. The
//! actual page images are stored in the "values" part.
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
@@ -144,7 +143,7 @@ impl Layer for ImageLayer {
}
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
fn dump(&self, verbose: bool) -> Result<()> {
println!(
"----- image layer for ten {} tli {} key {}-{} at {} ----",
self.tenant_id, self.timeline_id, self.key_range.start, self.key_range.end, self.lsn
@@ -154,7 +153,7 @@ impl Layer for ImageLayer {
return Ok(());
}
let inner = self.load(ctx)?;
let inner = self.load()?;
let file = inner.file.as_ref().unwrap();
let tree_reader =
DiskBtreeReader::<_, KEY_SIZE>::new(inner.index_start_blk, inner.index_root_blk, file);
@@ -175,13 +174,12 @@ impl Layer for ImageLayer {
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
assert!(self.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);
let inner = self.load(ctx)?;
let inner = self.load()?;
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
@@ -222,7 +220,7 @@ impl PersistentLayer for ImageLayer {
fn get_timeline_id(&self) -> TimelineId {
self.timeline_id
}
fn iter(&self, _ctx: &RequestContext) -> Result<LayerIter<'_>> {
fn iter(&self) -> Result<LayerIter<'_>> {
unimplemented!();
}
@@ -272,7 +270,7 @@ impl ImageLayer {
/// Open the underlying file and read the metadata into memory, if it's
/// not loaded already.
///
fn load(&self, _ctx: &RequestContext) -> Result<RwLockReadGuard<ImageLayerInner>> {
fn load(&self) -> Result<RwLockReadGuard<ImageLayerInner>> {
loop {
// Quick exit if already loaded
let inner = self.inner.read().unwrap();

View File

@@ -5,7 +5,6 @@
//! its position in the file, is kept in memory, though.
//!
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::{Key, Value};
use crate::tenant::blob_io::{BlobCursor, BlobWriter};
use crate::tenant::block_io::BlockReader;
@@ -111,7 +110,7 @@ impl Layer for InMemoryLayer {
}
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
fn dump(&self, verbose: bool) -> Result<()> {
let inner = self.inner.read().unwrap();
let end_str = inner
@@ -167,7 +166,6 @@ impl Layer for InMemoryLayer {
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;

View File

@@ -2,7 +2,6 @@
//! in remote storage.
//!
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
@@ -52,7 +51,6 @@ impl Layer for RemoteLayer {
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
bail!(
"layer {} needs to be downloaded",
@@ -65,7 +63,7 @@ impl Layer for RemoteLayer {
}
/// debugging function to print out the contents of the layer
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
fn dump(&self, _verbose: bool) -> Result<()> {
println!(
"----- remote layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
self.tenantid,
@@ -113,11 +111,11 @@ impl PersistentLayer for RemoteLayer {
None
}
fn iter(&self, _ctx: &RequestContext) -> Result<LayerIter<'_>> {
fn iter(&self) -> Result<LayerIter<'_>> {
bail!("cannot iterate a remote layer");
}
fn key_iter(&self, _ctx: &RequestContext) -> Result<LayerKeyIter<'_>> {
fn key_iter(&self) -> Result<LayerKeyIter<'_>> {
bail!("cannot iterate a remote layer");
}

View File

@@ -696,7 +696,7 @@ impl Timeline {
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size, ctx).await?;
self.compact_level0(target_file_size).await?;
timer.stop_and_record();
// If `create_image_layers` or `compact_level0` scheduled any
@@ -1767,7 +1767,6 @@ impl Timeline {
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
@@ -1793,7 +1792,6 @@ impl Timeline {
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
@@ -1827,7 +1825,6 @@ impl Timeline {
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
@@ -2466,7 +2463,6 @@ impl Timeline {
async fn compact_level0_phase1(
&self,
target_file_size: u64,
ctx: &RequestContext,
) -> anyhow::Result<CompactLevel0Phase1Result> {
let layers = self.layers.read().unwrap();
let mut level0_deltas = layers.get_level0_deltas()?;
@@ -2526,9 +2522,8 @@ impl Timeline {
// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
let all_values_iter = itertools::process_results(
deltas_to_compact.iter().map(|l| l.iter(ctx)),
|iter_iter| {
let all_values_iter =
itertools::process_results(deltas_to_compact.iter().map(|l| l.iter()), |iter_iter| {
iter_iter.kmerge_by(|a, b| {
if let Ok((a_key, a_lsn, _)) = a {
if let Ok((b_key, b_lsn, _)) = b {
@@ -2544,12 +2539,11 @@ impl Timeline {
true
}
})
},
)?;
})?;
// This iterator walks through all keys and is needed to calculate size used by each key
let mut all_keys_iter = itertools::process_results(
deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
deltas_to_compact.iter().map(|l| l.key_iter()),
|iter_iter| {
iter_iter.kmerge_by(|a, b| {
let (a_key, a_lsn, _) = a;
@@ -2725,15 +2719,11 @@ impl Timeline {
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them
/// as Level 1 files.
///
async fn compact_level0(
&self,
target_file_size: u64,
ctx: &RequestContext,
) -> anyhow::Result<()> {
async fn compact_level0(&self, target_file_size: u64) -> anyhow::Result<()> {
let CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
} = self.compact_level0_phase1(target_file_size, ctx).await?;
} = self.compact_level0_phase1(target_file_size).await?;
if new_layers.is_empty() && deltas_to_compact.is_empty() {
// nothing to do
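The `process_results`/`kmerge_by` pipelines in `compact_level0_phase1` above are easier to follow on plain data; a small sketch of the same k-way merge over already-sorted per-layer (key, lsn) streams, without the error plumbing that `process_results` handles in the real code:

    use itertools::Itertools;

    fn merge_layer_streams(layers: Vec<Vec<(u64, u64)>>) -> Vec<(u64, u64)> {
        // Each inner Vec stands in for one delta layer's (key, lsn) iterator.
        // kmerge_by interleaves them into a single stream in (key, lsn) order.
        layers
            .into_iter()
            .map(|layer| layer.into_iter())
            .kmerge_by(|a, b| a < b)
            .collect()
    }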

poetry.lock (generated): 2694 changed lines, diff suppressed because it is too large

View File

@@ -6,59 +6,58 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
async-trait.workspace = true
atty.workspace = true
base64.workspace = true
bstr.workspace = true
bytes = { workspace = true, features = ["serde"] }
chrono.workspace = true
bytes = {workspace = true, features = ['serde'] }
clap.workspace = true
chrono.workspace = true
consumption_metrics.workspace = true
futures.workspace = true
git-version.workspace = true
hashbrown.workspace = true
hashlink.workspace = true
hex.workspace = true
hmac.workspace = true
hostname.workspace = true
humantime.workspace = true
hyper-tungstenite.workspace = true
hyper.workspace = true
hyper-tungstenite.workspace = true
itertools.workspace = true
md5.workspace = true
metrics.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
pin-project-lite.workspace = true
pq_proto.workspace = true
prometheus.workspace = true
rand.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["json"] }
reqwest = { workspace = true, features = [ "json" ] }
routerify.workspace = true
rustls-pemfile.workspace = true
rustls.workspace = true
rustls-pemfile.workspace = true
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true
sha2.workspace = true
socket2.workspace = true
thiserror.workspace = true
tls-listener.workspace = true
tokio.workspace = true
tokio-postgres.workspace = true
tokio-rustls.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
tls-listener.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
url.workspace = true
utils.workspace = true
uuid.workspace = true
webpki-roots.workspace = true
x509-parser.workspace = true
metrics.workspace = true
pq_proto.workspace = true
utils.workspace = true
prometheus.workspace = true
humantime.workspace = true
hostname.workspace = true
workspace_hack.workspace = true
[dev-dependencies]
async-trait.workspace = true
rcgen.workspace = true
rstest.workspace = true
tokio-postgres-rustls.workspace = true

View File

@@ -1,7 +1,7 @@
//! Client authentication mechanisms.
pub mod backend;
pub use backend::BackendType;
pub use backend::{BackendType, ConsoleReqExtra};
mod credentials;
pub use credentials::ClientCredentials;
@@ -12,7 +12,7 @@ use password_hack::PasswordHackPayload;
mod flow;
pub use flow::*;
use crate::{console, error::UserFacingError};
use crate::error::UserFacingError;
use std::io;
use thiserror::Error;
@@ -26,10 +26,10 @@ pub enum AuthErrorImpl {
Link(#[from] backend::LinkAuthError),
#[error(transparent)]
GetAuthInfo(#[from] console::errors::GetAuthInfoError),
GetAuthInfo(#[from] backend::GetAuthInfoError),
#[error(transparent)]
WakeCompute(#[from] console::errors::WakeComputeError),
WakeCompute(#[from] backend::WakeComputeError),
/// SASL protocol errors (includes [SCRAM](crate::scram)).
#[error(transparent)]

View File

@@ -1,40 +1,48 @@
mod classic;
mod postgres;
mod link;
use futures::TryFutureExt;
pub use link::LinkAuthError;
mod console;
pub use console::{GetAuthInfoError, WakeComputeError};
use crate::{
auth::{self, AuthFlow, ClientCredentials},
console::{
self,
provider::{CachedNodeInfo, ConsoleReqExtra},
Api,
},
stream, url,
compute,
console::messages::MetricsAuxInfo,
http, mgmt, stream, url,
waiters::{self, Waiter, Waiters},
};
use once_cell::sync::Lazy;
use std::borrow::Cow;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, warn};
/// A product of successful authentication.
pub struct AuthSuccess<T> {
/// Did we send [`pq_proto::BeMessage::AuthenticationOk`] to client?
pub reported_auth_ok: bool,
/// Something to be considered a positive result.
pub value: T,
static CPLANE_WAITERS: Lazy<Waiters<mgmt::ComputeReady>> = Lazy::new(Default::default);
/// Give caller an opportunity to wait for the cloud's reply.
pub async fn with_waiter<R, T, E>(
psql_session_id: impl Into<String>,
action: impl FnOnce(Waiter<'static, mgmt::ComputeReady>) -> R,
) -> Result<T, E>
where
R: std::future::Future<Output = Result<T, E>>,
E: From<waiters::RegisterError>,
{
let waiter = CPLANE_WAITERS.register(psql_session_id.into())?;
action(waiter).await
}
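The waiter registry above behaves roughly like a map from psql session id to a one-shot channel: `with_waiter` registers and awaits, while `notify` completes the waiting side. A rough analogue with tokio primitives (not the proxy's actual `Waiters` type, which also reports registration errors):

    use std::collections::HashMap;
    use std::sync::Mutex;

    use tokio::sync::oneshot;

    #[derive(Default)]
    struct ToyWaiters(Mutex<HashMap<String, oneshot::Sender<String>>>);

    impl ToyWaiters {
        // Register a waiter for a session id and hand back the receiving end.
        fn register(&self, id: String) -> oneshot::Receiver<String> {
            let (tx, rx) = oneshot::channel();
            self.0.lock().unwrap().insert(id, tx);
            rx
        }

        // Called when the compute node is ready for that session.
        fn notify(&self, id: &str, msg: String) {
            if let Some(tx) = self.0.lock().unwrap().remove(id) {
                let _ = tx.send(msg);
            }
        }
    }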
impl<T> AuthSuccess<T> {
/// Very similar to [`std::option::Option::map`].
/// Maps [`AuthSuccess<T>`] to [`AuthSuccess<R>`] by applying
/// a function to a contained value.
pub fn map<R>(self, f: impl FnOnce(T) -> R) -> AuthSuccess<R> {
AuthSuccess {
reported_auth_ok: self.reported_auth_ok,
value: f(self.value),
}
}
pub fn notify(psql_session_id: &str, msg: mgmt::ComputeReady) -> Result<(), waiters::NotifyError> {
CPLANE_WAITERS.notify(psql_session_id, msg)
}
/// Extra query params we'd like to pass to the console.
pub struct ConsoleReqExtra<'a> {
/// A unique identifier for a connection.
pub session_id: uuid::Uuid,
/// Name of client application, if set.
pub application_name: Option<&'a str>,
}
/// This type serves two purposes:
@@ -45,11 +53,12 @@ impl<T> AuthSuccess<T> {
/// * However, when we substitute `T` with [`ClientCredentials`],
/// this helps us provide the credentials only to those auth
/// backends which require them for the authentication process.
#[derive(Debug)]
pub enum BackendType<'a, T> {
/// Current Cloud API (V2).
Console(Cow<'a, console::provider::neon::Api>, T),
Console(Cow<'a, http::Endpoint>, T),
/// Local mock of Cloud API (V2).
Postgres(Cow<'a, console::provider::mock::Api>, T),
Postgres(Cow<'a, url::ApiUrl>, T),
/// Authentication via a web browser.
Link(Cow<'a, url::ApiUrl>),
}
@@ -58,8 +67,14 @@ impl std::fmt::Display for BackendType<'_, ()> {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use BackendType::*;
match self {
Console(endpoint, _) => fmt.debug_tuple("Console").field(&endpoint.url()).finish(),
Postgres(endpoint, _) => fmt.debug_tuple("Postgres").field(&endpoint.url()).finish(),
Console(endpoint, _) => fmt
.debug_tuple("Console")
.field(&endpoint.url().as_str())
.finish(),
Postgres(endpoint, _) => fmt
.debug_tuple("Postgres")
.field(&endpoint.as_str())
.finish(),
Link(url) => fmt.debug_tuple("Link").field(&url.as_str()).finish(),
}
}
@@ -105,16 +120,30 @@ impl<'a, T, E> BackendType<'a, Result<T, E>> {
}
}
// TODO: get rid of explicit lifetimes in this block (there's a bug in rustc).
// Read more: https://github.com/rust-lang/rust/issues/99190
// Alleged fix: https://github.com/rust-lang/rust/pull/89056
impl<'l> BackendType<'l, ClientCredentials<'_>> {
/// A product of successful authentication.
pub struct AuthSuccess<T> {
/// Did we send [`pq_proto::BeMessage::AuthenticationOk`] to client?
pub reported_auth_ok: bool,
/// Something to be considered a positive result.
pub value: T,
}
/// Info for establishing a connection to a compute node.
/// This is what we get after auth succeeded, but not before!
pub struct NodeInfo {
/// Compute node connection params.
pub config: compute::ConnCfg,
/// Labels for proxy's metrics.
pub aux: MetricsAuxInfo,
}
impl BackendType<'_, ClientCredentials<'_>> {
/// Do something special if user didn't provide the `project` parameter.
async fn try_password_hack<'a>(
&'a mut self,
extra: &'a ConsoleReqExtra<'a>,
client: &'a mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<Option<AuthSuccess<CachedNodeInfo>>> {
async fn try_password_hack(
&mut self,
extra: &ConsoleReqExtra<'_>,
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin + Send>,
) -> auth::Result<Option<AuthSuccess<NodeInfo>>> {
use BackendType::*;
// If there's no project so far, that entails that the client doesn't
@@ -150,28 +179,33 @@ impl<'l> BackendType<'l, ClientCredentials<'_>> {
// TODO: find a proper way to merge those very similar blocks.
let (mut node, payload) = match self {
Console(api, creds) if creds.project.is_none() => {
Console(endpoint, creds) if creds.project.is_none() => {
let payload = fetch_magic_payload(client).await?;
let mut creds = creds.as_ref();
creds.project = Some(payload.project.as_str().into());
let node = api.wake_compute(extra, &creds).await?;
let node = console::Api::new(endpoint, extra, &creds)
.wake_compute()
.await?;
(node, payload)
}
// This is a hack to allow cleartext password in secure connections (wss).
Console(api, creds) if creds.use_cleartext_password_flow => {
Console(endpoint, creds) if creds.use_cleartext_password_flow => {
// This is a hack to allow cleartext password in secure connections (wss).
let payload = fetch_plaintext_password(client).await?;
let node = api.wake_compute(extra, creds).await?;
let creds = creds.as_ref();
let node = console::Api::new(endpoint, extra, &creds)
.wake_compute()
.await?;
(node, payload)
}
Postgres(api, creds) if creds.project.is_none() => {
Postgres(endpoint, creds) if creds.project.is_none() => {
let payload = fetch_magic_payload(client).await?;
let mut creds = creds.as_ref();
creds.project = Some(payload.project.as_str().into());
let node = api.wake_compute(extra, &creds).await?;
let node = postgres::Api::new(endpoint, &creds).wake_compute().await?;
(node, payload)
}
@@ -186,11 +220,11 @@ impl<'l> BackendType<'l, ClientCredentials<'_>> {
}
/// Authenticate the client via the requested backend, possibly using credentials.
pub async fn authenticate<'a>(
&mut self,
extra: &'a ConsoleReqExtra<'a>,
client: &'a mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<AuthSuccess<CachedNodeInfo>> {
pub async fn authenticate(
mut self,
extra: &ConsoleReqExtra<'_>,
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin + Send>,
) -> auth::Result<AuthSuccess<NodeInfo>> {
use BackendType::*;
// Handle cases when `project` is missing in `creds`.
@@ -201,7 +235,7 @@ impl<'l> BackendType<'l, ClientCredentials<'_>> {
}
let res = match self {
Console(api, creds) => {
Console(endpoint, creds) => {
info!(
user = creds.user,
project = creds.project(),
@@ -209,40 +243,26 @@ impl<'l> BackendType<'l, ClientCredentials<'_>> {
);
assert!(creds.project.is_some());
classic::handle_user(api.as_ref(), extra, creds, client).await?
console::Api::new(&endpoint, extra, &creds)
.handle_user(client)
.await?
}
Postgres(api, creds) => {
Postgres(endpoint, creds) => {
info!("performing mock authentication using a local postgres instance");
assert!(creds.project.is_some());
classic::handle_user(api.as_ref(), extra, creds, client).await?
postgres::Api::new(&endpoint, &creds)
.handle_user(client)
.await?
}
// NOTE: this auth backend doesn't use client credentials.
Link(url) => {
info!("performing link authentication");
link::handle_user(url, client)
.await?
.map(CachedNodeInfo::new_uncached)
link::handle_user(&url, client).await?
}
};
info!("user successfully authenticated");
Ok(res)
}
/// When applicable, wake the compute node, gaining its connection info in the process.
/// The link auth flow doesn't support this, so we return [`None`] in that case.
pub async fn wake_compute<'a>(
&self,
extra: &'a ConsoleReqExtra<'a>,
) -> Result<Option<CachedNodeInfo>, console::errors::WakeComputeError> {
use BackendType::*;
match self {
Console(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await,
Postgres(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await,
Link(_) => Ok(None),
}
}
}

View File

@@ -1,61 +0,0 @@
use super::AuthSuccess;
use crate::{
auth::{self, AuthFlow, ClientCredentials},
compute,
console::{self, AuthInfo, CachedNodeInfo, ConsoleReqExtra},
sasl, scram,
stream::PqStream,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::info;
pub(super) async fn handle_user(
api: &impl console::Api,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials<'_>,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<AuthSuccess<CachedNodeInfo>> {
info!("fetching user's authentication info");
let info = api.get_auth_info(extra, creds).await?.unwrap_or_else(|| {
// If we don't have an authentication secret, we mock one to
// prevent malicious probing (possible due to missing protocol steps).
// This mocked secret will never lead to successful authentication.
info!("authentication info not found, mocking it");
AuthInfo::Scram(scram::ServerSecret::mock(creds.user, rand::random()))
});
let flow = AuthFlow::new(client);
let scram_keys = match info {
AuthInfo::Md5(_) => {
info!("auth endpoint chooses MD5");
return Err(auth::AuthError::bad_auth_method("MD5"));
}
AuthInfo::Scram(secret) => {
info!("auth endpoint chooses SCRAM");
let scram = auth::Scram(&secret);
let client_key = match flow.begin(scram).await?.authenticate().await? {
sasl::Outcome::Success(key) => key,
sasl::Outcome::Failure(reason) => {
info!("auth backend failed with an error: {reason}");
return Err(auth::AuthError::auth_failed(creds.user));
}
};
Some(compute::ScramKeys {
client_key: client_key.as_bytes(),
server_key: secret.server_key.as_bytes(),
})
}
};
let mut node = api.wake_compute(extra, creds).await?;
if let Some(keys) = scram_keys {
use tokio_postgres::config::AuthKeys;
node.config.auth_keys(AuthKeys::ScramSha256(keys));
}
Ok(AuthSuccess {
reported_auth_ok: false,
value: node,
})
}

View File

@@ -0,0 +1,365 @@
//! Cloud API V2.
use super::{AuthSuccess, ConsoleReqExtra, NodeInfo};
use crate::{
auth::{self, AuthFlow, ClientCredentials},
compute,
console::messages::{ConsoleError, GetRoleSecret, WakeCompute},
error::{io_error, UserFacingError},
http, sasl, scram,
stream::PqStream,
};
use futures::TryFutureExt;
use reqwest::StatusCode as HttpStatusCode;
use std::future::Future;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{error, info, info_span, warn, Instrument};
/// A go-to error message which doesn't leak any detail.
const REQUEST_FAILED: &str = "Console request failed";
/// Common console API error.
#[derive(Debug, Error)]
pub enum ApiError {
/// Error returned by the console itself.
#[error("{REQUEST_FAILED} with {}: {}", .status, .text)]
Console {
status: HttpStatusCode,
text: Box<str>,
},
/// Various IO errors like broken pipe or malformed payload.
#[error("{REQUEST_FAILED}: {0}")]
Transport(#[from] std::io::Error),
}
impl ApiError {
/// Returns HTTP status code if it's the reason for failure.
fn http_status_code(&self) -> Option<HttpStatusCode> {
use ApiError::*;
match self {
Console { status, .. } => Some(*status),
_ => None,
}
}
}
impl UserFacingError for ApiError {
fn to_string_client(&self) -> String {
use ApiError::*;
match self {
// To minimize risks, only select errors are forwarded to users.
// Ask @neondatabase/control-plane for review before adding more.
Console { status, .. } => match *status {
HttpStatusCode::NOT_FOUND => {
// Status 404: failed to get a project-related resource.
format!("{REQUEST_FAILED}: endpoint cannot be found")
}
HttpStatusCode::NOT_ACCEPTABLE => {
// Status 406: endpoint is disabled (we don't allow connections).
format!("{REQUEST_FAILED}: endpoint is disabled")
}
HttpStatusCode::LOCKED => {
// Status 423: project might be in maintenance mode (or bad state).
format!("{REQUEST_FAILED}: endpoint is temporary unavailable")
}
_ => REQUEST_FAILED.to_owned(),
},
_ => REQUEST_FAILED.to_owned(),
}
}
}
// Helps eliminate graceless `.map_err` calls without introducing another ctor.
impl From<reqwest::Error> for ApiError {
fn from(e: reqwest::Error) -> Self {
io_error(e).into()
}
}
#[derive(Debug, Error)]
pub enum GetAuthInfoError {
// We shouldn't include the actual secret here.
#[error("Console responded with a malformed auth secret")]
BadSecret,
#[error(transparent)]
ApiError(ApiError),
}
// This allows more useful interactions than `#[from]`.
impl<E: Into<ApiError>> From<E> for GetAuthInfoError {
fn from(e: E) -> Self {
Self::ApiError(e.into())
}
}
impl UserFacingError for GetAuthInfoError {
fn to_string_client(&self) -> String {
use GetAuthInfoError::*;
match self {
// We absolutely should not leak any secrets!
BadSecret => REQUEST_FAILED.to_owned(),
// However, API might return a meaningful error.
ApiError(e) => e.to_string_client(),
}
}
}
#[derive(Debug, Error)]
pub enum WakeComputeError {
#[error("Console responded with a malformed compute address: {0}")]
BadComputeAddress(Box<str>),
#[error(transparent)]
ApiError(ApiError),
}
// This allows more useful interactions than `#[from]`.
impl<E: Into<ApiError>> From<E> for WakeComputeError {
fn from(e: E) -> Self {
Self::ApiError(e.into())
}
}
impl UserFacingError for WakeComputeError {
fn to_string_client(&self) -> String {
use WakeComputeError::*;
match self {
// We shouldn't show user the address even if it's broken.
// Besides, user is unlikely to care about this detail.
BadComputeAddress(_) => REQUEST_FAILED.to_owned(),
// However, API might return a meaningful error.
ApiError(e) => e.to_string_client(),
}
}
}
/// Auth secret which is managed by the cloud.
pub enum AuthInfo {
/// Md5 hash of user's password.
Md5([u8; 16]),
/// [SCRAM](crate::scram) authentication info.
Scram(scram::ServerSecret),
}
#[must_use]
pub(super) struct Api<'a> {
endpoint: &'a http::Endpoint,
extra: &'a ConsoleReqExtra<'a>,
creds: &'a ClientCredentials<'a>,
}
impl<'a> AsRef<ClientCredentials<'a>> for Api<'a> {
fn as_ref(&self) -> &ClientCredentials<'a> {
self.creds
}
}
impl<'a> Api<'a> {
/// Construct an API object containing the auth parameters.
pub(super) fn new(
endpoint: &'a http::Endpoint,
extra: &'a ConsoleReqExtra<'a>,
creds: &'a ClientCredentials,
) -> Self {
Self {
endpoint,
extra,
creds,
}
}
/// Authenticate the existing user or throw an error.
pub(super) async fn handle_user(
&'a self,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin + Send>,
) -> auth::Result<AuthSuccess<NodeInfo>> {
handle_user(client, self, Self::get_auth_info, Self::wake_compute).await
}
}
impl Api<'_> {
async fn get_auth_info(&self) -> Result<Option<AuthInfo>, GetAuthInfoError> {
let request_id = uuid::Uuid::new_v4().to_string();
async {
let request = self
.endpoint
.get("proxy_get_role_secret")
.header("X-Request-ID", &request_id)
.query(&[("session_id", self.extra.session_id)])
.query(&[
("application_name", self.extra.application_name),
("project", Some(self.creds.project().expect("impossible"))),
("role", Some(self.creds.user)),
])
.build()?;
info!(url = request.url().as_str(), "sending http request");
let response = self.endpoint.execute(request).await?;
let body = match parse_body::<GetRoleSecret>(response).await {
Ok(body) => body,
// Error 404 is special: it's ok not to have a secret.
Err(e) => match e.http_status_code() {
Some(HttpStatusCode::NOT_FOUND) => return Ok(None),
_otherwise => return Err(e.into()),
},
};
let secret = scram::ServerSecret::parse(&body.role_secret)
.map(AuthInfo::Scram)
.ok_or(GetAuthInfoError::BadSecret)?;
Ok(Some(secret))
}
.map_err(crate::error::log_error)
.instrument(info_span!("get_auth_info", id = request_id))
.await
}
/// Wake up the compute node and return the corresponding connection info.
pub async fn wake_compute(&self) -> Result<NodeInfo, WakeComputeError> {
let request_id = uuid::Uuid::new_v4().to_string();
async {
let request = self
.endpoint
.get("proxy_wake_compute")
.header("X-Request-ID", &request_id)
.query(&[("session_id", self.extra.session_id)])
.query(&[
("application_name", self.extra.application_name),
("project", Some(self.creds.project().expect("impossible"))),
])
.build()?;
info!(url = request.url().as_str(), "sending http request");
let response = self.endpoint.execute(request).await?;
let body = parse_body::<WakeCompute>(response).await?;
// Unfortunately, ownership won't let us use `Option::ok_or` here.
let (host, port) = match parse_host_port(&body.address) {
None => return Err(WakeComputeError::BadComputeAddress(body.address)),
Some(x) => x,
};
let mut config = compute::ConnCfg::new();
config
.host(host)
.port(port)
.dbname(self.creds.dbname)
.user(self.creds.user);
Ok(NodeInfo {
config,
aux: body.aux,
})
}
.map_err(crate::error::log_error)
.instrument(info_span!("wake_compute", id = request_id))
.await
}
}
/// Common logic for user handling in API V2.
/// We reuse this for a mock API implementation in [`super::postgres`].
pub(super) async fn handle_user<'a, Endpoint, GetAuthInfo, WakeCompute>(
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
endpoint: &'a Endpoint,
get_auth_info: impl FnOnce(&'a Endpoint) -> GetAuthInfo,
wake_compute: impl FnOnce(&'a Endpoint) -> WakeCompute,
) -> auth::Result<AuthSuccess<NodeInfo>>
where
Endpoint: AsRef<ClientCredentials<'a>>,
GetAuthInfo: Future<Output = Result<Option<AuthInfo>, GetAuthInfoError>>,
WakeCompute: Future<Output = Result<NodeInfo, WakeComputeError>>,
{
let creds = endpoint.as_ref();
info!("fetching user's authentication info");
let info = get_auth_info(endpoint).await?.unwrap_or_else(|| {
// If we don't have an authentication secret, we mock one to
// prevent malicious probing (possible due to missing protocol steps).
// This mocked secret will never lead to successful authentication.
info!("authentication info not found, mocking it");
AuthInfo::Scram(scram::ServerSecret::mock(creds.user, rand::random()))
});
let flow = AuthFlow::new(client);
let scram_keys = match info {
AuthInfo::Md5(_) => {
info!("auth endpoint chooses MD5");
return Err(auth::AuthError::bad_auth_method("MD5"));
}
AuthInfo::Scram(secret) => {
info!("auth endpoint chooses SCRAM");
let scram = auth::Scram(&secret);
let client_key = match flow.begin(scram).await?.authenticate().await? {
sasl::Outcome::Success(key) => key,
sasl::Outcome::Failure(reason) => {
info!("auth backend failed with an error: {reason}");
return Err(auth::AuthError::auth_failed(creds.user));
}
};
Some(compute::ScramKeys {
client_key: client_key.as_bytes(),
server_key: secret.server_key.as_bytes(),
})
}
};
let mut node = wake_compute(endpoint).await?;
if let Some(keys) = scram_keys {
use tokio_postgres::config::AuthKeys;
node.config.auth_keys(AuthKeys::ScramSha256(keys));
}
Ok(AuthSuccess {
reported_auth_ok: false,
value: node,
})
}
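The generic `handle_user` above threads the provider through two `FnOnce(&Endpoint) -> impl Future` callbacks so the real console and the postgres mock can share one flow. A stripped-down sketch of that shape, with toy types in place of the proxy's credentials and node info:

    use std::future::Future;

    struct Creds { user: String }
    struct Node { addr: String }

    async fn shared_auth_flow<'a, E, GetAuth, Wake>(
        endpoint: &'a E,
        get_auth_info: impl FnOnce(&'a E) -> GetAuth,
        wake_compute: impl FnOnce(&'a E) -> Wake,
    ) -> Result<Node, String>
    where
        E: AsRef<Creds>,
        GetAuth: Future<Output = Result<Option<String>, String>>,
        Wake: Future<Output = Result<Node, String>>,
    {
        let creds = endpoint.as_ref();
        // Fall back to a mocked secret when the provider has none, as above.
        let _secret = get_auth_info(endpoint)
            .await?
            .unwrap_or_else(|| format!("mock-secret-for-{}", creds.user));
        // The provider decides what "waking" means: an HTTP call or a local no-op.
        wake_compute(endpoint).await
    }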
/// Parse http response body, taking status code into account.
async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
response: reqwest::Response,
) -> Result<T, ApiError> {
let status = response.status();
if status.is_success() {
// We shouldn't log raw body because it may contain secrets.
info!("request succeeded, processing the body");
return Ok(response.json().await?);
}
// Don't throw an error here because it's not as important
// as the fact that the request itself has failed.
let body = response.json().await.unwrap_or_else(|e| {
warn!("failed to parse error body: {e}");
ConsoleError {
error: "reason unclear (malformed error message)".into(),
}
});
let text = body.error;
error!("console responded with an error ({status}): {text}");
Err(ApiError::Console { status, text })
}
fn parse_host_port(input: &str) -> Option<(&str, u16)> {
let (host, port) = input.split_once(':')?;
Some((host, port.parse().ok()?))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_host_port() {
let (host, port) = parse_host_port("127.0.0.1:5432").expect("failed to parse");
assert_eq!(host, "127.0.0.1");
assert_eq!(port, 5432);
}
}

View File

@@ -1,11 +1,5 @@
use super::AuthSuccess;
use crate::{
auth, compute,
console::{self, provider::NodeInfo},
error::UserFacingError,
stream::PqStream,
waiters,
};
use super::{AuthSuccess, NodeInfo};
use crate::{auth, compute, error::UserFacingError, stream::PqStream, waiters};
use pq_proto::BeMessage as Be;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
@@ -53,7 +47,7 @@ pub fn new_psql_session_id() -> String {
hex::encode(rand::random::<[u8; 8]>())
}
pub(super) async fn handle_user(
pub async fn handle_user(
link_uri: &reqwest::Url,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<AuthSuccess<NodeInfo>> {
@@ -61,7 +55,7 @@ pub(super) async fn handle_user(
let span = info_span!("link", psql_session_id = &psql_session_id);
let greeting = hello_message(link_uri, &psql_session_id);
let db_info = console::mgmt::with_waiter(psql_session_id, |waiter| async {
let db_info = super::with_waiter(psql_session_id, |waiter| async {
// Give user a URL to spawn a new database.
info!(parent: &span, "sending the auth URL to the user");
client
@@ -86,14 +80,14 @@ pub(super) async fn handle_user(
.user(&db_info.user);
if let Some(password) = db_info.password {
config.password(password.as_ref());
config.password(password);
}
Ok(AuthSuccess {
reported_auth_ok: true,
value: NodeInfo {
config,
aux: db_info.aux.into(),
aux: db_info.aux,
},
})
}

View File

@@ -1,14 +1,21 @@
//! Mock console backend which relies on a user-provided postgres instance.
//! Local mock of Cloud API V2.
use super::{
errors::{ApiError, GetAuthInfoError, WakeComputeError},
AuthInfo, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
console::{self, AuthInfo, GetAuthInfoError, WakeComputeError},
AuthSuccess, NodeInfo,
};
use crate::{
auth::{self, ClientCredentials},
compute,
error::io_error,
scram,
stream::PqStream,
url::ApiUrl,
};
use crate::{auth::ClientCredentials, compute, error::io_error, scram, url::ApiUrl};
use async_trait::async_trait;
use futures::TryFutureExt;
use thiserror::Error;
use tracing::{error, info, info_span, warn, Instrument};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, info_span, warn, Instrument};
#[derive(Debug, Error)]
enum MockApiError {
@@ -16,36 +23,49 @@ enum MockApiError {
PasswordNotSet(tokio_postgres::Error),
}
impl From<MockApiError> for ApiError {
impl From<MockApiError> for console::ApiError {
fn from(e: MockApiError) -> Self {
io_error(e).into()
}
}
impl From<tokio_postgres::Error> for ApiError {
impl From<tokio_postgres::Error> for console::ApiError {
fn from(e: tokio_postgres::Error) -> Self {
io_error(e).into()
}
}
#[derive(Clone)]
pub struct Api {
endpoint: ApiUrl,
#[must_use]
pub(super) struct Api<'a> {
endpoint: &'a ApiUrl,
creds: &'a ClientCredentials<'a>,
}
impl Api {
pub fn new(endpoint: ApiUrl) -> Self {
Self { endpoint }
impl<'a> AsRef<ClientCredentials<'a>> for Api<'a> {
fn as_ref(&self) -> &ClientCredentials<'a> {
self.creds
}
}
impl<'a> Api<'a> {
/// Construct an API object containing the auth parameters.
pub(super) fn new(endpoint: &'a ApiUrl, creds: &'a ClientCredentials) -> Self {
Self { endpoint, creds }
}
pub fn url(&self) -> &str {
self.endpoint.as_str()
/// Authenticate the existing user or throw an error.
pub(super) async fn handle_user(
&'a self,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin + Send>,
) -> auth::Result<AuthSuccess<NodeInfo>> {
// We reuse user handling logic from a production module.
console::handle_user(client, self, Self::get_auth_info, Self::wake_compute).await
}
}
async fn do_get_auth_info(
&self,
creds: &ClientCredentials<'_>,
) -> Result<Option<AuthInfo>, GetAuthInfoError> {
impl Api<'_> {
/// This implementation fetches the auth info from a local postgres instance.
async fn get_auth_info(&self) -> Result<Option<AuthInfo>, GetAuthInfoError> {
async {
// Perhaps we could persist this connection, but then we'd have to
// write more code for reopening it if it got closed, which doesn't
@@ -55,7 +75,7 @@ impl Api {
tokio::spawn(connection);
let query = "select rolpassword from pg_catalog.pg_authid where rolname = $1";
let rows = client.query(query, &[&creds.user]).await?;
let rows = client.query(query, &[&self.creds.user]).await?;
// We can get at most one row, because `rolname` is unique.
let row = match rows.get(0) {
@@ -64,7 +84,7 @@ impl Api {
// However, this is still a *valid* outcome which is very similar
// to getting `404 Not found` from the Neon console.
None => {
warn!("user '{}' does not exist", creds.user);
warn!("user '{}' does not exist", self.creds.user);
return Ok(None);
}
};
@@ -78,50 +98,23 @@ impl Api {
Ok(secret.or_else(|| parse_md5(entry).map(AuthInfo::Md5)))
}
.map_err(crate::error::log_error)
.instrument(info_span!("postgres", url = self.endpoint.as_str()))
.instrument(info_span!("get_auth_info", mock = self.endpoint.as_str()))
.await
}
async fn do_wake_compute(
&self,
creds: &ClientCredentials<'_>,
) -> Result<NodeInfo, WakeComputeError> {
/// We don't need to wake anything locally, so we just return the connection info.
pub async fn wake_compute(&self) -> Result<NodeInfo, WakeComputeError> {
let mut config = compute::ConnCfg::new();
config
.host(self.endpoint.host_str().unwrap_or("localhost"))
.port(self.endpoint.port().unwrap_or(5432))
.dbname(creds.dbname)
.user(creds.user);
.dbname(self.creds.dbname)
.user(self.creds.user);
let node = NodeInfo {
Ok(NodeInfo {
config,
aux: Default::default(),
};
Ok(node)
}
}
#[async_trait]
impl super::Api for Api {
#[tracing::instrument(skip_all)]
async fn get_auth_info(
&self,
_extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials<'_>,
) -> Result<Option<AuthInfo>, GetAuthInfoError> {
self.do_get_auth_info(creds).await
}
#[tracing::instrument(skip_all)]
async fn wake_compute(
&self,
_extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials<'_>,
) -> Result<CachedNodeInfo, WakeComputeError> {
self.do_wake_compute(creds)
.map_ok(CachedNodeInfo::new_uncached)
.await
})
}
}

View File

@@ -33,7 +33,6 @@ impl UserFacingError for ClientCredsParseError {}
pub struct ClientCredentials<'a> {
pub user: &'a str,
pub dbname: &'a str,
// TODO: this is a severe misnomer! We should think of a new name ASAP.
pub project: Option<Cow<'a, str>>,
/// If `True`, we'll use the old cleartext password flow. This is used for
/// websocket connections, which want to minimize the number of round trips.

View File

@@ -1,304 +0,0 @@
use std::{
borrow::Borrow,
hash::Hash,
ops::{Deref, DerefMut},
time::{Duration, Instant},
};
use tracing::debug;
// This seems to make more sense than `lru` or `cached`:
//
// * `near/nearcore` ditched `cached` in favor of `lru`
// (https://github.com/near/nearcore/issues?q=is%3Aissue+lru+is%3Aclosed).
//
// * `lru` methods use an obscure `KeyRef` type in their constraints (which is deliberately excluded from docs).
// This severely hinders its usage both in terms of creating wrappers and supported key types.
//
// On the other hand, `hashlink` has good download stats and appears to be maintained.
use hashlink::{linked_hash_map::RawEntryMut, LruCache};
/// A generic trait which exposes types of cache's key and value,
/// as well as the notion of cache entry invalidation.
/// This is useful for [`timed_lru::Cached`].
pub trait Cache {
/// Entry's key.
type Key;
/// Entry's value.
type Value;
/// Used for entry invalidation.
type LookupInfo<Key>;
/// Invalidate an entry using a lookup info.
/// We don't have an empty default impl because it's error-prone.
fn invalidate(&self, _: &Self::LookupInfo<Self::Key>);
}
impl<C: Cache> Cache for &C {
type Key = C::Key;
type Value = C::Value;
type LookupInfo<Key> = C::LookupInfo<Key>;
fn invalidate(&self, info: &Self::LookupInfo<Self::Key>) {
C::invalidate(self, info)
}
}
pub use timed_lru::TimedLru;
pub mod timed_lru {
use super::*;
/// An implementation of timed LRU cache with fixed capacity.
/// Key properties:
///
/// * Whenever a new entry is inserted, the least recently accessed one is evicted.
/// The cache also keeps track of entry's insertion time (`created_at`) and TTL (`expires_at`).
///
/// * When the entry is about to be retrieved, we check its expiration timestamp.
/// If the entry has expired, we remove it from the cache; otherwise we bump the
/// expiration timestamp (e.g. +5 mins) and move it in the LRU list to prolong
/// its existence.
///
/// * There's an API for immediate invalidation (removal) of a cache entry;
/// It's useful in case we know for sure that the entry is no longer correct.
/// See [`timed_lru::LookupInfo`] & [`timed_lru::Cached`] for more information.
///
/// * Expired entries are kept in the cache until they are evicted by the LRU policy
/// or removed by a lookup that discovers they have expired.
/// There is no background job to reap the expired records.
///
/// * It's possible for an entry that has not yet expired to be evicted
/// before expired items. That's a bit wasteful, but probably fine in practice.
pub struct TimedLru<K, V> {
/// Cache's name for tracing.
name: &'static str,
/// The underlying cache implementation.
cache: parking_lot::Mutex<LruCache<K, Entry<V>>>,
/// Default time-to-live of a single entry.
ttl: Duration,
}
impl<K: Hash + Eq, V> Cache for TimedLru<K, V> {
type Key = K;
type Value = V;
type LookupInfo<Key> = LookupInfo<Key>;
fn invalidate(&self, info: &Self::LookupInfo<K>) {
self.invalidate_raw(info)
}
}
struct Entry<T> {
created_at: Instant,
expires_at: Instant,
value: T,
}
impl<K: Hash + Eq, V> TimedLru<K, V> {
/// Construct a new LRU cache with timed entries.
pub fn new(name: &'static str, capacity: usize, ttl: Duration) -> Self {
Self {
name,
cache: LruCache::new(capacity).into(),
ttl,
}
}
/// Drop an entry from the cache if it's outdated.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn invalidate_raw(&self, info: &LookupInfo<K>) {
let now = Instant::now();
// Do costly things before taking the lock.
let mut cache = self.cache.lock();
let raw_entry = match cache.raw_entry_mut().from_key(&info.key) {
RawEntryMut::Vacant(_) => return,
RawEntryMut::Occupied(x) => x,
};
// Remove the entry if it was created prior to lookup timestamp.
let entry = raw_entry.get();
let (created_at, expires_at) = (entry.created_at, entry.expires_at);
let should_remove = created_at <= info.created_at || expires_at <= now;
if should_remove {
raw_entry.remove();
}
drop(cache); // drop lock before logging
debug!(
created_at = format_args!("{created_at:?}"),
expires_at = format_args!("{expires_at:?}"),
entry_removed = should_remove,
"processed a cache entry invalidation event"
);
}
/// Try retrieving an entry by its key, then execute `extract` if it exists.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn get_raw<Q, R>(&self, key: &Q, extract: impl FnOnce(&K, &Entry<V>) -> R) -> Option<R>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let now = Instant::now();
let deadline = now.checked_add(self.ttl).expect("time overflow");
// Do costly things before taking the lock.
let mut cache = self.cache.lock();
let mut raw_entry = match cache.raw_entry_mut().from_key(key) {
RawEntryMut::Vacant(_) => return None,
RawEntryMut::Occupied(x) => x,
};
// Immediately drop the entry if it has expired.
let entry = raw_entry.get();
if entry.expires_at <= now {
raw_entry.remove();
return None;
}
let value = extract(raw_entry.key(), entry);
let (created_at, expires_at) = (entry.created_at, entry.expires_at);
// Update the deadline and the entry's position in the LRU list.
raw_entry.get_mut().expires_at = deadline;
raw_entry.to_back();
drop(cache); // drop lock before logging
debug!(
created_at = format_args!("{created_at:?}"),
old_expires_at = format_args!("{expires_at:?}"),
new_expires_at = format_args!("{deadline:?}"),
"accessed a cache entry"
);
Some(value)
}
/// Insert an entry to the cache. If an entry with the same key already
/// existed, return the previous value and its creation timestamp.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn insert_raw(&self, key: K, value: V) -> (Instant, Option<V>) {
let created_at = Instant::now();
let expires_at = created_at.checked_add(self.ttl).expect("time overflow");
let entry = Entry {
created_at,
expires_at,
value,
};
// Do costly things before taking the lock.
let old = self
.cache
.lock()
.insert(key, entry)
.map(|entry| entry.value);
debug!(
created_at = format_args!("{created_at:?}"),
expires_at = format_args!("{expires_at:?}"),
replaced = old.is_some(),
"created a cache entry"
);
(created_at, old)
}
}
impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
pub fn insert(&self, key: K, value: V) -> (Option<V>, Cached<&Self>) {
let (created_at, old) = self.insert_raw(key.clone(), value.clone());
let cached = Cached {
token: Some((self, LookupInfo { created_at, key })),
value,
};
(old, cached)
}
}
impl<K: Hash + Eq, V: Clone> TimedLru<K, V> {
/// Retrieve a cached entry in convenient wrapper.
pub fn get<Q>(&self, key: &Q) -> Option<timed_lru::Cached<&Self>>
where
K: Borrow<Q> + Clone,
Q: Hash + Eq + ?Sized,
{
self.get_raw(key, |key, entry| {
let info = LookupInfo {
created_at: entry.created_at,
key: key.clone(),
};
Cached {
token: Some((self, info)),
value: entry.value.clone(),
}
})
}
}
/// Lookup information for key invalidation.
pub struct LookupInfo<K> {
/// Time of creation of a cache [`Entry`].
/// We use this during invalidation lookups to prevent eviction of a newer
/// entry sharing the same key (it might've been inserted by a different
/// task after we got the entry we're trying to invalidate now).
created_at: Instant,
/// Search by this key.
key: K,
}
/// Wrapper for convenient entry invalidation.
pub struct Cached<C: Cache> {
/// Cache + lookup info.
token: Option<(C, C::LookupInfo<C::Key>)>,
/// The value itself.
pub value: C::Value,
}
impl<C: Cache> Cached<C> {
/// Place any entry into this wrapper; invalidation will be a no-op.
/// Unfortunately, rust doesn't let us implement [`From`] or [`Into`].
pub fn new_uncached(value: impl Into<C::Value>) -> Self {
Self {
token: None,
value: value.into(),
}
}
/// Drop this entry from a cache if it's still there.
pub fn invalidate(&self) {
if let Some((cache, info)) = &self.token {
cache.invalidate(info);
}
}
/// Tell if this entry is actually cached.
pub fn cached(&self) -> bool {
self.token.is_some()
}
}
impl<C: Cache> Deref for Cached<C> {
type Target = C::Value;
fn deref(&self) -> &Self::Target {
&self.value
}
}
impl<C: Cache> DerefMut for Cached<C> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.value
}
}
}
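The removed `TimedLru` is easiest to read through its call pattern; an illustrative usage sketch against the API shown above (the key and value types here are invented, the size and TTL match the "size=4000,ttl=5m" default mentioned elsewhere in this diff):

    use std::time::Duration;

    fn node_info_cache_demo() {
        let cache: TimedLru<String, u32> =
            TimedLru::new("node_info", 4000, Duration::from_secs(300));

        // `insert` hands back the previous value (if any) and a `Cached` wrapper.
        let (_old, cached) = cache.insert("project-a".to_owned(), 42);
        assert!(cached.cached());

        // A hit bumps the entry's TTL and its position in the LRU list.
        if let Some(hit) = cache.get("project-a") {
            assert_eq!(*hit, 42);
            // Once we know the entry is stale, drop it explicitly.
            hit.invalidate();
        }
    }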

View File

@@ -1,5 +1,6 @@
use anyhow::{anyhow, Context};
use hashbrown::HashMap;
use parking_lot::Mutex;
use pq_proto::CancelKeyData;
use std::net::SocketAddr;
use tokio::net::TcpStream;
@@ -8,15 +9,14 @@ use tracing::info;
/// Enables serving `CancelRequest`s.
#[derive(Default)]
pub struct CancelMap(parking_lot::RwLock<HashMap<CancelKeyData, Option<CancelClosure>>>);
pub struct CancelMap(Mutex<HashMap<CancelKeyData, Option<CancelClosure>>>);
impl CancelMap {
/// Cancel a running query for the corresponding connection.
pub async fn cancel_session(&self, key: CancelKeyData) -> anyhow::Result<()> {
// NB: we should immediately release the lock after cloning the token.
let cancel_closure = self
.0
.read()
.lock()
.get(&key)
.and_then(|x| x.clone())
.with_context(|| format!("query cancellation key not found: {key}"))?;
@@ -41,14 +41,14 @@ impl CancelMap {
// Random key collisions are unlikely to happen here, but they're still possible,
// which is why we have to take care not to rewrite an existing key.
self.0
.write()
.lock()
.try_insert(key, None)
.map_err(|_| anyhow!("query cancellation key already exists: {key}"))?;
// This will guarantee that the session gets dropped
// as soon as the future is finished.
scopeguard::defer! {
self.0.write().remove(&key);
self.0.lock().remove(&key);
info!("dropped query cancellation key {key}");
}
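The `scopeguard::defer!` block above is what guarantees the cancellation key is removed even on an early return; the same pattern in isolation:

    fn with_session_key(key: u64) {
        // Runs when this scope exits, normally or via early return/unwind,
        // mirroring how CancelMap cleans up its cancellation key above.
        scopeguard::defer! {
            println!("dropped query cancellation key {key}");
        }
        println!("session {key} registered");
        // ... the long-lived session future would be awaited here ...
    }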
@@ -59,12 +59,12 @@ impl CancelMap {
#[cfg(test)]
fn contains(&self, session: &Session) -> bool {
self.0.read().contains_key(&session.key)
self.0.lock().contains_key(&session.key)
}
#[cfg(test)]
fn is_empty(&self) -> bool {
self.0.read().is_empty()
self.0.lock().is_empty()
}
}
@@ -115,7 +115,7 @@ impl Session<'_> {
info!("enabling query cancellation for this session");
self.cancel_map
.0
.write()
.lock()
.insert(self.key, Some(cancel_closure));
self.key

View File

@@ -42,65 +42,14 @@ pub type ScramKeys = tokio_postgres::config::ScramKeys<32>;
/// A config for establishing a connection to compute node.
/// Eventually, `tokio_postgres` will be replaced with something better.
/// Newtype allows us to implement methods on top of it.
#[derive(Clone)]
#[repr(transparent)]
pub struct ConnCfg(Box<tokio_postgres::Config>);
/// Creation and initialization routines.
impl ConnCfg {
/// Construct a new connection config.
pub fn new() -> Self {
Self(Default::default())
}
/// Reuse password or auth keys from the other config.
pub fn reuse_password(&mut self, other: &Self) {
if let Some(password) = other.get_password() {
self.password(password);
}
if let Some(keys) = other.get_auth_keys() {
self.auth_keys(keys);
}
}
/// Apply startup message params to the connection config.
pub fn set_startup_params(&mut self, params: &StartupMessageParams) {
if let Some(options) = params.options_raw() {
// We must drop all proxy-specific parameters.
#[allow(unstable_name_collisions)]
let options: String = options
.filter(|opt| !opt.starts_with("project="))
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();
self.options(&options);
}
if let Some(app_name) = params.get("application_name") {
self.application_name(app_name);
}
// TODO: This is especially ugly...
if let Some(replication) = params.get("replication") {
use tokio_postgres::config::ReplicationMode;
match replication {
"true" | "on" | "yes" | "1" => {
self.replication_mode(ReplicationMode::Physical);
}
"database" => {
self.replication_mode(ReplicationMode::Logical);
}
_other => {}
}
}
// TODO: extend the list of the forwarded startup parameters.
// Currently, tokio-postgres doesn't allow us to pass
// arbitrary parameters, but the ones above are a good start.
//
// This and the reverse params problem can be better addressed
// in a bespoke connection machinery (a new library for that sake).
}
}
impl std::ops::Deref for ConnCfg {
@@ -183,13 +132,50 @@ pub struct PostgresConnection {
pub stream: TcpStream,
/// PostgreSQL connection parameters.
pub params: std::collections::HashMap<String, String>,
/// Query cancellation token.
pub cancel_closure: CancelClosure,
}
impl ConnCfg {
/// Connect to a corresponding compute node.
pub async fn connect(&self) -> Result<PostgresConnection, ConnectionError> {
pub async fn connect(
mut self,
params: &StartupMessageParams,
) -> Result<(PostgresConnection, CancelClosure), ConnectionError> {
if let Some(options) = params.options_raw() {
// We must drop all proxy-specific parameters.
#[allow(unstable_name_collisions)]
let options: String = options
.filter(|opt| !opt.starts_with("project="))
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();
self.0.options(&options);
}
if let Some(app_name) = params.get("application_name") {
self.0.application_name(app_name);
}
// TODO: This is especially ugly...
if let Some(replication) = params.get("replication") {
use tokio_postgres::config::ReplicationMode;
match replication {
"true" | "on" | "yes" | "1" => {
self.0.replication_mode(ReplicationMode::Physical);
}
"database" => {
self.0.replication_mode(ReplicationMode::Logical);
}
_other => {}
}
}
// TODO: extend the list of the forwarded startup parameters.
// Currently, tokio-postgres doesn't allow us to pass
// arbitrary parameters, but the ones above are a good start.
//
// This and the reverse params problem can be better addressed
// in a bespoke connection machinery (a new library for that sake).
// TODO: establish a secure connection to the DB.
let (socket_addr, mut stream) = self.connect_raw().await?;
let (client, connection) = self.0.connect_raw(&mut stream, NoTls).await?;
@@ -203,13 +189,8 @@ impl ConnCfg {
// NB: CancelToken is supposed to hold socket_addr, but we use connect_raw.
// Yet another reason to rework the connection establishing code.
let cancel_closure = CancelClosure::new(socket_addr, client.cancel_token());
let db = PostgresConnection { stream, params };
let connection = PostgresConnection {
stream,
params,
cancel_closure,
};
Ok(connection)
Ok((db, cancel_closure))
}
}
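The options handling above strips the proxy-specific `project=` parameter and rebuilds a space-separated options string; the same idea on a plain whitespace-separated string, using `Itertools::join` instead of the `intersperse` call that needs the name-collision allowance:

    use itertools::Itertools;

    fn strip_proxy_options(options_raw: &str) -> String {
        options_raw
            .split_whitespace()
            .filter(|opt| !opt.starts_with("project="))
            .join(" ")
    }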

View File

@@ -1,16 +1,16 @@
use crate::auth;
use anyhow::{bail, ensure, Context};
use std::{str::FromStr, sync::Arc, time::Duration};
use anyhow::{ensure, Context};
use std::sync::Arc;
pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
pub auth_backend: auth::BackendType<'static, ()>,
pub metric_collection: Option<MetricCollectionConfig>,
pub metric_collection_config: Option<MetricCollectionConfig>,
}
pub struct MetricCollectionConfig {
pub endpoint: reqwest::Url,
pub interval: Duration,
pub interval: std::time::Duration,
}
pub struct TlsConfig {
@@ -37,7 +37,6 @@ pub fn configure_tls(key_path: &str, cert_path: &str) -> anyhow::Result<TlsConfi
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.context(format!(
@@ -74,80 +73,3 @@ pub fn configure_tls(key_path: &str, cert_path: &str) -> anyhow::Result<TlsConfi
common_name,
})
}
/// Helper for cmdline cache options parsing.
pub struct CacheOptions {
/// Max number of entries.
pub size: usize,
/// Entry's time-to-live.
pub ttl: Duration,
}
impl CacheOptions {
/// Default options for [`crate::auth::caches::NodeInfoCache`].
pub const DEFAULT_OPTIONS_NODE_INFO: &str = "size=4000,ttl=5m";
/// Parse cache options passed via cmdline.
/// Example: [`Self::DEFAULT_OPTIONS_NODE_INFO`].
fn parse(options: &str) -> anyhow::Result<Self> {
let mut size = None;
let mut ttl = None;
for option in options.split(',') {
let (key, value) = option
.split_once('=')
.with_context(|| format!("bad key-value pair: {option}"))?;
match key {
"size" => size = Some(value.parse()?),
"ttl" => ttl = Some(humantime::parse_duration(value)?),
unknown => bail!("unknown key: {unknown}"),
}
}
// TTL doesn't matter if cache is always empty.
if let Some(0) = size {
ttl.get_or_insert(Duration::default());
}
Ok(Self {
size: size.context("missing `size`")?,
ttl: ttl.context("missing `ttl`")?,
})
}
}
impl FromStr for CacheOptions {
type Err = anyhow::Error;
fn from_str(options: &str) -> Result<Self, Self::Err> {
let error = || format!("failed to parse cache options '{options}'");
Self::parse(options).with_context(error)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_cache_options() -> anyhow::Result<()> {
let CacheOptions { size, ttl } = "size=4096,ttl=5min".parse()?;
assert_eq!(size, 4096);
assert_eq!(ttl, Duration::from_secs(5 * 60));
let CacheOptions { size, ttl } = "ttl=4m,size=2".parse()?;
assert_eq!(size, 2);
assert_eq!(ttl, Duration::from_secs(4 * 60));
let CacheOptions { size, ttl } = "size=0,ttl=1s".parse()?;
assert_eq!(size, 0);
assert_eq!(ttl, Duration::from_secs(1));
let CacheOptions { size, ttl } = "size=0".parse()?;
assert_eq!(size, 0);
assert_eq!(ttl, Duration::default());
Ok(())
}
}

View File

@@ -3,15 +3,3 @@
/// Payloads used in the console's APIs.
pub mod messages;
/// Wrappers for console APIs and their mocks.
pub mod provider;
pub use provider::{errors, Api, AuthInfo, CachedNodeInfo, ConsoleReqExtra, NodeInfo};
/// Various cache-related types.
pub mod caches {
pub use super::provider::{ApiCaches, NodeInfoCache};
}
/// Console's management API.
pub mod mgmt;

View File

@@ -63,13 +63,13 @@ impl KickSession<'_> {
/// Compute node connection params.
#[derive(Deserialize)]
pub struct DatabaseInfo {
pub host: Box<str>,
pub host: String,
pub port: u16,
pub dbname: Box<str>,
pub user: Box<str>,
pub dbname: String,
pub user: String,
/// The console always provides a password, but requiring one
/// would be inconvenient when debugging with a local PG instance.
pub password: Option<Box<str>>,
pub password: Option<String>,
pub aux: MetricsAuxInfo,
}
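
As a rough illustration of how such a payload deserializes, here is a trimmed-down stand-in for `DatabaseInfo` (the `aux: MetricsAuxInfo` field is omitted because its shape isn't shown here). It assumes `serde` with the derive feature and `serde_json` as dependencies, and the struct name is made up.

use serde::Deserialize;

// Trimmed-down stand-in for the DatabaseInfo message above (aux omitted).
#[derive(Deserialize)]
struct DatabaseInfoSketch {
    host: String,
    port: u16,
    dbname: String,
    user: String,
    // The console always sends a password; Option keeps local debugging easy.
    password: Option<String>,
}

fn main() -> Result<(), serde_json::Error> {
    let with_password =
        r#"{"host":"127.0.0.1","port":5432,"dbname":"main","user":"alice","password":"secret"}"#;
    let without_password = r#"{"host":"127.0.0.1","port":5432,"dbname":"main","user":"alice"}"#;

    let a: DatabaseInfoSketch = serde_json::from_str(with_password)?;
    let b: DatabaseInfoSketch = serde_json::from_str(without_password)?;

    println!("connect to {}:{} db={} user={}", a.host, a.port, a.dbname, a.user);
    assert_eq!(a.password.as_deref(), Some("secret"));
    assert!(b.password.is_none());
    Ok(())
}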

View File

@@ -1,194 +0,0 @@
pub mod mock;
pub mod neon;
use super::messages::MetricsAuxInfo;
use crate::{
auth::ClientCredentials,
cache::{timed_lru, TimedLru},
compute, scram,
};
use async_trait::async_trait;
use std::sync::Arc;
pub mod errors {
use crate::error::{io_error, UserFacingError};
use reqwest::StatusCode as HttpStatusCode;
use thiserror::Error;
/// A go-to error message which doesn't leak any detail.
const REQUEST_FAILED: &str = "Console request failed";
/// Common console API error.
#[derive(Debug, Error)]
pub enum ApiError {
/// Error returned by the console itself.
#[error("{REQUEST_FAILED} with {}: {}", .status, .text)]
Console {
status: HttpStatusCode,
text: Box<str>,
},
/// Various IO errors like broken pipe or malformed payload.
#[error("{REQUEST_FAILED}: {0}")]
Transport(#[from] std::io::Error),
}
impl ApiError {
/// Returns HTTP status code if it's the reason for failure.
pub fn http_status_code(&self) -> Option<HttpStatusCode> {
use ApiError::*;
match self {
Console { status, .. } => Some(*status),
_ => None,
}
}
}
impl UserFacingError for ApiError {
fn to_string_client(&self) -> String {
use ApiError::*;
match self {
// To minimize risks, only select errors are forwarded to users.
// Ask @neondatabase/control-plane for review before adding more.
Console { status, .. } => match *status {
HttpStatusCode::NOT_FOUND => {
// Status 404: failed to get a project-related resource.
format!("{REQUEST_FAILED}: endpoint cannot be found")
}
HttpStatusCode::NOT_ACCEPTABLE => {
// Status 406: endpoint is disabled (we don't allow connections).
format!("{REQUEST_FAILED}: endpoint is disabled")
}
HttpStatusCode::LOCKED => {
// Status 423: project might be in maintenance mode (or bad state).
format!("{REQUEST_FAILED}: endpoint is temporary unavailable")
}
_ => REQUEST_FAILED.to_owned(),
},
_ => REQUEST_FAILED.to_owned(),
}
}
}
// Helps eliminate graceless `.map_err` calls without introducing another ctor.
impl From<reqwest::Error> for ApiError {
fn from(e: reqwest::Error) -> Self {
io_error(e).into()
}
}
#[derive(Debug, Error)]
pub enum GetAuthInfoError {
// We shouldn't include the actual secret here.
#[error("Console responded with a malformed auth secret")]
BadSecret,
#[error(transparent)]
ApiError(ApiError),
}
// This allows more useful interactions than `#[from]`.
impl<E: Into<ApiError>> From<E> for GetAuthInfoError {
fn from(e: E) -> Self {
Self::ApiError(e.into())
}
}
impl UserFacingError for GetAuthInfoError {
fn to_string_client(&self) -> String {
use GetAuthInfoError::*;
match self {
// We absolutely should not leak any secrets!
BadSecret => REQUEST_FAILED.to_owned(),
// However, API might return a meaningful error.
ApiError(e) => e.to_string_client(),
}
}
}
#[derive(Debug, Error)]
pub enum WakeComputeError {
#[error("Console responded with a malformed compute address: {0}")]
BadComputeAddress(Box<str>),
#[error(transparent)]
ApiError(ApiError),
}
// This allows more useful interactions than `#[from]`.
impl<E: Into<ApiError>> From<E> for WakeComputeError {
fn from(e: E) -> Self {
Self::ApiError(e.into())
}
}
impl UserFacingError for WakeComputeError {
fn to_string_client(&self) -> String {
use WakeComputeError::*;
match self {
// We shouldn't show user the address even if it's broken.
// Besides, user is unlikely to care about this detail.
BadComputeAddress(_) => REQUEST_FAILED.to_owned(),
// However, API might return a meaningful error.
ApiError(e) => e.to_string_client(),
}
}
}
}
/// Extra query params we'd like to pass to the console.
pub struct ConsoleReqExtra<'a> {
/// A unique identifier for a connection.
pub session_id: uuid::Uuid,
/// Name of client application, if set.
pub application_name: Option<&'a str>,
}
/// Auth secret which is managed by the cloud.
pub enum AuthInfo {
/// Md5 hash of user's password.
Md5([u8; 16]),
/// [SCRAM](crate::scram) authentication info.
Scram(scram::ServerSecret),
}
/// Info for establishing a connection to a compute node.
/// This is what we get after auth succeeded, but not before!
#[derive(Clone)]
pub struct NodeInfo {
/// Compute node connection params.
/// It's sad that we have to clone this, but this will improve
/// once we migrate to bespoke connection logic.
pub config: compute::ConnCfg,
/// Labels for proxy's metrics.
pub aux: Arc<MetricsAuxInfo>,
}
pub type NodeInfoCache = TimedLru<Arc<str>, NodeInfo>;
pub type CachedNodeInfo = timed_lru::Cached<&'static NodeInfoCache>;
/// This will allocate on each call, but the HTTP requests alone
/// already require a few allocations, so it should be fine.
#[async_trait]
pub trait Api {
/// Get the client's auth secret for authentication.
async fn get_auth_info(
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials<'_>,
) -> Result<Option<AuthInfo>, errors::GetAuthInfoError>;
/// Wake up the compute node and return the corresponding connection info.
async fn wake_compute(
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials<'_>,
) -> Result<CachedNodeInfo, errors::WakeComputeError>;
}
/// Various caches for [`console`].
pub struct ApiCaches {
/// Cache for the `wake_compute` API method.
pub node_info: NodeInfoCache,
}
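
To see why the generic `impl<E: Into<ApiError>> From<E>` pattern above is handier than `#[from]`, here is a small self-contained sketch with stand-in types: any error that already converts into the inner `ApiError` can be propagated with a single `?`, without explicit `.map_err` calls.

// Stand-in for the transport-level error (think ApiError::Transport).
#[derive(Debug)]
struct ApiError(String);

impl From<std::io::Error> for ApiError {
    fn from(e: std::io::Error) -> Self {
        ApiError(e.to_string())
    }
}

#[derive(Debug)]
enum GetAuthInfoError {
    BadSecret,
    ApiError(ApiError),
}

// The generic impl: anything convertible into ApiError also converts
// into GetAuthInfoError, so `?` works for io::Error, ApiError, and friends.
impl<E: Into<ApiError>> From<E> for GetAuthInfoError {
    fn from(e: E) -> Self {
        Self::ApiError(e.into())
    }
}

fn read_secret() -> Result<String, GetAuthInfoError> {
    // The io::Error goes through two conversions with a single `?`.
    let raw = std::fs::read_to_string("/nonexistent/secret")?;
    if raw.is_empty() {
        return Err(GetAuthInfoError::BadSecret);
    }
    Ok(raw)
}

fn main() {
    println!("{:?}", read_secret());
}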

View File

@@ -1,196 +0,0 @@
//! Production console backend.
use super::{
super::messages::{ConsoleError, GetRoleSecret, WakeCompute},
errors::{ApiError, GetAuthInfoError, WakeComputeError},
ApiCaches, AuthInfo, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
};
use crate::{auth::ClientCredentials, compute, http, scram};
use async_trait::async_trait;
use futures::TryFutureExt;
use reqwest::StatusCode as HttpStatusCode;
use tracing::{error, info, info_span, warn, Instrument};
#[derive(Clone)]
pub struct Api {
endpoint: http::Endpoint,
caches: &'static ApiCaches,
}
impl Api {
/// Construct an API object containing the auth parameters.
pub fn new(endpoint: http::Endpoint, caches: &'static ApiCaches) -> Self {
Self { endpoint, caches }
}
pub fn url(&self) -> &str {
self.endpoint.url().as_str()
}
async fn do_get_auth_info(
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials<'_>,
) -> Result<Option<AuthInfo>, GetAuthInfoError> {
let request_id = uuid::Uuid::new_v4().to_string();
async {
let request = self
.endpoint
.get("proxy_get_role_secret")
.header("X-Request-ID", &request_id)
.query(&[("session_id", extra.session_id)])
.query(&[
("application_name", extra.application_name),
("project", Some(creds.project().expect("impossible"))),
("role", Some(creds.user)),
])
.build()?;
info!(url = request.url().as_str(), "sending http request");
let response = self.endpoint.execute(request).await?;
let body = match parse_body::<GetRoleSecret>(response).await {
Ok(body) => body,
// Error 404 is special: it's ok not to have a secret.
Err(e) => match e.http_status_code() {
Some(HttpStatusCode::NOT_FOUND) => return Ok(None),
_otherwise => return Err(e.into()),
},
};
let secret = scram::ServerSecret::parse(&body.role_secret)
.map(AuthInfo::Scram)
.ok_or(GetAuthInfoError::BadSecret)?;
Ok(Some(secret))
}
.map_err(crate::error::log_error)
.instrument(info_span!("http", id = request_id))
.await
}
async fn do_wake_compute(
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials<'_>,
) -> Result<NodeInfo, WakeComputeError> {
let project = creds.project().expect("impossible");
let request_id = uuid::Uuid::new_v4().to_string();
async {
let request = self
.endpoint
.get("proxy_wake_compute")
.header("X-Request-ID", &request_id)
.query(&[("session_id", extra.session_id)])
.query(&[
("application_name", extra.application_name),
("project", Some(project)),
])
.build()?;
info!(url = request.url().as_str(), "sending http request");
let response = self.endpoint.execute(request).await?;
let body = parse_body::<WakeCompute>(response).await?;
// Unfortunately, ownership won't let us use `Option::ok_or` here.
let (host, port) = match parse_host_port(&body.address) {
None => return Err(WakeComputeError::BadComputeAddress(body.address)),
Some(x) => x,
};
let mut config = compute::ConnCfg::new();
config
.host(host)
.port(port)
.dbname(creds.dbname)
.user(creds.user);
let node = NodeInfo {
config,
aux: body.aux.into(),
};
Ok(node)
}
.map_err(crate::error::log_error)
.instrument(info_span!("http", id = request_id))
.await
}
}
#[async_trait]
impl super::Api for Api {
#[tracing::instrument(skip_all)]
async fn get_auth_info(
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials<'_>,
) -> Result<Option<AuthInfo>, GetAuthInfoError> {
self.do_get_auth_info(extra, creds).await
}
#[tracing::instrument(skip_all)]
async fn wake_compute(
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials<'_>,
) -> Result<CachedNodeInfo, WakeComputeError> {
let key = creds.project().expect("impossible");
// Every time we do a wake-up http request, the compute node will stay up
// for some time (how long depends on the console's scale-to-zero policy).
// The connection info remains the same during that period of time,
// which means we can cache it to reduce load and latency.
if let Some(cached) = self.caches.node_info.get(key) {
info!(key = key, "found cached compute node info");
return Ok(cached);
}
let node = self.do_wake_compute(extra, creds).await?;
let (_, cached) = self.caches.node_info.insert(key.into(), node);
info!(key = key, "created a cache entry for compute node info");
Ok(cached)
}
}
/// Parse http response body, taking status code into account.
async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
response: reqwest::Response,
) -> Result<T, ApiError> {
let status = response.status();
if status.is_success() {
// We shouldn't log raw body because it may contain secrets.
info!("request succeeded, processing the body");
return Ok(response.json().await?);
}
// Don't throw an error here because it's not as important
// as the fact that the request itself has failed.
let body = response.json().await.unwrap_or_else(|e| {
warn!("failed to parse error body: {e}");
ConsoleError {
error: "reason unclear (malformed error message)".into(),
}
});
let text = body.error;
error!("console responded with an error ({status}): {text}");
Err(ApiError::Console { status, text })
}
fn parse_host_port(input: &str) -> Option<(&str, u16)> {
let (host, port) = input.split_once(':')?;
Some((host, port.parse().ok()?))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_host_port() {
let (host, port) = parse_host_port("127.0.0.1:5432").expect("failed to parse");
assert_eq!(host, "127.0.0.1");
assert_eq!(port, 5432);
}
}
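
The caching comment in `wake_compute` boils down to a get-or-insert pattern keyed by project name with a TTL. Below is a single-threaded toy sketch using only std; the real `TimedLru` is concurrent and LRU-bounded, which this map is not, and all names here are invented.

use std::collections::HashMap;
use std::time::{Duration, Instant};

// Toy stand-in for NodeInfo: just the compute address here.
#[derive(Clone, Debug)]
struct NodeInfo {
    address: String,
}

struct TtlCache {
    ttl: Duration,
    entries: HashMap<String, (Instant, NodeInfo)>,
}

impl TtlCache {
    fn new(ttl: Duration) -> Self {
        Self { ttl, entries: HashMap::new() }
    }

    fn get(&self, key: &str) -> Option<NodeInfo> {
        let (inserted_at, node) = self.entries.get(key)?;
        // Entries older than the TTL are treated as missing.
        (inserted_at.elapsed() < self.ttl).then(|| node.clone())
    }

    fn insert(&mut self, key: String, node: NodeInfo) -> NodeInfo {
        self.entries.insert(key, (Instant::now(), node.clone()));
        node
    }
}

// Mirrors wake_compute's flow: check the cache first, fall back to the
// (expensive) wake-up call, then cache whatever it returned.
fn wake_compute(cache: &mut TtlCache, project: &str) -> NodeInfo {
    if let Some(cached) = cache.get(project) {
        return cached;
    }
    let node = NodeInfo { address: format!("compute-for-{project}:5432") }; // pretend HTTP call
    cache.insert(project.to_string(), node)
}

fn main() {
    let mut cache = TtlCache::new(Duration::from_secs(300));
    let a = wake_compute(&mut cache, "my-project");
    let b = wake_compute(&mut cache, "my-project"); // served from cache
    assert_eq!(a.address, b.address);
}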

View File

@@ -9,7 +9,8 @@ async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
}
fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
endpoint::make_router().get("/v1/status", status_handler)
let router = endpoint::make_router();
router.get("/v1/status", status_handler)
}
pub async fn task_main(http_listener: TcpListener) -> anyhow::Result<()> {

View File

@@ -1,6 +1,6 @@
use bytes::{Buf, Bytes};
use futures::{Sink, Stream, StreamExt};
use hyper::server::accept;
use hyper::server::accept::{self};
use hyper::server::conn::AddrIncoming;
use hyper::upgrade::Upgraded;
use hyper::{Body, Request, Response, StatusCode};
@@ -161,7 +161,7 @@ impl AsyncBufRead for WebSocketRW {
async fn serve_websocket(
websocket: HyperWebsocket,
config: &'static ProxyConfig,
config: &ProxyConfig,
cancel_map: &CancelMap,
session_id: uuid::Uuid,
hostname: Option<String>,

View File

@@ -5,7 +5,6 @@
//! in somewhat transparent manner (again via communication with control plane API).
mod auth;
mod cache;
mod cancellation;
mod compute;
mod config;
@@ -13,6 +12,7 @@ mod console;
mod error;
mod http;
mod metrics;
mod mgmt;
mod parse;
mod proxy;
mod sasl;
@@ -21,6 +21,7 @@ mod stream;
mod url;
mod waiters;
use ::metrics::set_build_info_metric;
use anyhow::{bail, Context};
use clap::{self, Arg};
use config::ProxyConfig;
@@ -28,7 +29,8 @@ use futures::FutureExt;
use std::{borrow::Cow, future::Future, net::SocketAddr};
use tokio::{net::TcpListener, task::JoinError};
use tracing::{info, info_span, Instrument};
use utils::{project_git_version, sentry_init::init_sentry};
use utils::project_git_version;
use utils::sentry_init::init_sentry;
project_git_version!(GIT_VERSION);
@@ -49,131 +51,122 @@ async fn main() -> anyhow::Result<()> {
// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
info!("Version: {GIT_VERSION}");
::metrics::set_build_info_metric(GIT_VERSION);
let arg_matches = cli().get_matches();
let args = cli().get_matches();
let config = build_config(&args)?;
info!("Authentication backend: {}", config.auth_backend);
// Check that we can bind to address before further initialization
let http_address: SocketAddr = args.get_one::<String>("http").unwrap().parse()?;
info!("Starting http on {http_address}");
let http_listener = TcpListener::bind(http_address).await?.into_std()?;
let mgmt_address: SocketAddr = args.get_one::<String>("mgmt").unwrap().parse()?;
info!("Starting mgmt on {mgmt_address}");
let mgmt_listener = TcpListener::bind(mgmt_address).await?.into_std()?;
let proxy_address: SocketAddr = args.get_one::<String>("proxy").unwrap().parse()?;
info!("Starting proxy on {proxy_address}");
let proxy_listener = TcpListener::bind(proxy_address).await?;
let mut tasks = vec![
tokio::spawn(http::server::task_main(http_listener)),
tokio::spawn(proxy::task_main(config, proxy_listener)),
tokio::task::spawn_blocking(move || console::mgmt::thread_main(mgmt_listener)),
];
if let Some(wss_address) = args.get_one::<String>("wss") {
let wss_address: SocketAddr = wss_address.parse()?;
info!("Starting wss on {wss_address}");
let wss_listener = TcpListener::bind(wss_address).await?;
tasks.push(tokio::spawn(http::websocket::task_main(
wss_listener,
config,
)));
}
// TODO: refactor.
if let Some(metric_collection) = &config.metric_collection {
let hostname = hostname::get()?
.into_string()
.map_err(|e| anyhow::anyhow!("failed to get hostname {e:?}"))?;
tasks.push(tokio::spawn(
metrics::collect_metrics(
&metric_collection.endpoint,
metric_collection.interval,
hostname,
)
.instrument(info_span!("collect_metrics")),
));
}
// This will block until all tasks have completed.
// Furthermore, the first one to fail will cancel the rest.
let tasks = tasks.into_iter().map(flatten_err);
let _: Vec<()> = futures::future::try_join_all(tasks).await?;
Ok(())
}
/// ProxyConfig is created at proxy startup, and lives forever.
fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig> {
let tls_config = match (
args.get_one::<String>("tls-key"),
args.get_one::<String>("tls-cert"),
arg_matches.get_one::<String>("tls-key"),
arg_matches.get_one::<String>("tls-cert"),
) {
(Some(key_path), Some(cert_path)) => Some(config::configure_tls(key_path, cert_path)?),
(None, None) => None,
_ => bail!("either both or neither tls-key and tls-cert must be specified"),
};
let metric_collection = match (
args.get_one::<String>("metric-collection-endpoint"),
args.get_one::<String>("metric-collection-interval"),
let proxy_address: SocketAddr = arg_matches.get_one::<String>("proxy").unwrap().parse()?;
let mgmt_address: SocketAddr = arg_matches.get_one::<String>("mgmt").unwrap().parse()?;
let http_address: SocketAddr = arg_matches.get_one::<String>("http").unwrap().parse()?;
let metric_collection_config = match (
arg_matches.get_one::<String>("metric-collection-endpoint"),
arg_matches.get_one::<String>("metric-collection-interval"),
) {
(Some(endpoint), Some(interval)) => Some(config::MetricCollectionConfig {
endpoint: endpoint.parse()?,
interval: humantime::parse_duration(interval)?,
}),
(Some(endpoint), Some(interval)) => {
Some(config::MetricCollectionConfig {
endpoint: endpoint.parse()?,
interval: humantime::parse_duration(interval)?,
})
}
(None, None) => None,
_ => bail!(
"either both or neither metric-collection-endpoint \
and metric-collection-interval must be specified"
),
_ => bail!("either both or neither metric-collection-endpoint and metric-collection-interval must be specified"),
};
let auth_backend = match args.get_one::<String>("auth-backend").unwrap().as_str() {
let auth_backend = match arg_matches
.get_one::<String>("auth-backend")
.unwrap()
.as_str()
{
"console" => {
let config::CacheOptions { size, ttl } = args
.get_one::<String>("wake-compute-cache")
let url = arg_matches
.get_one::<String>("auth-endpoint")
.unwrap()
.parse()?;
info!("Using NodeInfoCache (wake_compute) with size={size} ttl={ttl:?}");
let caches = Box::leak(Box::new(console::caches::ApiCaches {
node_info: console::caches::NodeInfoCache::new("node_info_cache", size, ttl),
}));
let url = args.get_one::<String>("auth-endpoint").unwrap().parse()?;
let endpoint = http::Endpoint::new(url, reqwest::Client::new());
let api = console::provider::neon::Api::new(endpoint, caches);
auth::BackendType::Console(Cow::Owned(api), ())
auth::BackendType::Console(Cow::Owned(endpoint), ())
}
"postgres" => {
let url = args.get_one::<String>("auth-endpoint").unwrap().parse()?;
let api = console::provider::mock::Api::new(url);
auth::BackendType::Postgres(Cow::Owned(api), ())
let url = arg_matches
.get_one::<String>("auth-endpoint")
.unwrap()
.parse()?;
auth::BackendType::Postgres(Cow::Owned(url), ())
}
"link" => {
let url = args.get_one::<String>("uri").unwrap().parse()?;
let url = arg_matches.get_one::<String>("uri").unwrap().parse()?;
auth::BackendType::Link(Cow::Owned(url))
}
other => bail!("unsupported auth backend: {other}"),
};
let config = Box::leak(Box::new(ProxyConfig {
let config: &ProxyConfig = Box::leak(Box::new(ProxyConfig {
tls_config,
auth_backend,
metric_collection,
metric_collection_config,
}));
Ok(config)
info!("Version: {GIT_VERSION}");
info!("Authentication backend: {}", config.auth_backend);
// Check that we can bind to address before further initialization
info!("Starting http on {http_address}");
let http_listener = TcpListener::bind(http_address).await?.into_std()?;
info!("Starting mgmt on {mgmt_address}");
let mgmt_listener = TcpListener::bind(mgmt_address).await?.into_std()?;
info!("Starting proxy on {proxy_address}");
let proxy_listener = TcpListener::bind(proxy_address).await?;
let mut tasks = vec![
tokio::spawn(http::server::task_main(http_listener)),
tokio::spawn(proxy::task_main(config, proxy_listener)),
tokio::task::spawn_blocking(move || mgmt::thread_main(mgmt_listener)),
];
if let Some(wss_address) = arg_matches.get_one::<String>("wss") {
let wss_address: SocketAddr = wss_address.parse()?;
info!("Starting wss on {}", wss_address);
let wss_listener = TcpListener::bind(wss_address).await?;
tasks.push(tokio::spawn(http::websocket::task_main(
wss_listener,
config,
)));
}
if let Some(metric_collection_config) = &config.metric_collection_config {
let hostname = hostname::get()?
.into_string()
.map_err(|e| anyhow::anyhow!("failed to get hostname {e:?}"))?;
tasks.push(tokio::spawn(
metrics::collect_metrics(
&metric_collection_config.endpoint,
metric_collection_config.interval,
hostname,
)
.instrument(info_span!("collect_metrics")),
));
}
let tasks = tasks.into_iter().map(flatten_err);
set_build_info_metric(GIT_VERSION);
// This will block until all tasks have completed.
// Furthermore, the first one to fail will cancel the rest.
let _: Vec<()> = futures::future::try_join_all(tasks).await?;
Ok(())
}
fn cli() -> clap::Command {
@@ -242,27 +235,16 @@ fn cli() -> clap::Command {
.arg(
Arg::new("metric-collection-endpoint")
.long("metric-collection-endpoint")
.help("http endpoint to receive periodic metric updates"),
.help("metric collection HTTP endpoint"),
)
.arg(
Arg::new("metric-collection-interval")
.long("metric-collection-interval")
.help("how often metrics should be sent to a collection endpoint"),
)
.arg(
Arg::new("wake-compute-cache")
.long("wake-compute-cache")
.help("cache for `wake_compute` api method (use `size=0` to disable)")
.default_value(config::CacheOptions::DEFAULT_OPTIONS_NODE_INFO),
.help("metric collection interval"),
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn verify_cli() {
cli().debug_assert();
}
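
The config built here is `Box::leak`ed so that long-lived tasks can hold a plain `&'static` reference. A tiny sketch of that pattern in isolation, with a made-up `Config` type:

// Leak a heap allocation to get a &'static reference: the memory is never
// freed, which is fine for a config that must outlive every task anyway.
#[derive(Debug)]
struct Config {
    proxy_addr: String,
    tls_enabled: bool,
}

fn build_config() -> &'static Config {
    let config = Config {
        proxy_addr: "127.0.0.1:4432".to_string(),
        tls_enabled: false,
    };
    Box::leak(Box::new(config))
}

fn main() {
    let config: &'static Config = build_config();
    // Any number of threads/tasks can now borrow it without Arc or clones.
    let handle = std::thread::spawn(move || println!("worker sees {config:?}"));
    handle.join().unwrap();
    println!("main sees {config:?}");
}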

View File

@@ -1,9 +1,8 @@
use crate::{
auth,
console::messages::{DatabaseInfo, KickSession},
waiters::{self, Waiter, Waiters},
};
use anyhow::Context;
use once_cell::sync::Lazy;
use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
use std::{
net::{TcpListener, TcpStream},
@@ -15,25 +14,6 @@ use utils::{
postgres_backend_async::QueryError,
};
static CPLANE_WAITERS: Lazy<Waiters<ComputeReady>> = Lazy::new(Default::default);
/// Give caller an opportunity to wait for the cloud's reply.
pub async fn with_waiter<R, T, E>(
psql_session_id: impl Into<String>,
action: impl FnOnce(Waiter<'static, ComputeReady>) -> R,
) -> Result<T, E>
where
R: std::future::Future<Output = Result<T, E>>,
E: From<waiters::RegisterError>,
{
let waiter = CPLANE_WAITERS.register(psql_session_id.into())?;
action(waiter).await
}
pub fn notify(psql_session_id: &str, msg: ComputeReady) -> Result<(), waiters::NotifyError> {
CPLANE_WAITERS.notify(psql_session_id, msg)
}
/// Console management API listener thread.
/// It spawns console response handlers needed for the link auth.
pub fn thread_main(listener: TcpListener) -> anyhow::Result<()> {
@@ -96,7 +76,7 @@ fn try_process_query(pgb: &mut PostgresBackend, query: &str) -> Result<(), Query
let _enter = span.enter();
info!("got response: {:?}", resp.result);
match notify(resp.session_id, Ok(resp.result)) {
match auth::backend::notify(resp.session_id, Ok(resp.result)) {
Ok(()) => {
pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?
.write_message_noflush(&BeMessage::DataRow(&[Some(b"ok")]))?
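
The waiter registry used for link auth amounts to: register a per-session slot, block on it, and let the mgmt handler notify it when the console reports that the compute is ready. A rough blocking sketch with std channels (the real code is async and lives in the `waiters` module; the names below are invented):

use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

// Session id -> sender that will deliver the console's reply.
#[derive(Clone, Default)]
struct Waiters(Arc<Mutex<HashMap<String, Sender<String>>>>);

impl Waiters {
    // Called by the connection handler before redirecting the user:
    // registers a slot and returns the receiving end to wait on.
    fn register(&self, session_id: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.0.lock().unwrap().insert(session_id.to_string(), tx);
        rx
    }

    // Called by the mgmt handler when the console reports the compute is ready.
    fn notify(&self, session_id: &str, msg: String) -> Result<(), String> {
        let tx = self
            .0
            .lock()
            .unwrap()
            .remove(session_id)
            .ok_or_else(|| format!("unknown session: {session_id}"))?;
        tx.send(msg).map_err(|e| e.to_string())
    }
}

fn main() {
    let waiters = Waiters::default();
    let rx = waiters.register("session-42");

    // Pretend this is the mgmt thread handling the console's KickSession.
    let notifier = waiters.clone();
    thread::spawn(move || {
        notifier.notify("session-42", "compute is ready".to_string()).unwrap();
    });

    // The original handler blocks here until notified.
    println!("got: {}", rx.recv().unwrap());
}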

View File

@@ -2,12 +2,9 @@
mod tests;
use crate::{
auth::{self, backend::AuthSuccess},
auth,
cancellation::{self, CancelMap},
compute::{self, PostgresConnection},
config::{ProxyConfig, TlsConfig},
console::{self, messages::MetricsAuxInfo},
error::io_error,
stream::{MeasuredStream, PqStream, Stream},
};
use anyhow::{bail, Context};
@@ -17,10 +14,7 @@ use once_cell::sync::Lazy;
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{error, info, info_span, warn, Instrument};
/// Number of times we should retry the `/proxy_wake_compute` http request.
const NUM_RETRIES_WAKE_COMPUTE: usize = 1;
use tracing::{error, info, info_span, Instrument};
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";
@@ -41,15 +35,6 @@ static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
.unwrap()
});
static NUM_CONNECTION_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_connection_failures_total",
"Number of connection failures (per kind).",
&["kind"],
)
.unwrap()
});
static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_io_bytes_per_client",
@@ -97,12 +82,11 @@ pub async fn task_main(
}
}
// TODO(tech debt): unite this with its twin below.
pub async fn handle_ws_client(
config: &'static ProxyConfig,
config: &ProxyConfig,
cancel_map: &CancelMap,
session_id: uuid::Uuid,
stream: impl AsyncRead + AsyncWrite + Unpin,
stream: impl AsyncRead + AsyncWrite + Unpin + Send,
hostname: Option<String>,
) -> anyhow::Result<()> {
// The `closed` counter will increase when this future is destroyed.
@@ -115,7 +99,7 @@ pub async fn handle_ws_client(
let hostname = hostname.as_deref();
// TLS is None here, because the connection is already encrypted.
let do_handshake = handshake(stream, None, cancel_map);
let do_handshake = handshake(stream, None, cancel_map).instrument(info_span!("handshake"));
let (mut stream, params) = match do_handshake.await? {
Some(x) => x,
None => return Ok(()), // it's a cancellation request
@@ -140,10 +124,10 @@ pub async fn handle_ws_client(
}
async fn handle_client(
config: &'static ProxyConfig,
config: &ProxyConfig,
cancel_map: &CancelMap,
session_id: uuid::Uuid,
stream: impl AsyncRead + AsyncWrite + Unpin,
stream: impl AsyncRead + AsyncWrite + Unpin + Send,
) -> anyhow::Result<()> {
// The `closed` counter will increase when this future is destroyed.
NUM_CONNECTIONS_ACCEPTED_COUNTER.inc();
@@ -152,7 +136,7 @@ async fn handle_client(
}
let tls = config.tls_config.as_ref();
let do_handshake = handshake(stream, tls, cancel_map);
let do_handshake = handshake(stream, tls, cancel_map).instrument(info_span!("handshake"));
let (mut stream, params) = match do_handshake.await? {
Some(x) => x,
None => return Ok(()), // it's a cancellation request
@@ -181,7 +165,6 @@ async fn handle_client(
/// For a better testing experience, `stream` can be any object satisfying the traits.
/// It's easier to work with an owned `stream` here as we need to upgrade it to TLS;
/// we also take extra care to propagate only select handshake errors to the client.
#[tracing::instrument(skip_all)]
async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
stream: S,
mut tls: Option<&TlsConfig>,
@@ -243,133 +226,6 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
}
}
/// Try to connect to the compute node once.
#[tracing::instrument(name = "connect_once", skip_all)]
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
) -> Result<PostgresConnection, compute::ConnectionError> {
// If we couldn't connect, stale cached connection info might be to blame
// (e.g. the compute node's address might've changed at the wrong time).
// Invalidate the cache entry (if any) to prevent subsequent errors.
let invalidate_cache = |_: &compute::ConnectionError| {
let is_cached = node_info.cached();
if is_cached {
warn!("invalidating stalled compute node info cache entry");
node_info.invalidate();
}
let label = match is_cached {
true => "compute_cached",
false => "compute_uncached",
};
NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc();
};
node_info
.config
.connect()
.inspect_err(invalidate_cache)
.await
}
/// Try to connect to the compute node, retrying if necessary.
/// This function might update `node_info`, so we take it by `&mut`.
#[tracing::instrument(skip_all)]
async fn connect_to_compute(
node_info: &mut console::CachedNodeInfo,
params: &StartupMessageParams,
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
) -> Result<PostgresConnection, compute::ConnectionError> {
let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE;
loop {
// Apply startup params to the (possibly, cached) compute node info.
node_info.config.set_startup_params(params);
match connect_to_compute_once(node_info).await {
Err(e) if num_retries > 0 => {
info!("compute node's state has changed; requesting a wake-up");
match creds.wake_compute(extra).map_err(io_error).await? {
// Update `node_info` and try one more time.
Some(mut new) => {
new.config.reuse_password(&node_info.config);
*node_info = new;
}
// Link auth doesn't work that way, so we just exit.
None => return Err(e),
}
}
other => return other,
}
num_retries -= 1;
info!("retrying after wake-up ({num_retries} attempts left)");
}
}
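
The removed retry logic is essentially "try the cached address once; on failure, refresh the cached info and try one more time". A generic sketch of that shape, with invented names and synchronous closures standing in for the async calls:

// Try an operation with possibly-stale cached info; on failure, refresh the
// info once and retry, mirroring connect_to_compute's single-retry policy.
fn connect_with_refresh<T, E>(
    mut info: T,
    mut connect: impl FnMut(&T) -> Result<(), E>,
    refresh: impl FnOnce() -> Option<T>,
) -> Result<(), E> {
    match connect(&info) {
        Ok(()) => Ok(()),
        Err(e) => match refresh() {
            // The cached info may be stale; refresh it and try once more.
            Some(new_info) => {
                info = new_info;
                connect(&info)
            }
            // No way to refresh (e.g. link auth): report the original error.
            None => Err(e),
        },
    }
}

fn main() {
    let mut attempts = 0;
    let result = connect_with_refresh(
        "stale-address:5432".to_string(),
        |addr| {
            attempts += 1;
            // Pretend the first (cached) address is dead and the refreshed one works.
            if addr.starts_with("stale") { Err("connection refused") } else { Ok(()) }
        },
        || Some("fresh-address:5432".to_string()),
    );
    assert_eq!(attempts, 2);
    assert!(result.is_ok());
}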
/// Finish client connection initialization: confirm auth success, send params, etc.
#[tracing::instrument(skip_all)]
async fn prepare_client_connection(
node: &compute::PostgresConnection,
reported_auth_ok: bool,
session: cancellation::Session<'_>,
stream: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> anyhow::Result<()> {
// Register compute's query cancellation token and produce a new, unique one.
// The new token (cancel_key_data) will be sent to the client.
let cancel_key_data = session.enable_query_cancellation(node.cancel_closure.clone());
// Report authentication success if we haven't done this already.
// Note that we do this only (for the most part) after we've connected
// to a compute (see above) which performs its own authentication.
if !reported_auth_ok {
stream.write_message_noflush(&Be::AuthenticationOk)?;
}
// Forward all postgres connection params to the client.
// Right now the implementation is very hacky and inefficient (ideally,
// we don't need an intermediate hashmap), but at least it should be correct.
for (name, value) in &node.params {
// TODO: Theoretically, this could result in a big pile of params...
stream.write_message_noflush(&Be::ParameterStatus {
name: name.as_bytes(),
value: value.as_bytes(),
})?;
}
stream
.write_message_noflush(&Be::BackendKeyData(cancel_key_data))?
.write_message(&Be::ReadyForQuery)
.await?;
Ok(())
}
/// Forward bytes in both directions (client <-> compute).
#[tracing::instrument(skip_all)]
async fn proxy_pass(
client: impl AsyncRead + AsyncWrite + Unpin,
compute: impl AsyncRead + AsyncWrite + Unpin,
aux: &MetricsAuxInfo,
) -> anyhow::Result<()> {
let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&aux.traffic_labels("tx"));
let mut client = MeasuredStream::new(client, |cnt| {
// Number of bytes we sent to the client (outbound).
m_sent.inc_by(cnt as u64);
});
let m_recv = NUM_BYTES_PROXIED_COUNTER.with_label_values(&aux.traffic_labels("rx"));
let mut compute = MeasuredStream::new(compute, |cnt| {
// Number of bytes the client sent to the compute node (inbound).
m_recv.inc_by(cnt as u64);
});
// Starting from here we only proxy the client's traffic.
info!("performing the proxy pass...");
let _ = tokio::io::copy_bidirectional(&mut client, &mut compute).await?;
Ok(())
}
/// Thin connection context.
struct Client<'a, S> {
/// The underlying libpq protocol stream.
@@ -399,17 +255,17 @@ impl<'a, S> Client<'a, S> {
}
}
impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
/// Let the client authenticate and connect to the designated compute node.
async fn connect_to_db(self, session: cancellation::Session<'_>) -> anyhow::Result<()> {
let Self {
mut stream,
mut creds,
creds,
params,
session_id,
} = self;
let extra = console::ConsoleReqExtra {
let extra = auth::ConsoleReqExtra {
session_id, // aka this connection's id
application_name: params.get("application_name"),
};
@@ -422,16 +278,54 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
.instrument(info_span!("auth"))
.await?;
let AuthSuccess {
reported_auth_ok,
value: mut node_info,
} = auth_result;
let node = connect_to_compute(&mut node_info, params, &extra, &creds)
let node = auth_result.value;
let (db, cancel_closure) = node
.config
.connect(params)
.or_else(|e| stream.throw_error(e))
.await?;
prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?;
proxy_pass(stream.into_inner(), node.stream, &node_info.aux).await
let cancel_key_data = session.enable_query_cancellation(cancel_closure);
// Report authentication success if we haven't done this already.
// Note that we do this only (for the most part) after we've connected
// to a compute (see above) which performs its own authentication.
if !auth_result.reported_auth_ok {
stream.write_message_noflush(&Be::AuthenticationOk)?;
}
// Forward all postgres connection params to the client.
// Right now the implementation is very hacky and inefficient (ideally,
// we don't need an intermediate hashmap), but at least it should be correct.
for (name, value) in &db.params {
// TODO: Theoretically, this could result in a big pile of params...
stream.write_message_noflush(&Be::ParameterStatus {
name: name.as_bytes(),
value: value.as_bytes(),
})?;
}
stream
.write_message_noflush(&Be::BackendKeyData(cancel_key_data))?
.write_message(&Be::ReadyForQuery)
.await?;
let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&node.aux.traffic_labels("tx"));
let mut client = MeasuredStream::new(stream.into_inner(), |cnt| {
// Number of bytes we sent to the client (outbound).
m_sent.inc_by(cnt as u64);
});
let m_recv = NUM_BYTES_PROXIED_COUNTER.with_label_values(&node.aux.traffic_labels("rx"));
let mut db = MeasuredStream::new(db.stream, |cnt| {
// Number of bytes the client sent to the compute node (inbound).
m_recv.inc_by(cnt as u64);
});
// Starting from here we only proxy the client's traffic.
info!("performing the proxy pass...");
let _ = tokio::io::copy_bidirectional(&mut client, &mut db).await?;
Ok(())
}
}
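
The proxy pass wraps both streams in byte counters and then copies traffic between them. A minimal blocking sketch of the counting-wrapper idea using only std (one direction only, with in-memory stand-ins instead of sockets):

use std::io::{self, Read};

// Counts how many bytes flow through an inner reader, mirroring the idea
// behind MeasuredStream (per-direction byte counters feeding metrics).
struct CountingReader<R> {
    inner: R,
    bytes: u64,
}

impl<R: Read> Read for CountingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        self.bytes += n as u64;
        Ok(n)
    }
}

fn main() -> io::Result<()> {
    // Stand-ins for the client and compute sockets: an in-memory source and sink.
    let client = io::Cursor::new(b"SELECT 1;".to_vec());
    let mut compute: Vec<u8> = Vec::new();

    let mut measured = CountingReader { inner: client, bytes: 0 };
    io::copy(&mut measured, &mut compute)?; // one direction of the proxy pass

    println!("proxied {} bytes", measured.bytes);
    assert_eq!(measured.bytes as usize, compute.len());
    Ok(())
}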

View File

@@ -33,7 +33,7 @@ psutil = "^5.9.4"
types-psutil = "^5.9.5.4"
types-toml = "^0.10.8"
pytest-httpserver = "^1.0.6"
aiohttp = "3.7.4"
aiohttp = "3.7"
[tool.poetry.dev-dependencies]
flake8 = "^5.0.4"

View File

@@ -293,7 +293,7 @@ class NeonPageserverHttpClient(requests.Session):
def timeline_detail(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}?include-non-incremental-logical-size=true"
f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}?include-non-incremental-logical-size=1"
)
self.verbose_error(res)
res_json = res.json()

View File

@@ -4,10 +4,6 @@
# It is intended to be the primary entry point for people who want to
# just set up a test environment without going into the details of Python package management.
# On Ubuntu 22.04-based systems, `six` fails to install without this, because something imports keyring.
# The null keyring is fine, which means no secrets will be accessed.
export PYTHON_KEYRING_BACKEND=keyring.backends.null.Keyring
poetry config --list
if [ -z "${CI}" ]; then

View File

@@ -1232,9 +1232,9 @@ class PageserverHttpClient(requests.Session):
params = {}
if include_non_incremental_logical_size:
params["include-non-incremental-logical-size"] = "true"
params["include-non-incremental-logical-size"] = "yes"
if include_timeline_dir_layer_file_size_sum:
params["include-timeline-dir-layer-file-size-sum"] = "true"
params["include-timeline-dir-layer-file-size-sum"] = "yes"
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", params=params
@@ -1276,9 +1276,9 @@ class PageserverHttpClient(requests.Session):
) -> Dict[Any, Any]:
params = {}
if include_non_incremental_logical_size:
params["include-non-incremental-logical-size"] = "true"
params["include-non-incremental-logical-size"] = "yes"
if include_timeline_dir_layer_file_size_sum:
params["include-timeline-dir-layer-file-size-sum"] = "true"
params["include-timeline-dir-layer-file-size-sum"] = "yes"
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}",

View File

@@ -35,11 +35,17 @@ from pytest_lazyfixture import lazy_fixture # type: ignore
],
)
def test_seqscans(env: PgCompare, scale: int, rows: int, iters: int, workers: int):
rows = scale * rows
rows = scale * rows * 10
with closing(env.pg.connect(options="-cstatement_timeout=0")) as conn:
with conn.cursor() as cur:
cur.execute("set enable_seqscan_prefetch=on")
cur.execute("set max_parallel_workers_per_gather=0")
cur.execute("drop table if exists t;")
cur.execute("create table t (i integer);")
cur.execute(f"insert into t values (generate_series(1,{rows}));")
@@ -62,4 +68,11 @@ def test_seqscans(env: PgCompare, scale: int, rows: int, iters: int, workers: in
with env.record_duration("run"):
for i in range(iters):
cur.execute("select count(*) from t;")
cur.execute("explain (analyze, prefetch) select count(*) from t;")
result = cur.fetchall()
prefetch_stats = result[1][0]
import re
regex = re.compile(r'([\S]+)=(\S+)')
parsed = dict(regex.findall(prefetch_stats))
print("FFFF", parsed)

View File

@@ -25,7 +25,6 @@ futures-channel = { version = "0.3", features = ["sink"] }
futures-executor = { version = "0.3" }
futures-task = { version = "0.3", default-features = false, features = ["std"] }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
hashbrown = { version = "0.12", features = ["raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
@@ -59,7 +58,6 @@ url = { version = "2", features = ["serde"] }
anyhow = { version = "1", features = ["backtrace"] }
bytes = { version = "1", features = ["serde"] }
either = { version = "1" }
hashbrown = { version = "0.12", features = ["raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }