mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 14:00:38 +00:00
14
.github/ansible/staging.eu-west-1.hosts.yaml
vendored
14
.github/ansible/staging.eu-west-1.hosts.yaml
vendored
@@ -35,6 +35,8 @@ storage:
|
||||
hosts:
|
||||
pageserver-0.eu-west-1.aws.neon.build:
|
||||
ansible_host: i-01d496c5041c7f34c
|
||||
pageserver-1.eu-west-1.aws.neon.build:
|
||||
ansible_host: i-0e8013e239ce3928c
|
||||
|
||||
safekeepers:
|
||||
hosts:
|
||||
@@ -44,3 +46,15 @@ storage:
|
||||
ansible_host: i-06969ee1bf2958bfc
|
||||
safekeeper-2.eu-west-1.aws.neon.build:
|
||||
ansible_host: i-087892e9625984a0b
|
||||
safekeeper-3.eu-west-1.aws.neon.build:
|
||||
ansible_host: i-0a6f91660e99e8891
|
||||
safekeeper-4.eu-west-1.aws.neon.build:
|
||||
ansible_host: i-0012e309e28e7c249
|
||||
safekeeper-5.eu-west-1.aws.neon.build:
|
||||
ansible_host: i-085a2b1193287b32e
|
||||
safekeeper-6.eu-west-1.aws.neon.build:
|
||||
ansible_host: i-0c713248465ed0fbd
|
||||
safekeeper-7.eu-west-1.aws.neon.build:
|
||||
ansible_host: i-02ad231aed2a80b7a
|
||||
safekeeper-8.eu-west-1.aws.neon.build:
|
||||
ansible_host: i-0dbbd8ffef66efda8
|
||||
|
||||
19
.github/helm-values/dev-eu-central-1-alpha.pg-sni-router.yaml
vendored
Normal file
19
.github/helm-values/dev-eu-central-1-alpha.pg-sni-router.yaml
vendored
Normal file
@@ -0,0 +1,19 @@
|
||||
useCertManager: true
|
||||
|
||||
replicaCount: 3
|
||||
|
||||
exposedService:
|
||||
# exposedService.port -- Exposed Service proxy port
|
||||
port: 4432
|
||||
annotations:
|
||||
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.alpha.eu-central-1.internal.aws.neon.build"
|
||||
|
||||
settings:
|
||||
domain: "*.snirouter.alpha.eu-central-1.internal.aws.neon.build"
|
||||
sentryEnvironment: "staging"
|
||||
|
||||
imagePullSecrets:
|
||||
- name: docker-hub-neon
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
19
.github/helm-values/dev-eu-west-1-zeta.pg-sni-router.yaml
vendored
Normal file
19
.github/helm-values/dev-eu-west-1-zeta.pg-sni-router.yaml
vendored
Normal file
@@ -0,0 +1,19 @@
|
||||
useCertManager: true
|
||||
|
||||
replicaCount: 3
|
||||
|
||||
exposedService:
|
||||
# exposedService.port -- Exposed Service proxy port
|
||||
port: 4432
|
||||
annotations:
|
||||
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.zeta.eu-west-1.internal.aws.neon.build"
|
||||
|
||||
settings:
|
||||
domain: "*.snirouter.zeta.eu-west-1.internal.aws.neon.build"
|
||||
sentryEnvironment: "staging"
|
||||
|
||||
imagePullSecrets:
|
||||
- name: docker-hub-neon
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
19
.github/helm-values/dev-us-east-2-beta.pg-sni-router.yaml
vendored
Normal file
19
.github/helm-values/dev-us-east-2-beta.pg-sni-router.yaml
vendored
Normal file
@@ -0,0 +1,19 @@
|
||||
useCertManager: true
|
||||
|
||||
replicaCount: 3
|
||||
|
||||
exposedService:
|
||||
# exposedService.port -- Exposed Service proxy port
|
||||
port: 4432
|
||||
annotations:
|
||||
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.beta.us-east-2.internal.aws.neon.build"
|
||||
|
||||
settings:
|
||||
domain: "*.snirouter.beta.us-east-2.internal.aws.neon.build"
|
||||
sentryEnvironment: "staging"
|
||||
|
||||
imagePullSecrets:
|
||||
- name: docker-hub-neon
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
19
.github/helm-values/prod-ap-southeast-1-epsilon.pg-sni-router.yaml
vendored
Normal file
19
.github/helm-values/prod-ap-southeast-1-epsilon.pg-sni-router.yaml
vendored
Normal file
@@ -0,0 +1,19 @@
|
||||
useCertManager: true
|
||||
|
||||
replicaCount: 3
|
||||
|
||||
exposedService:
|
||||
# exposedService.port -- Exposed Service proxy port
|
||||
port: 4432
|
||||
annotations:
|
||||
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.epsilon.ap-southeast-1.internal.aws.neon.tech"
|
||||
|
||||
settings:
|
||||
domain: "*.snirouter.epsilon.ap-southeast-1.internal.aws.neon.tech"
|
||||
sentryEnvironment: "production"
|
||||
|
||||
imagePullSecrets:
|
||||
- name: docker-hub-neon
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
19
.github/helm-values/prod-eu-central-1-gamma.pg-sni-router.yaml
vendored
Normal file
19
.github/helm-values/prod-eu-central-1-gamma.pg-sni-router.yaml
vendored
Normal file
@@ -0,0 +1,19 @@
|
||||
useCertManager: true
|
||||
|
||||
replicaCount: 3
|
||||
|
||||
exposedService:
|
||||
# exposedService.port -- Exposed Service proxy port
|
||||
port: 4432
|
||||
annotations:
|
||||
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.gamma.eu-central-1.internal.aws.neon.tech"
|
||||
|
||||
settings:
|
||||
domain: "*.snirouter.gamma.eu-central-1.internal.aws.neon.tech"
|
||||
sentryEnvironment: "production"
|
||||
|
||||
imagePullSecrets:
|
||||
- name: docker-hub-neon
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
19
.github/helm-values/prod-us-east-1-theta.pg-sni-router.yaml
vendored
Normal file
19
.github/helm-values/prod-us-east-1-theta.pg-sni-router.yaml
vendored
Normal file
@@ -0,0 +1,19 @@
|
||||
useCertManager: true
|
||||
|
||||
replicaCount: 3
|
||||
|
||||
exposedService:
|
||||
# exposedService.port -- Exposed Service proxy port
|
||||
port: 4432
|
||||
annotations:
|
||||
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.theta.us-east-1.internal.aws.neon.tech"
|
||||
|
||||
settings:
|
||||
domain: "*.snirouter.theta.us-east-1.internal.aws.neon.tech"
|
||||
sentryEnvironment: "production"
|
||||
|
||||
imagePullSecrets:
|
||||
- name: docker-hub-neon
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
19
.github/helm-values/prod-us-east-2-delta.pg-sni-router.yaml
vendored
Normal file
19
.github/helm-values/prod-us-east-2-delta.pg-sni-router.yaml
vendored
Normal file
@@ -0,0 +1,19 @@
|
||||
useCertManager: true
|
||||
|
||||
replicaCount: 3
|
||||
|
||||
exposedService:
|
||||
# exposedService.port -- Exposed Service proxy port
|
||||
port: 4432
|
||||
annotations:
|
||||
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.delta.us-east-2.internal.aws.neon.tech"
|
||||
|
||||
settings:
|
||||
domain: "*.snirouter.delta.us-east-2.internal.aws.neon.tech"
|
||||
sentryEnvironment: "production"
|
||||
|
||||
imagePullSecrets:
|
||||
- name: docker-hub-neon
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
19
.github/helm-values/prod-us-west-2-eta.pg-sni-router.yaml
vendored
Normal file
19
.github/helm-values/prod-us-west-2-eta.pg-sni-router.yaml
vendored
Normal file
@@ -0,0 +1,19 @@
|
||||
useCertManager: true
|
||||
|
||||
replicaCount: 3
|
||||
|
||||
exposedService:
|
||||
# exposedService.port -- Exposed Service proxy port
|
||||
port: 4432
|
||||
annotations:
|
||||
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.eta.us-west-2.internal.aws.neon.tech"
|
||||
|
||||
settings:
|
||||
domain: "*.snirouter.eta.us-west-2.internal.aws.neon.tech"
|
||||
sentryEnvironment: "production"
|
||||
|
||||
imagePullSecrets:
|
||||
- name: docker-hub-neon
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
51
.github/workflows/deploy-dev.yml
vendored
51
.github/workflows/deploy-dev.yml
vendored
@@ -27,6 +27,11 @@ on:
|
||||
required: true
|
||||
type: boolean
|
||||
default: true
|
||||
deployPgSniRouter:
|
||||
description: 'Deploy pg-sni-router'
|
||||
required: true
|
||||
type: boolean
|
||||
default: true
|
||||
|
||||
env:
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
|
||||
@@ -227,3 +232,49 @@ jobs:
|
||||
|
||||
- name: Cleanup helm folder
|
||||
run: rm -rf ~/.cache
|
||||
|
||||
deploy-pg-sni-router:
|
||||
runs-on: [ self-hosted, gen3, small ]
|
||||
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
|
||||
if: inputs.deployPgSniRouter
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
strategy:
|
||||
matrix:
|
||||
include:
|
||||
- target_region: us-east-2
|
||||
target_cluster: dev-us-east-2-beta
|
||||
- target_region: eu-west-1
|
||||
target_cluster: dev-eu-west-1-zeta
|
||||
- target_region: eu-central-1
|
||||
target_cluster: dev-eu-central-1-alpha
|
||||
environment:
|
||||
name: dev-${{ matrix.target_region }}
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
ref: ${{ inputs.branch }}
|
||||
|
||||
- name: Configure AWS Credentials
|
||||
uses: aws-actions/configure-aws-credentials@v1-node16
|
||||
with:
|
||||
role-to-assume: arn:aws:iam::369495373322:role/github-runner
|
||||
aws-region: eu-central-1
|
||||
role-skip-session-tagging: true
|
||||
role-duration-seconds: 1800
|
||||
|
||||
- name: Configure environment
|
||||
run: |
|
||||
helm repo add neondatabase https://neondatabase.github.io/helm-charts
|
||||
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
|
||||
|
||||
- name: Deploy pg-sni-router
|
||||
run:
|
||||
helm upgrade neon-pg-sni-router neondatabase/neon-pg-sni-router --namespace neon-pg-sni-router --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.pg-sni-router.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 15m0s
|
||||
|
||||
- name: Cleanup helm folder
|
||||
run: rm -rf ~/.cache
|
||||
|
||||
44
.github/workflows/deploy-prod.yml
vendored
44
.github/workflows/deploy-prod.yml
vendored
@@ -27,6 +27,11 @@ on:
|
||||
required: true
|
||||
type: boolean
|
||||
default: true
|
||||
deployPgSniRouter:
|
||||
description: 'Deploy pg-sni-router'
|
||||
required: true
|
||||
type: boolean
|
||||
default: true
|
||||
disclamerAcknowledged:
|
||||
description: 'I confirm that there is an emergency and I can not use regular release workflow'
|
||||
required: true
|
||||
@@ -171,3 +176,42 @@ jobs:
|
||||
- name: Deploy storage-broker
|
||||
run:
|
||||
helm upgrade neon-storage-broker-lb neondatabase/neon-storage-broker --namespace neon-storage-broker-lb --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
|
||||
|
||||
deploy-pg-sni-router:
|
||||
runs-on: prod
|
||||
container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
|
||||
if: inputs.deployPgSniRouter && inputs.disclamerAcknowledged
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
strategy:
|
||||
matrix:
|
||||
include:
|
||||
- target_region: us-east-2
|
||||
target_cluster: prod-us-east-2-delta
|
||||
- target_region: us-west-2
|
||||
target_cluster: prod-us-west-2-eta
|
||||
- target_region: eu-central-1
|
||||
target_cluster: prod-eu-central-1-gamma
|
||||
- target_region: ap-southeast-1
|
||||
target_cluster: prod-ap-southeast-1-epsilon
|
||||
- target_region: us-east-1
|
||||
target_cluster: prod-us-east-1-theta
|
||||
environment:
|
||||
name: prod-${{ matrix.target_region }}
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
ref: ${{ inputs.branch }}
|
||||
|
||||
- name: Configure environment
|
||||
run: |
|
||||
helm repo add neondatabase https://neondatabase.github.io/helm-charts
|
||||
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
|
||||
|
||||
- name: Deploy pg-sni-router
|
||||
run:
|
||||
helm upgrade neon-pg-sni-router neondatabase/neon-pg-sni-router --namespace neon-pg-sni-router --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.pg-sni-router.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 15m0s
|
||||
|
||||
119
Cargo.lock
generated
119
Cargo.lock
generated
@@ -1574,6 +1574,21 @@ version = "1.0.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
||||
|
||||
[[package]]
|
||||
name = "foreign-types"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
|
||||
dependencies = [
|
||||
"foreign-types-shared",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "foreign-types-shared"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
|
||||
|
||||
[[package]]
|
||||
name = "form_urlencoded"
|
||||
version = "1.1.0"
|
||||
@@ -2361,6 +2376,24 @@ version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
|
||||
|
||||
[[package]]
|
||||
name = "native-tls"
|
||||
version = "0.2.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"log",
|
||||
"openssl",
|
||||
"openssl-probe",
|
||||
"openssl-sys",
|
||||
"schannel",
|
||||
"security-framework",
|
||||
"security-framework-sys",
|
||||
"tempfile",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.26.2"
|
||||
@@ -2483,12 +2516,50 @@ version = "11.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
|
||||
|
||||
[[package]]
|
||||
name = "openssl"
|
||||
version = "0.10.52"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "01b8574602df80f7b85fdfc5392fa884a4e3b3f4f35402c070ab34c3d3f78d56"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"cfg-if",
|
||||
"foreign-types",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"openssl-macros",
|
||||
"openssl-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openssl-macros"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.15",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openssl-probe"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "openssl-sys"
|
||||
version = "0.9.87"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8e17f59264b2809d77ae94f0e1ebabc434773f370d6ca667bd223ea10e06cc7e"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
"pkg-config",
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry"
|
||||
version = "0.18.0"
|
||||
@@ -2816,6 +2887,12 @@ version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||
|
||||
[[package]]
|
||||
name = "pkg-config"
|
||||
version = "0.3.26"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160"
|
||||
|
||||
[[package]]
|
||||
name = "plotters"
|
||||
version = "0.3.4"
|
||||
@@ -2847,7 +2924,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres"
|
||||
version = "0.19.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -2857,10 +2934,21 @@ dependencies = [
|
||||
"tokio-postgres",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres-native-tls"
|
||||
version = "0.5.0"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f"
|
||||
dependencies = [
|
||||
"native-tls",
|
||||
"tokio",
|
||||
"tokio-native-tls",
|
||||
"tokio-postgres",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f"
|
||||
dependencies = [
|
||||
"base64 0.20.0",
|
||||
"byteorder",
|
||||
@@ -2878,7 +2966,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -3109,10 +3197,12 @@ dependencies = [
|
||||
"itertools",
|
||||
"md5",
|
||||
"metrics",
|
||||
"native-tls",
|
||||
"once_cell",
|
||||
"opentelemetry",
|
||||
"parking_lot",
|
||||
"pin-project-lite",
|
||||
"postgres-native-tls",
|
||||
"postgres_backend",
|
||||
"pq_proto",
|
||||
"prometheus",
|
||||
@@ -3567,6 +3657,7 @@ dependencies = [
|
||||
"const_format",
|
||||
"crc32c",
|
||||
"fs2",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hex",
|
||||
"humantime",
|
||||
@@ -3581,6 +3672,7 @@ dependencies = [
|
||||
"pq_proto",
|
||||
"regex",
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
"safekeeper_api",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -3868,8 +3960,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "sharded-slab"
|
||||
version = "0.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
|
||||
source = "git+https://github.com/neondatabase/sharded-slab.git?rev=98d16753ab01c61f0a028de44167307a00efea00#98d16753ab01c61f0a028de44167307a00efea00"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
]
|
||||
@@ -4319,10 +4410,20 @@ dependencies = [
|
||||
"syn 2.0.15",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-native-tls"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
|
||||
dependencies = [
|
||||
"native-tls",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.7"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
@@ -4914,6 +5015,12 @@ version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
|
||||
|
||||
[[package]]
|
||||
name = "vcpkg"
|
||||
version = "0.2.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
|
||||
|
||||
[[package]]
|
||||
name = "version_check"
|
||||
version = "0.9.4"
|
||||
|
||||
20
Cargo.toml
20
Cargo.toml
@@ -62,6 +62,7 @@ jsonwebtoken = "8"
|
||||
libc = "0.2"
|
||||
md5 = "0.7.0"
|
||||
memoffset = "0.8"
|
||||
native-tls = "0.2"
|
||||
nix = "0.26"
|
||||
notify = "5.0.0"
|
||||
num_cpus = "1.15"
|
||||
@@ -124,10 +125,11 @@ env_logger = "0.10"
|
||||
log = "0.4"
|
||||
|
||||
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
|
||||
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
|
||||
tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" }
|
||||
|
||||
## Other git libraries
|
||||
@@ -159,10 +161,16 @@ rstest = "0.17"
|
||||
tempfile = "3.4"
|
||||
tonic-build = "0.9"
|
||||
|
||||
[patch.crates-io]
|
||||
|
||||
# This is only needed for proxy's tests.
|
||||
# TODO: we should probably fork `tokio-postgres-rustls` instead.
|
||||
[patch.crates-io]
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
|
||||
|
||||
# Changes the MAX_THREADS limit from 4096 to 32768.
|
||||
# This is a temporary workaround for using tracing from many threads in safekeepers code,
|
||||
# until async safekeepers patch is merged to the main.
|
||||
sharded-slab = { git = "https://github.com/neondatabase/sharded-slab.git", rev="98d16753ab01c61f0a028de44167307a00efea00" }
|
||||
|
||||
################# Binary contents sections
|
||||
|
||||
|
||||
11
Dockerfile
11
Dockerfile
@@ -44,7 +44,15 @@ COPY --chown=nonroot . .
|
||||
# Show build caching stats to check if it was used in the end.
|
||||
# Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats.
|
||||
RUN set -e \
|
||||
&& mold -run cargo build --bin pageserver --bin pageserver_binutils --bin draw_timeline_dir --bin safekeeper --bin storage_broker --bin proxy --locked --release \
|
||||
&& mold -run cargo build \
|
||||
--bin pg_sni_router \
|
||||
--bin pageserver \
|
||||
--bin pageserver_binutils \
|
||||
--bin draw_timeline_dir \
|
||||
--bin safekeeper \
|
||||
--bin storage_broker \
|
||||
--bin proxy \
|
||||
--locked --release \
|
||||
&& cachepot -s
|
||||
|
||||
# Build final image
|
||||
@@ -63,6 +71,7 @@ RUN set -e \
|
||||
&& useradd -d /data neon \
|
||||
&& chown -R neon:neon /data
|
||||
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pg_sni_router /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver_binutils /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/draw_timeline_dir /usr/local/bin
|
||||
|
||||
@@ -287,14 +287,33 @@ impl EvictionsWithLowResidenceDuration {
|
||||
let Some(_counter) = self.counter.take() else {
|
||||
return;
|
||||
};
|
||||
EVICTIONS_WITH_LOW_RESIDENCE_DURATION
|
||||
.remove_label_values(&[
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
self.data_source,
|
||||
&Self::threshold_label_value(self.threshold),
|
||||
])
|
||||
.expect("we own the metric, no-one else should remove it");
|
||||
|
||||
let threshold = Self::threshold_label_value(self.threshold);
|
||||
|
||||
let removed = EVICTIONS_WITH_LOW_RESIDENCE_DURATION.remove_label_values(&[
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
self.data_source,
|
||||
&threshold,
|
||||
]);
|
||||
|
||||
match removed {
|
||||
Err(e) => {
|
||||
// this has been hit in staging as
|
||||
// <https://neondatabase.sentry.io/issues/4142396994/>, but we don't know how.
|
||||
// because we can be in the drop path already, don't risk:
|
||||
// - "double-panic => illegal instruction" or
|
||||
// - future "drop panick => abort"
|
||||
//
|
||||
// so just nag: (the error has the labels)
|
||||
tracing::warn!("failed to remove EvictionsWithLowResidenceDuration, it was already removed? {e:#?}");
|
||||
}
|
||||
Ok(()) => {
|
||||
// to help identify cases where we double-remove the same values, let's log all
|
||||
// deletions?
|
||||
tracing::info!("removed EvictionsWithLowResidenceDuration with {tenant_id}, {timeline_id}, {}, {threshold}", self.data_source);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -352,7 +352,7 @@ impl PageServerHandler {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
ctx: RequestContext,
|
||||
) -> anyhow::Result<()>
|
||||
) -> Result<(), QueryError>
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
@@ -398,7 +398,9 @@ impl PageServerHandler {
|
||||
Some(FeMessage::CopyData(bytes)) => bytes,
|
||||
Some(FeMessage::Terminate) => break,
|
||||
Some(m) => {
|
||||
anyhow::bail!("unexpected message: {m:?} during COPY");
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"unexpected message: {m:?} during COPY"
|
||||
)));
|
||||
}
|
||||
None => break, // client disconnected
|
||||
};
|
||||
|
||||
@@ -62,6 +62,8 @@ utils.workspace = true
|
||||
uuid.workspace = true
|
||||
webpki-roots.workspace = true
|
||||
x509-parser.workspace = true
|
||||
native-tls.workspace = true
|
||||
postgres-native-tls.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
tokio-util.workspace = true
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::{
|
||||
use pq_proto::BeMessage as Be;
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tracing::{info, info_span};
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
@@ -87,6 +88,16 @@ pub(super) async fn authenticate(
|
||||
.dbname(&db_info.dbname)
|
||||
.user(&db_info.user);
|
||||
|
||||
// Backwards compatibility. pg_sni_proxy uses "--" in domain names
|
||||
// while direct connections do not. Once we migrate to pg_sni_proxy
|
||||
// everywhere, we can remove this.
|
||||
if db_info.host.contains("--") {
|
||||
// we need TLS connection with SNI info to properly route it
|
||||
config.ssl_mode(SslMode::Require);
|
||||
} else {
|
||||
config.ssl_mode(SslMode::Disable);
|
||||
}
|
||||
|
||||
if let Some(password) = db_info.password {
|
||||
config.password(password.as_ref());
|
||||
}
|
||||
@@ -96,6 +107,7 @@ pub(super) async fn authenticate(
|
||||
value: NodeInfo {
|
||||
config,
|
||||
aux: db_info.aux.into(),
|
||||
allow_self_signed_compute: false, // caller may override
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
250
proxy/src/bin/pg_sni_router.rs
Normal file
250
proxy/src/bin/pg_sni_router.rs
Normal file
@@ -0,0 +1,250 @@
|
||||
/// A stand-alone program that routes connections, e.g. from
|
||||
/// `aaa--bbb--1234.external.domain` to `aaa.bbb.internal.domain:1234`.
|
||||
///
|
||||
/// This allows connecting to pods/services running in the same Kubernetes cluster from
|
||||
/// the outside. Similar to an ingress controller for HTTPS.
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context};
|
||||
use clap::{self, Arg};
|
||||
use futures::TryFutureExt;
|
||||
use proxy::console::messages::MetricsAuxInfo;
|
||||
use proxy::stream::{PqStream, Stream};
|
||||
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{project_git_version, sentry_init::init_sentry};
|
||||
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
fn cli() -> clap::Command {
|
||||
clap::Command::new("Neon proxy/router")
|
||||
.version(GIT_VERSION)
|
||||
.arg(
|
||||
Arg::new("listen")
|
||||
.short('l')
|
||||
.long("listen")
|
||||
.help("listen for incoming client connections on ip:port")
|
||||
.default_value("127.0.0.1:4432"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("tls-key")
|
||||
.short('k')
|
||||
.long("tls-key")
|
||||
.help("path to TLS key for client postgres connections")
|
||||
.required(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("tls-cert")
|
||||
.short('c')
|
||||
.long("tls-cert")
|
||||
.help("path to TLS cert for client postgres connections")
|
||||
.required(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("dest")
|
||||
.short('d')
|
||||
.long("destination")
|
||||
.help("append this domain zone to the SNI hostname to get the destination address")
|
||||
.required(true),
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let _logging_guard = proxy::logging::init().await?;
|
||||
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
|
||||
let args = cli().get_matches();
|
||||
let destination: String = args.get_one::<String>("dest").unwrap().parse()?;
|
||||
|
||||
// Configure TLS
|
||||
let tls_config: Arc<rustls::ServerConfig> = match (
|
||||
args.get_one::<String>("tls-key"),
|
||||
args.get_one::<String>("tls-cert"),
|
||||
) {
|
||||
(Some(key_path), Some(cert_path)) => {
|
||||
let key = {
|
||||
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
|
||||
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
|
||||
.context(format!("Failed to read TLS keys at '{key_path}'"))?;
|
||||
|
||||
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
|
||||
keys.pop().map(rustls::PrivateKey).unwrap()
|
||||
};
|
||||
|
||||
let cert_chain_bytes = std::fs::read(cert_path)
|
||||
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
|
||||
|
||||
let cert_chain = {
|
||||
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
|
||||
.context(format!(
|
||||
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
|
||||
))?
|
||||
.into_iter()
|
||||
.map(rustls::Certificate)
|
||||
.collect()
|
||||
};
|
||||
|
||||
rustls::ServerConfig::builder()
|
||||
.with_safe_default_cipher_suites()
|
||||
.with_safe_default_kx_groups()
|
||||
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(cert_chain, key)?
|
||||
.into()
|
||||
}
|
||||
_ => bail!("tls-key and tls-cert must be specified"),
|
||||
};
|
||||
|
||||
// Start listening for incoming client connections
|
||||
let proxy_address: SocketAddr = args.get_one::<String>("listen").unwrap().parse()?;
|
||||
info!("Starting sni router on {proxy_address}");
|
||||
let proxy_listener = TcpListener::bind(proxy_address).await?;
|
||||
|
||||
let cancellation_token = CancellationToken::new();
|
||||
|
||||
let main = proxy::flatten_err(tokio::spawn(task_main(
|
||||
Arc::new(destination),
|
||||
tls_config,
|
||||
proxy_listener,
|
||||
cancellation_token.clone(),
|
||||
)));
|
||||
let signals_task = proxy::flatten_err(tokio::spawn(proxy::handle_signals(cancellation_token)));
|
||||
|
||||
tokio::select! {
|
||||
res = main => { res?; },
|
||||
res = signals_task => { res?; },
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn task_main(
|
||||
dest_suffix: Arc<String>,
|
||||
tls_config: Arc<rustls::ServerConfig>,
|
||||
listener: tokio::net::TcpListener,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
// When set for the server socket, the keepalive setting
|
||||
// will be inherited by all accepted client sockets.
|
||||
socket2::SockRef::from(&listener).set_keepalive(true)?;
|
||||
|
||||
let mut connections = tokio::task::JoinSet::new();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
accept_result = listener.accept() => {
|
||||
let (socket, peer_addr) = accept_result?;
|
||||
info!("accepted postgres client connection from {peer_addr}");
|
||||
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
let tls_config = Arc::clone(&tls_config);
|
||||
let dest_suffix = Arc::clone(&dest_suffix);
|
||||
|
||||
connections.spawn(
|
||||
async move {
|
||||
info!("spawned a task for {peer_addr}");
|
||||
|
||||
socket
|
||||
.set_nodelay(true)
|
||||
.context("failed to set socket option")?;
|
||||
|
||||
handle_client(dest_suffix, tls_config, session_id, socket).await
|
||||
}
|
||||
.unwrap_or_else(|e| {
|
||||
// Acknowledge that the task has finished with an error.
|
||||
error!("per-client task finished with an error: {e:#}");
|
||||
}),
|
||||
);
|
||||
}
|
||||
_ = cancellation_token.cancelled() => {
|
||||
drop(listener);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Drain connections
|
||||
info!("waiting for all client connections to finish");
|
||||
while let Some(res) = connections.join_next().await {
|
||||
if let Err(e) = res {
|
||||
if !e.is_panic() && !e.is_cancelled() {
|
||||
warn!("unexpected error from joined connection task: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("all client connections have finished");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
|
||||
|
||||
async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
raw_stream: S,
|
||||
tls_config: Arc<rustls::ServerConfig>,
|
||||
) -> anyhow::Result<Stream<S>> {
|
||||
let mut stream = PqStream::new(Stream::from_raw(raw_stream));
|
||||
|
||||
let msg = stream.read_startup_packet().await?;
|
||||
info!("received {msg:?}");
|
||||
use pq_proto::FeStartupPacket::*;
|
||||
|
||||
match msg {
|
||||
SslRequest => {
|
||||
stream
|
||||
.write_message(&pq_proto::BeMessage::EncryptionResponse(true))
|
||||
.await?;
|
||||
// Upgrade raw stream into a secure TLS-backed stream.
|
||||
// NOTE: We've consumed `tls`; this fact will be used later.
|
||||
|
||||
let (raw, read_buf) = stream.into_inner();
|
||||
// TODO: Normally, client doesn't send any data before
|
||||
// server says TLS handshake is ok and read_buf is empy.
|
||||
// However, you could imagine pipelining of postgres
|
||||
// SSLRequest + TLS ClientHello in one hunk similar to
|
||||
// pipelining in our node js driver. We should probably
|
||||
// support that by chaining read_buf with the stream.
|
||||
if !read_buf.is_empty() {
|
||||
bail!("data is sent before server replied with EncryptionResponse");
|
||||
}
|
||||
Ok(raw.upgrade(tls_config).await?)
|
||||
}
|
||||
_ => stream.throw_error_str(ERR_INSECURE_CONNECTION).await?,
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(fields(session_id = ?session_id), skip_all)]
|
||||
async fn handle_client(
|
||||
dest_suffix: Arc<String>,
|
||||
tls_config: Arc<rustls::ServerConfig>,
|
||||
session_id: uuid::Uuid,
|
||||
stream: impl AsyncRead + AsyncWrite + Unpin,
|
||||
) -> anyhow::Result<()> {
|
||||
let tls_stream = ssl_handshake(stream, tls_config).await?;
|
||||
|
||||
// Cut off first part of the SNI domain
|
||||
// We receive required destination details in the format of
|
||||
// `{k8s_service_name}--{k8s_namespace}--{port}.non-sni-domain`
|
||||
let sni = tls_stream.sni_hostname().ok_or(anyhow!("SNI missing"))?;
|
||||
let dest: Vec<&str> = sni
|
||||
.split_once('.')
|
||||
.context("invalid SNI")?
|
||||
.0
|
||||
.splitn(3, "--")
|
||||
.collect();
|
||||
let port = dest[2].parse::<u16>().context("invalid port")?;
|
||||
let destination = format!("{}.{}.{}:{}", dest[0], dest[1], dest_suffix, port);
|
||||
|
||||
info!("destination: {}", destination);
|
||||
|
||||
let client = tokio::net::TcpStream::connect(destination).await?;
|
||||
|
||||
let metrics_aux: MetricsAuxInfo = Default::default();
|
||||
proxy::proxy::proxy_pass(tls_stream, client, &metrics_aux).await
|
||||
}
|
||||
@@ -1,49 +1,23 @@
|
||||
//! Postgres protocol proxy/router.
|
||||
//!
|
||||
//! This service listens psql port and can check auth via external service
|
||||
//! (control plane API in our case) and can create new databases and accounts
|
||||
//! in somewhat transparent manner (again via communication with control plane API).
|
||||
use proxy::auth;
|
||||
use proxy::console;
|
||||
use proxy::http;
|
||||
use proxy::metrics;
|
||||
|
||||
mod auth;
|
||||
mod cache;
|
||||
mod cancellation;
|
||||
mod compute;
|
||||
mod config;
|
||||
mod console;
|
||||
mod error;
|
||||
mod http;
|
||||
mod logging;
|
||||
mod metrics;
|
||||
mod parse;
|
||||
mod proxy;
|
||||
mod sasl;
|
||||
mod scram;
|
||||
mod stream;
|
||||
mod url;
|
||||
mod waiters;
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use anyhow::bail;
|
||||
use clap::{self, Arg};
|
||||
use config::ProxyConfig;
|
||||
use futures::FutureExt;
|
||||
use std::{borrow::Cow, future::Future, net::SocketAddr};
|
||||
use tokio::{net::TcpListener, task::JoinError};
|
||||
use proxy::config::{self, ProxyConfig};
|
||||
use std::{borrow::Cow, net::SocketAddr};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info, warn};
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
use utils::{project_git_version, sentry_init::init_sentry};
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
/// Flattens `Result<Result<T>>` into `Result<T>`.
|
||||
async fn flatten_err(
|
||||
f: impl Future<Output = Result<anyhow::Result<()>, JoinError>>,
|
||||
) -> anyhow::Result<()> {
|
||||
f.map(|r| r.context("join error").and_then(|x| x)).await
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let _logging_guard = logging::init().await?;
|
||||
let _logging_guard = proxy::logging::init().await?;
|
||||
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
|
||||
@@ -69,7 +43,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
let proxy_listener = TcpListener::bind(proxy_address).await?;
|
||||
let cancellation_token = CancellationToken::new();
|
||||
|
||||
let mut client_tasks = vec![tokio::spawn(proxy::task_main(
|
||||
let mut client_tasks = vec![tokio::spawn(proxy::proxy::task_main(
|
||||
config,
|
||||
proxy_listener,
|
||||
cancellation_token.clone(),
|
||||
@@ -88,7 +62,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
let mut tasks = vec![
|
||||
tokio::spawn(handle_signals(cancellation_token)),
|
||||
tokio::spawn(proxy::handle_signals(cancellation_token)),
|
||||
tokio::spawn(http::server::task_main(http_listener)),
|
||||
tokio::spawn(console::mgmt::task_main(mgmt_listener)),
|
||||
];
|
||||
@@ -97,8 +71,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
tasks.push(tokio::spawn(metrics::task_main(metrics_config)));
|
||||
}
|
||||
|
||||
let tasks = futures::future::try_join_all(tasks.into_iter().map(flatten_err));
|
||||
let client_tasks = futures::future::try_join_all(client_tasks.into_iter().map(flatten_err));
|
||||
let tasks = futures::future::try_join_all(tasks.into_iter().map(proxy::flatten_err));
|
||||
let client_tasks =
|
||||
futures::future::try_join_all(client_tasks.into_iter().map(proxy::flatten_err));
|
||||
tokio::select! {
|
||||
// We are only expecting an error from these forever tasks
|
||||
res = tasks => { res?; },
|
||||
@@ -107,33 +82,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle unix signals appropriately.
|
||||
async fn handle_signals(token: CancellationToken) -> anyhow::Result<()> {
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
|
||||
let mut hangup = signal(SignalKind::hangup())?;
|
||||
let mut interrupt = signal(SignalKind::interrupt())?;
|
||||
let mut terminate = signal(SignalKind::terminate())?;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Hangup is commonly used for config reload.
|
||||
_ = hangup.recv() => {
|
||||
warn!("received SIGHUP; config reload is not supported");
|
||||
}
|
||||
// Shut down the whole application.
|
||||
_ = interrupt.recv() => {
|
||||
warn!("received SIGINT, exiting immediately");
|
||||
bail!("interrupted");
|
||||
}
|
||||
_ = terminate.recv() => {
|
||||
warn!("received SIGTERM, shutting down once all existing connections have closed");
|
||||
token.cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// ProxyConfig is created at proxy startup, and lives forever.
|
||||
fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig> {
|
||||
let tls_config = match (
|
||||
@@ -149,6 +97,14 @@ fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig>
|
||||
_ => bail!("either both or neither tls-key and tls-cert must be specified"),
|
||||
};
|
||||
|
||||
let allow_self_signed_compute: bool = args
|
||||
.get_one::<String>("allow-self-signed-compute")
|
||||
.unwrap()
|
||||
.parse()?;
|
||||
if allow_self_signed_compute {
|
||||
warn!("allowing self-signed compute certificates");
|
||||
}
|
||||
|
||||
let metric_collection = match (
|
||||
args.get_one::<String>("metric-collection-endpoint"),
|
||||
args.get_one::<String>("metric-collection-interval"),
|
||||
@@ -198,6 +154,7 @@ fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig>
|
||||
tls_config,
|
||||
auth_backend,
|
||||
metric_collection,
|
||||
allow_self_signed_compute,
|
||||
}));
|
||||
|
||||
Ok(config)
|
||||
@@ -288,6 +245,12 @@ fn cli() -> clap::Command {
|
||||
.help("cache for `wake_compute` api method (use `size=0` to disable)")
|
||||
.default_value(config::CacheOptions::DEFAULT_OPTIONS_NODE_INFO),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("allow-self-signed-compute")
|
||||
.long("allow-self-signed-compute")
|
||||
.help("Allow self-signed certificates for compute nodes (for testing)")
|
||||
.default_value("false"),
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -5,7 +5,7 @@ use pq_proto::StartupMessageParams;
|
||||
use std::{io, net::SocketAddr, time::Duration};
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_postgres::NoTls;
|
||||
use tokio_postgres::tls::MakeTlsConnect;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
|
||||
@@ -19,6 +19,9 @@ pub enum ConnectionError {
|
||||
|
||||
#[error("{COULD_NOT_CONNECT}: {0}")]
|
||||
CouldNotConnect(#[from] io::Error),
|
||||
|
||||
#[error("{COULD_NOT_CONNECT}: {0}")]
|
||||
TlsError(#[from] native_tls::Error),
|
||||
}
|
||||
|
||||
impl UserFacingError for ConnectionError {
|
||||
@@ -125,9 +128,15 @@ impl std::ops::DerefMut for ConnCfg {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ConnCfg {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl ConnCfg {
|
||||
/// Establish a raw TCP connection to the compute node.
|
||||
async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream)> {
|
||||
async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream, &str)> {
|
||||
use tokio_postgres::config::Host;
|
||||
|
||||
// wrap TcpStream::connect with timeout
|
||||
@@ -180,7 +189,7 @@ impl ConnCfg {
|
||||
};
|
||||
|
||||
match connect_once(host, *port).await {
|
||||
Ok(socket) => return Ok(socket),
|
||||
Ok((sockaddr, stream)) => return Ok((sockaddr, stream, host)),
|
||||
Err(err) => {
|
||||
// We can't throw an error here, as there might be more hosts to try.
|
||||
warn!("couldn't connect to compute node at {host}:{port}: {err}");
|
||||
@@ -200,7 +209,10 @@ impl ConnCfg {
|
||||
|
||||
pub struct PostgresConnection {
|
||||
/// Socket connected to a compute node.
|
||||
pub stream: TcpStream,
|
||||
pub stream: tokio_postgres::maybe_tls_stream::MaybeTlsStream<
|
||||
tokio::net::TcpStream,
|
||||
postgres_native_tls::TlsStream<tokio::net::TcpStream>,
|
||||
>,
|
||||
/// PostgreSQL connection parameters.
|
||||
pub params: std::collections::HashMap<String, String>,
|
||||
/// Query cancellation token.
|
||||
@@ -208,11 +220,27 @@ pub struct PostgresConnection {
|
||||
}
|
||||
|
||||
impl ConnCfg {
|
||||
async fn do_connect(&self) -> Result<PostgresConnection, ConnectionError> {
|
||||
// TODO: establish a secure connection to the DB.
|
||||
let (socket_addr, mut stream) = self.connect_raw().await?;
|
||||
let (client, connection) = self.0.connect_raw(&mut stream, NoTls).await?;
|
||||
info!("connected to compute node at {socket_addr}");
|
||||
async fn do_connect(
|
||||
&self,
|
||||
allow_self_signed_compute: bool,
|
||||
) -> Result<PostgresConnection, ConnectionError> {
|
||||
let (socket_addr, stream, host) = self.connect_raw().await?;
|
||||
|
||||
let tls_connector = native_tls::TlsConnector::builder()
|
||||
.danger_accept_invalid_certs(allow_self_signed_compute)
|
||||
.build()
|
||||
.unwrap();
|
||||
let mut mk_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector);
|
||||
let tls = MakeTlsConnect::<tokio::net::TcpStream>::make_tls_connect(&mut mk_tls, host)?;
|
||||
|
||||
// connect_raw() will not use TLS if sslmode is "disable"
|
||||
let (client, connection) = self.0.connect_raw(stream, tls).await?;
|
||||
let stream = connection.stream.into_inner();
|
||||
|
||||
info!(
|
||||
"connected to compute node at {host} ({socket_addr}) sslmode={:?}",
|
||||
self.0.get_ssl_mode()
|
||||
);
|
||||
|
||||
// This is very ugly but as of now there's no better way to
|
||||
// extract the connection parameters from tokio-postgres' connection.
|
||||
@@ -233,8 +261,11 @@ impl ConnCfg {
|
||||
}
|
||||
|
||||
/// Connect to a corresponding compute node.
|
||||
pub async fn connect(&self) -> Result<PostgresConnection, ConnectionError> {
|
||||
self.do_connect()
|
||||
pub async fn connect(
|
||||
&self,
|
||||
allow_self_signed_compute: bool,
|
||||
) -> Result<PostgresConnection, ConnectionError> {
|
||||
self.do_connect(allow_self_signed_compute)
|
||||
.inspect_err(|err| {
|
||||
// Immediately log the error we have at our disposal.
|
||||
error!("couldn't connect to compute node: {err}");
|
||||
|
||||
@@ -12,6 +12,7 @@ pub struct ProxyConfig {
|
||||
pub tls_config: Option<TlsConfig>,
|
||||
pub auth_backend: auth::BackendType<'static, ()>,
|
||||
pub metric_collection: Option<MetricCollectionConfig>,
|
||||
pub allow_self_signed_compute: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -170,6 +170,9 @@ pub struct NodeInfo {
|
||||
|
||||
/// Labels for proxy's metrics.
|
||||
pub aux: Arc<MetricsAuxInfo>,
|
||||
|
||||
/// Whether we should accept self-signed certificates (for testing)
|
||||
pub allow_self_signed_compute: bool,
|
||||
}
|
||||
|
||||
pub type NodeInfoCache = TimedLru<Arc<str>, NodeInfo>;
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::{auth::ClientCredentials, compute, error::io_error, scram, url::ApiUr
|
||||
use async_trait::async_trait;
|
||||
use futures::TryFutureExt;
|
||||
use thiserror::Error;
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
@@ -86,11 +87,13 @@ impl Api {
|
||||
let mut config = compute::ConnCfg::new();
|
||||
config
|
||||
.host(self.endpoint.host_str().unwrap_or("localhost"))
|
||||
.port(self.endpoint.port().unwrap_or(5432));
|
||||
.port(self.endpoint.port().unwrap_or(5432))
|
||||
.ssl_mode(SslMode::Disable);
|
||||
|
||||
let node = NodeInfo {
|
||||
config,
|
||||
aux: Default::default(),
|
||||
allow_self_signed_compute: false,
|
||||
};
|
||||
|
||||
Ok(node)
|
||||
|
||||
@@ -8,6 +8,7 @@ use super::{
|
||||
use crate::{auth::ClientCredentials, compute, http, scram};
|
||||
use async_trait::async_trait;
|
||||
use futures::TryFutureExt;
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -100,11 +101,12 @@ impl Api {
|
||||
// We'll set username and such later using the startup message.
|
||||
// TODO: add more type safety (in progress).
|
||||
let mut config = compute::ConnCfg::new();
|
||||
config.host(host).port(port);
|
||||
config.host(host).port(port).ssl_mode(SslMode::Disable); // TLS is not configured on compute nodes.
|
||||
|
||||
let node = NodeInfo {
|
||||
config,
|
||||
aux: body.aux.into(),
|
||||
allow_self_signed_compute: false,
|
||||
};
|
||||
|
||||
Ok(node)
|
||||
|
||||
57
proxy/src/lib.rs
Normal file
57
proxy/src/lib.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
use anyhow::{bail, Context};
|
||||
use futures::{Future, FutureExt};
|
||||
use tokio::task::JoinError;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
|
||||
pub mod auth;
|
||||
pub mod cache;
|
||||
pub mod cancellation;
|
||||
pub mod compute;
|
||||
pub mod config;
|
||||
pub mod console;
|
||||
pub mod error;
|
||||
pub mod http;
|
||||
pub mod logging;
|
||||
pub mod metrics;
|
||||
pub mod parse;
|
||||
pub mod proxy;
|
||||
pub mod sasl;
|
||||
pub mod scram;
|
||||
pub mod stream;
|
||||
pub mod url;
|
||||
pub mod waiters;
|
||||
|
||||
/// Handle unix signals appropriately.
|
||||
pub async fn handle_signals(token: CancellationToken) -> anyhow::Result<()> {
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
|
||||
let mut hangup = signal(SignalKind::hangup())?;
|
||||
let mut interrupt = signal(SignalKind::interrupt())?;
|
||||
let mut terminate = signal(SignalKind::terminate())?;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Hangup is commonly used for config reload.
|
||||
_ = hangup.recv() => {
|
||||
warn!("received SIGHUP; config reload is not supported");
|
||||
}
|
||||
// Shut down the whole application.
|
||||
_ = interrupt.recv() => {
|
||||
warn!("received SIGINT, exiting immediately");
|
||||
bail!("interrupted");
|
||||
}
|
||||
_ = terminate.recv() => {
|
||||
warn!("received SIGTERM, shutting down once all existing connections have closed");
|
||||
token.cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Flattens `Result<Result<T>>` into `Result<T>`.
|
||||
pub async fn flatten_err(
|
||||
f: impl Future<Output = Result<anyhow::Result<()>, JoinError>>,
|
||||
) -> anyhow::Result<()> {
|
||||
f.map(|r| r.context("join error").and_then(|x| x)).await
|
||||
}
|
||||
@@ -155,7 +155,7 @@ pub async fn handle_ws_client(
|
||||
async { result }.or_else(|e| stream.throw_error(e)).await?
|
||||
};
|
||||
|
||||
let client = Client::new(stream, creds, ¶ms, session_id);
|
||||
let client = Client::new(stream, creds, ¶ms, session_id, false);
|
||||
cancel_map
|
||||
.with_session(|session| client.connect_to_db(session, true))
|
||||
.await
|
||||
@@ -194,7 +194,15 @@ async fn handle_client(
|
||||
async { result }.or_else(|e| stream.throw_error(e)).await?
|
||||
};
|
||||
|
||||
let client = Client::new(stream, creds, ¶ms, session_id);
|
||||
let allow_self_signed_compute = config.allow_self_signed_compute;
|
||||
|
||||
let client = Client::new(
|
||||
stream,
|
||||
creds,
|
||||
¶ms,
|
||||
session_id,
|
||||
allow_self_signed_compute,
|
||||
);
|
||||
cancel_map
|
||||
.with_session(|session| client.connect_to_db(session, false))
|
||||
.await
|
||||
@@ -297,9 +305,11 @@ async fn connect_to_compute_once(
|
||||
NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc();
|
||||
};
|
||||
|
||||
let allow_self_signed_compute = node_info.allow_self_signed_compute;
|
||||
|
||||
node_info
|
||||
.config
|
||||
.connect()
|
||||
.connect(allow_self_signed_compute)
|
||||
.inspect_err(invalidate_cache)
|
||||
.await
|
||||
}
|
||||
@@ -378,7 +388,7 @@ async fn prepare_client_connection(
|
||||
|
||||
/// Forward bytes in both directions (client <-> compute).
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn proxy_pass(
|
||||
pub async fn proxy_pass(
|
||||
client: impl AsyncRead + AsyncWrite + Unpin,
|
||||
compute: impl AsyncRead + AsyncWrite + Unpin,
|
||||
aux: &MetricsAuxInfo,
|
||||
@@ -420,6 +430,8 @@ struct Client<'a, S> {
|
||||
params: &'a StartupMessageParams,
|
||||
/// Unique connection ID.
|
||||
session_id: uuid::Uuid,
|
||||
/// Allow self-signed certificates (for testing).
|
||||
allow_self_signed_compute: bool,
|
||||
}
|
||||
|
||||
impl<'a, S> Client<'a, S> {
|
||||
@@ -429,12 +441,14 @@ impl<'a, S> Client<'a, S> {
|
||||
creds: auth::BackendType<'a, auth::ClientCredentials<'a>>,
|
||||
params: &'a StartupMessageParams,
|
||||
session_id: uuid::Uuid,
|
||||
allow_self_signed_compute: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
creds,
|
||||
params,
|
||||
session_id,
|
||||
allow_self_signed_compute,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -451,6 +465,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
mut creds,
|
||||
params,
|
||||
session_id,
|
||||
allow_self_signed_compute,
|
||||
} = self;
|
||||
|
||||
let extra = console::ConsoleReqExtra {
|
||||
@@ -473,6 +488,8 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
value: mut node_info,
|
||||
} = auth_result;
|
||||
|
||||
node_info.allow_self_signed_compute = allow_self_signed_compute;
|
||||
|
||||
let mut node = connect_to_compute(&mut node_info, params, &extra, &creds)
|
||||
.or_else(|e| stream.throw_error(e))
|
||||
.await?;
|
||||
|
||||
@@ -19,11 +19,13 @@ git-version.workspace = true
|
||||
hex.workspace = true
|
||||
humantime.workspace = true
|
||||
hyper.workspace = true
|
||||
futures.workspace = true
|
||||
once_cell.workspace = true
|
||||
parking_lot.workspace = true
|
||||
postgres.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
regex.workspace = true
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
serde_with.workspace = true
|
||||
@@ -33,6 +35,7 @@ tokio = { workspace = true, features = ["fs"] }
|
||||
tokio-io-timeout.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
toml_edit.workspace = true
|
||||
tempfile.workspace = true
|
||||
tracing.workspace = true
|
||||
url.workspace = true
|
||||
metrics.workspace = true
|
||||
@@ -45,6 +48,3 @@ storage_broker.workspace = true
|
||||
utils.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile.workspace = true
|
||||
|
||||
@@ -11,11 +11,13 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::task::JoinError;
|
||||
|
||||
use crate::debug_dump;
|
||||
use crate::safekeeper::ServerInfo;
|
||||
use crate::safekeeper::Term;
|
||||
use crate::{debug_dump, pull_timeline};
|
||||
|
||||
use crate::timelines_global_map::TimelineDeleteForceResult;
|
||||
use crate::GlobalTimelines;
|
||||
@@ -177,6 +179,49 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
/// Pull timeline from peer safekeeper instances.
|
||||
async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
|
||||
let data: pull_timeline::Request = json_request(&mut request).await?;
|
||||
|
||||
let resp = pull_timeline::handle_request(data)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, resp)
|
||||
}
|
||||
|
||||
/// Download a file from the timeline directory.
|
||||
// TODO: figure out a better way to copy files between safekeepers
|
||||
async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let ttid = TenantTimelineId::new(
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
check_permission(&request, Some(ttid.tenant_id))?;
|
||||
|
||||
let filename: String = parse_request_param(&request, "filename")?;
|
||||
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
|
||||
let filepath = tli.timeline_dir.join(filename);
|
||||
let mut file = File::open(&filepath)
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
|
||||
let mut content = Vec::new();
|
||||
// TODO: don't store files in memory
|
||||
file.read_to_end(&mut content)
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header("Content-Type", "application/octet-stream")
|
||||
.body(Body::from(content))
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))
|
||||
}
|
||||
|
||||
/// Deactivates the timeline and removes its data directory.
|
||||
async fn timeline_delete_force_handler(
|
||||
mut request: Request<Body>,
|
||||
@@ -353,6 +398,11 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
timeline_delete_force_handler,
|
||||
)
|
||||
.delete("/v1/tenant/:tenant_id", tenant_delete_force_handler)
|
||||
.post("/v1/pull_timeline", timeline_pull_handler)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
|
||||
timeline_files_handler,
|
||||
)
|
||||
// for tests
|
||||
.post(
|
||||
"/v1/record_safekeeper_info/:tenant_id/:timeline_id",
|
||||
|
||||
@@ -15,6 +15,7 @@ pub mod handler;
|
||||
pub mod http;
|
||||
pub mod json_ctrl;
|
||||
pub mod metrics;
|
||||
pub mod pull_timeline;
|
||||
pub mod receive_wal;
|
||||
pub mod remove_wal;
|
||||
pub mod safekeeper;
|
||||
|
||||
240
safekeeper/src/pull_timeline.rs
Normal file
240
safekeeper/src/pull_timeline.rs
Normal file
@@ -0,0 +1,240 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tracing::info;
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
|
||||
use crate::{
|
||||
control_file, debug_dump,
|
||||
http::routes::TimelineStatus,
|
||||
wal_storage::{self, Storage},
|
||||
GlobalTimelines,
|
||||
};
|
||||
|
||||
/// Info about timeline on safekeeper ready for reporting.
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Request {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub tenant_id: TenantId,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub timeline_id: TimelineId,
|
||||
pub http_hosts: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct Response {
|
||||
// Donor safekeeper host
|
||||
pub safekeeper_host: String,
|
||||
// TODO: add more fields?
|
||||
}
|
||||
|
||||
/// Find the most advanced safekeeper and pull timeline from it.
|
||||
pub async fn handle_request(request: Request) -> Result<Response> {
|
||||
let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
|
||||
request.tenant_id,
|
||||
request.timeline_id,
|
||||
));
|
||||
if existing_tli.is_ok() {
|
||||
bail!("Timeline {} already exists", request.timeline_id);
|
||||
}
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let http_hosts = request.http_hosts.clone();
|
||||
|
||||
// Send request to /v1/tenant/:tenant_id/timeline/:timeline_id
|
||||
let responses = futures::future::join_all(http_hosts.iter().map(|url| {
|
||||
let url = format!(
|
||||
"{}/v1/tenant/{}/timeline/{}",
|
||||
url, request.tenant_id, request.timeline_id
|
||||
);
|
||||
client.get(url).send()
|
||||
}))
|
||||
.await;
|
||||
|
||||
let mut statuses = Vec::new();
|
||||
for (i, response) in responses.into_iter().enumerate() {
|
||||
let response = response.context(format!("Failed to get status from {}", http_hosts[i]))?;
|
||||
let status: crate::http::routes::TimelineStatus = response.json().await?;
|
||||
statuses.push((status, i));
|
||||
}
|
||||
|
||||
// Find the most advanced safekeeper
|
||||
// TODO: current logic may be wrong, fix it later
|
||||
let (status, i) = statuses
|
||||
.into_iter()
|
||||
.max_by_key(|(status, _)| {
|
||||
(
|
||||
status.acceptor_state.epoch,
|
||||
status.flush_lsn,
|
||||
status.commit_lsn,
|
||||
)
|
||||
})
|
||||
.unwrap();
|
||||
let safekeeper_host = http_hosts[i].clone();
|
||||
|
||||
assert!(status.tenant_id == request.tenant_id);
|
||||
assert!(status.timeline_id == request.timeline_id);
|
||||
|
||||
pull_timeline(status, safekeeper_host).await
|
||||
}
|
||||
|
||||
async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response> {
|
||||
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
|
||||
info!(
|
||||
"Pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
|
||||
ttid,
|
||||
host,
|
||||
status.commit_lsn,
|
||||
status.flush_lsn,
|
||||
status.acceptor_state.term,
|
||||
status.acceptor_state.epoch
|
||||
);
|
||||
|
||||
let conf = &GlobalTimelines::get_global_config();
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
// TODO: don't use debug dump, it should be used only in tests.
|
||||
// This is a proof of concept, we should figure out a way
|
||||
// to use scp without implementing it manually.
|
||||
|
||||
// Implementing our own scp over HTTP.
|
||||
// At first, we need to fetch list of files from safekeeper.
|
||||
let dump: debug_dump::Response = client
|
||||
.get(format!(
|
||||
"{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}",
|
||||
host, status.tenant_id, status.timeline_id
|
||||
))
|
||||
.send()
|
||||
.await?
|
||||
.json()
|
||||
.await?;
|
||||
|
||||
if dump.timelines.len() != 1 {
|
||||
bail!(
|
||||
"Expected to fetch single timeline, got {} timelines",
|
||||
dump.timelines.len()
|
||||
);
|
||||
}
|
||||
|
||||
let timeline = dump.timelines.into_iter().next().unwrap();
|
||||
let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!(
|
||||
"Timeline {} doesn't have disk content",
|
||||
ttid
|
||||
))?;
|
||||
|
||||
let mut filenames = disk_content
|
||||
.files
|
||||
.iter()
|
||||
.map(|file| file.name.clone())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Sort filenames to make sure we pull files in correct order
|
||||
// After sorting, we should have:
|
||||
// - 000000010000000000000001
|
||||
// - ...
|
||||
// - 000000010000000000000002.partial
|
||||
// - safekeeper.control
|
||||
filenames.sort();
|
||||
|
||||
// safekeeper.control should be the first file, so we need to move it to the beginning
|
||||
let control_file_index = filenames
|
||||
.iter()
|
||||
.position(|name| name == "safekeeper.control")
|
||||
.ok_or(anyhow::anyhow!("safekeeper.control not found"))?;
|
||||
filenames.remove(control_file_index);
|
||||
filenames.insert(0, "safekeeper.control".to_string());
|
||||
|
||||
info!(
|
||||
"Downloading {} files from safekeeper {}",
|
||||
filenames.len(),
|
||||
host
|
||||
);
|
||||
|
||||
// Creating temp directory for a new timeline. It needs to be
|
||||
// located on the same filesystem as the rest of the timelines.
|
||||
|
||||
// conf.workdir is usually /storage/safekeeper/data
|
||||
// will try to transform it into /storage/safekeeper/tmp
|
||||
let temp_base = conf
|
||||
.workdir
|
||||
.parent()
|
||||
.ok_or(anyhow::anyhow!("workdir has no parent"))?
|
||||
.join("tmp");
|
||||
|
||||
tokio::fs::create_dir_all(&temp_base).await?;
|
||||
|
||||
let tli_dir = tempfile::Builder::new()
|
||||
.suffix("_temptli")
|
||||
.prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
|
||||
.tempdir_in(temp_base)?;
|
||||
let tli_dir_path = tli_dir.path().to_owned();
|
||||
|
||||
// Note: some time happens between fetching list of files and fetching files themselves.
|
||||
// It's possible that some files will be removed from safekeeper and we will fail to fetch them.
|
||||
// This function will fail in this case, should be retried by the caller.
|
||||
for filename in filenames {
|
||||
let file_path = tli_dir_path.join(&filename);
|
||||
// /v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename
|
||||
let http_url = format!(
|
||||
"{}/v1/tenant/{}/timeline/{}/file/{}",
|
||||
host, status.tenant_id, status.timeline_id, filename
|
||||
);
|
||||
|
||||
let mut file = tokio::fs::File::create(&file_path).await?;
|
||||
let mut response = client.get(&http_url).send().await?;
|
||||
while let Some(chunk) = response.chunk().await? {
|
||||
file.write_all(&chunk).await?;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: fsync?
|
||||
|
||||
// Let's create timeline from temp directory and verify that it's correct
|
||||
|
||||
let control_path = tli_dir_path.join("safekeeper.control");
|
||||
|
||||
let control_store = control_file::FileStorage::load_control_file(control_path)?;
|
||||
if control_store.server.wal_seg_size == 0 {
|
||||
bail!("wal_seg_size is not set");
|
||||
}
|
||||
|
||||
let wal_store =
|
||||
wal_storage::PhysicalStorage::new(&ttid, tli_dir_path.clone(), conf, &control_store)?;
|
||||
|
||||
let commit_lsn = status.commit_lsn;
|
||||
let flush_lsn = wal_store.flush_lsn();
|
||||
|
||||
info!(
|
||||
"Finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
|
||||
ttid, commit_lsn, flush_lsn
|
||||
);
|
||||
assert!(status.commit_lsn <= status.flush_lsn);
|
||||
|
||||
// Move timeline dir to the correct location
|
||||
let timeline_path = conf.timeline_dir(&ttid);
|
||||
|
||||
info!(
|
||||
"Moving timeline {} from {} to {}",
|
||||
ttid,
|
||||
tli_dir_path.display(),
|
||||
timeline_path.display()
|
||||
);
|
||||
tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
|
||||
tokio::fs::rename(tli_dir_path, &timeline_path).await?;
|
||||
|
||||
let tli = GlobalTimelines::load_timeline(ttid).context("Failed to load timeline after copy")?;
|
||||
|
||||
info!(
|
||||
"Loaded timeline {}, flush_lsn={}",
|
||||
ttid,
|
||||
tli.get_flush_lsn()
|
||||
);
|
||||
|
||||
Ok(Response {
|
||||
safekeeper_host: host,
|
||||
})
|
||||
}
|
||||
@@ -129,7 +129,8 @@ impl SharedState {
|
||||
// We don't want to write anything to disk, because we may have existing timeline there.
|
||||
// These functions should not change anything on disk.
|
||||
let control_store = control_file::FileStorage::create_new(ttid, conf, state)?;
|
||||
let wal_store = wal_storage::PhysicalStorage::new(ttid, conf, &control_store)?;
|
||||
let wal_store =
|
||||
wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
|
||||
let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;
|
||||
|
||||
Ok(Self {
|
||||
@@ -149,7 +150,8 @@ impl SharedState {
|
||||
bail!(TimelineError::UninitializedWalSegSize(*ttid));
|
||||
}
|
||||
|
||||
let wal_store = wal_storage::PhysicalStorage::new(ttid, conf, &control_store)?;
|
||||
let wal_store =
|
||||
wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
|
||||
|
||||
Ok(Self {
|
||||
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
|
||||
|
||||
@@ -159,6 +159,26 @@ impl GlobalTimelines {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Load timeline from disk to the memory.
|
||||
pub fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
|
||||
let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();
|
||||
|
||||
match Timeline::load_timeline(conf, ttid, wal_backup_launcher_tx) {
|
||||
Ok(timeline) => {
|
||||
let tli = Arc::new(timeline);
|
||||
// TODO: prevent concurrent timeline creation/loading
|
||||
TIMELINES_STATE
|
||||
.lock()
|
||||
.unwrap()
|
||||
.timelines
|
||||
.insert(ttid, tli.clone());
|
||||
Ok(tli)
|
||||
}
|
||||
// If we can't load a timeline, it's bad. Caller will figure it out.
|
||||
Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the number of timelines in the map.
|
||||
pub fn timelines_count() -> usize {
|
||||
TIMELINES_STATE.lock().unwrap().timelines.len()
|
||||
|
||||
@@ -112,10 +112,10 @@ impl PhysicalStorage {
|
||||
/// the disk. Otherwise, all LSNs are set to zero.
|
||||
pub fn new(
|
||||
ttid: &TenantTimelineId,
|
||||
timeline_dir: PathBuf,
|
||||
conf: &SafeKeeperConf,
|
||||
state: &SafeKeeperState,
|
||||
) -> Result<PhysicalStorage> {
|
||||
let timeline_dir = conf.timeline_dir(ttid);
|
||||
let wal_seg_size = state.server.wal_seg_size as usize;
|
||||
|
||||
// Find out where stored WAL ends, starting at commit_lsn which is a
|
||||
|
||||
@@ -1820,6 +1820,36 @@ class VanillaPostgres(PgProtocol):
|
||||
self.pg_bin.run_capture(["initdb", "-D", str(pgdatadir)])
|
||||
self.configure([f"port = {port}\n"])
|
||||
|
||||
def enable_tls(self):
|
||||
assert not self.running
|
||||
# generate self-signed certificate
|
||||
subprocess.run(
|
||||
[
|
||||
"openssl",
|
||||
"req",
|
||||
"-new",
|
||||
"-x509",
|
||||
"-days",
|
||||
"365",
|
||||
"-nodes",
|
||||
"-text",
|
||||
"-out",
|
||||
self.pgdatadir / "server.crt",
|
||||
"-keyout",
|
||||
self.pgdatadir / "server.key",
|
||||
"-subj",
|
||||
"/CN=localhost",
|
||||
]
|
||||
)
|
||||
# configure postgresql.conf
|
||||
self.configure(
|
||||
[
|
||||
"ssl = on",
|
||||
"ssl_cert_file = 'server.crt'",
|
||||
"ssl_key_file = 'server.key'",
|
||||
]
|
||||
)
|
||||
|
||||
def configure(self, options: List[str]):
|
||||
"""Append lines into postgresql.conf file."""
|
||||
assert not self.running
|
||||
@@ -1992,6 +2022,7 @@ class NeonProxy(PgProtocol):
|
||||
# Link auth backend params
|
||||
*["--auth-backend", "link"],
|
||||
*["--uri", NeonProxy.link_auth_uri],
|
||||
*["--allow-self-signed-compute", "true"],
|
||||
]
|
||||
|
||||
@dataclass(frozen=True)
|
||||
@@ -2012,6 +2043,7 @@ class NeonProxy(PgProtocol):
|
||||
def __init__(
|
||||
self,
|
||||
neon_binpath: Path,
|
||||
test_output_dir: Path,
|
||||
proxy_port: int,
|
||||
http_port: int,
|
||||
mgmt_port: int,
|
||||
@@ -2025,6 +2057,7 @@ class NeonProxy(PgProtocol):
|
||||
self.host = host
|
||||
self.http_port = http_port
|
||||
self.neon_binpath = neon_binpath
|
||||
self.test_output_dir = test_output_dir
|
||||
self.proxy_port = proxy_port
|
||||
self.mgmt_port = mgmt_port
|
||||
self.auth_backend = auth_backend
|
||||
@@ -2051,7 +2084,8 @@ class NeonProxy(PgProtocol):
|
||||
*["--metric-collection-interval", self.metric_collection_interval],
|
||||
]
|
||||
|
||||
self._popen = subprocess.Popen(args)
|
||||
logfile = open(self.test_output_dir / "proxy.log", "w")
|
||||
self._popen = subprocess.Popen(args, stdout=logfile, stderr=logfile)
|
||||
self._wait_until_ready()
|
||||
return self
|
||||
|
||||
@@ -2108,7 +2142,7 @@ class NeonProxy(PgProtocol):
|
||||
try:
|
||||
self._popen.wait(timeout=5)
|
||||
except subprocess.TimeoutExpired:
|
||||
log.warn("failed to gracefully terminate proxy; killing")
|
||||
log.warning("failed to gracefully terminate proxy; killing")
|
||||
self._popen.kill()
|
||||
|
||||
@staticmethod
|
||||
@@ -2119,6 +2153,7 @@ class NeonProxy(PgProtocol):
|
||||
|
||||
if create_user:
|
||||
log.info("creating a new user for link auth test")
|
||||
local_vanilla_pg.enable_tls()
|
||||
local_vanilla_pg.start()
|
||||
local_vanilla_pg.safe_psql(f"create user {pg_user} with login superuser")
|
||||
|
||||
@@ -2152,7 +2187,9 @@ class NeonProxy(PgProtocol):
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def link_proxy(port_distributor: PortDistributor, neon_binpath: Path) -> Iterator[NeonProxy]:
|
||||
def link_proxy(
|
||||
port_distributor: PortDistributor, neon_binpath: Path, test_output_dir: Path
|
||||
) -> Iterator[NeonProxy]:
|
||||
"""Neon proxy that routes through link auth."""
|
||||
|
||||
http_port = port_distributor.get_port()
|
||||
@@ -2161,6 +2198,7 @@ def link_proxy(port_distributor: PortDistributor, neon_binpath: Path) -> Iterato
|
||||
|
||||
with NeonProxy(
|
||||
neon_binpath=neon_binpath,
|
||||
test_output_dir=test_output_dir,
|
||||
proxy_port=proxy_port,
|
||||
http_port=http_port,
|
||||
mgmt_port=mgmt_port,
|
||||
@@ -2172,7 +2210,10 @@ def link_proxy(port_distributor: PortDistributor, neon_binpath: Path) -> Iterato
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def static_proxy(
|
||||
vanilla_pg: VanillaPostgres, port_distributor: PortDistributor, neon_binpath: Path
|
||||
vanilla_pg: VanillaPostgres,
|
||||
port_distributor: PortDistributor,
|
||||
neon_binpath: Path,
|
||||
test_output_dir: Path,
|
||||
) -> Iterator[NeonProxy]:
|
||||
"""Neon proxy that routes directly to vanilla postgres."""
|
||||
|
||||
@@ -2191,6 +2232,7 @@ def static_proxy(
|
||||
|
||||
with NeonProxy(
|
||||
neon_binpath=neon_binpath,
|
||||
test_output_dir=test_output_dir,
|
||||
proxy_port=proxy_port,
|
||||
http_port=http_port,
|
||||
mgmt_port=mgmt_port,
|
||||
@@ -2619,6 +2661,13 @@ class SafekeeperHttpClient(requests.Session):
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]:
|
||||
res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body)
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def timeline_create(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, pg_version: int, commit_lsn: Lsn
|
||||
):
|
||||
|
||||
@@ -199,9 +199,12 @@ def proxy_metrics_handler(request: Request) -> Response:
|
||||
return Response(status=200)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
@pytest.fixture(scope="function")
|
||||
def proxy_with_metric_collector(
|
||||
port_distributor: PortDistributor, neon_binpath: Path, httpserver_listen_address
|
||||
port_distributor: PortDistributor,
|
||||
neon_binpath: Path,
|
||||
httpserver_listen_address,
|
||||
test_output_dir: Path,
|
||||
) -> Iterator[NeonProxy]:
|
||||
"""Neon proxy that routes through link auth and has metric collection enabled."""
|
||||
|
||||
@@ -215,6 +218,7 @@ def proxy_with_metric_collector(
|
||||
|
||||
with NeonProxy(
|
||||
neon_binpath=neon_binpath,
|
||||
test_output_dir=test_output_dir,
|
||||
proxy_port=proxy_port,
|
||||
http_port=http_port,
|
||||
mgmt_port=mgmt_port,
|
||||
|
||||
134
test_runner/regress/test_sni_router.py
Normal file
134
test_runner/regress/test_sni_router.py
Normal file
@@ -0,0 +1,134 @@
|
||||
import socket
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from types import TracebackType
|
||||
from typing import Optional, Type
|
||||
|
||||
import backoff # type: ignore
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import PgProtocol, PortDistributor, VanillaPostgres
|
||||
|
||||
|
||||
def generate_tls_cert(cn, certout, keyout):
|
||||
subprocess.run(
|
||||
[
|
||||
"openssl",
|
||||
"req",
|
||||
"-new",
|
||||
"-x509",
|
||||
"-days",
|
||||
"365",
|
||||
"-nodes",
|
||||
"-out",
|
||||
certout,
|
||||
"-keyout",
|
||||
keyout,
|
||||
"-subj",
|
||||
f"/CN={cn}",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
class PgSniRouter(PgProtocol):
|
||||
def __init__(
|
||||
self,
|
||||
neon_binpath: Path,
|
||||
port: int,
|
||||
destination: str,
|
||||
tls_cert: Path,
|
||||
tls_key: Path,
|
||||
):
|
||||
# Must use a hostname rather than IP here, for SNI to work
|
||||
host = "localhost"
|
||||
super().__init__(host=host, port=port)
|
||||
|
||||
self.host = host
|
||||
self.neon_binpath = neon_binpath
|
||||
self.port = port
|
||||
self.destination = destination
|
||||
self.tls_cert = tls_cert
|
||||
self.tls_key = tls_key
|
||||
self._popen: Optional[subprocess.Popen[bytes]] = None
|
||||
|
||||
def start(self) -> "PgSniRouter":
|
||||
assert self._popen is None
|
||||
args = [
|
||||
str(self.neon_binpath / "pg_sni_router"),
|
||||
*["--listen", f"127.0.0.1:{self.port}"],
|
||||
*["--tls-cert", str(self.tls_cert)],
|
||||
*["--tls-key", str(self.tls_key)],
|
||||
*["--destination", self.destination],
|
||||
]
|
||||
|
||||
self._popen = subprocess.Popen(args)
|
||||
self._wait_until_ready()
|
||||
return self
|
||||
|
||||
@backoff.on_exception(backoff.expo, OSError, max_time=10)
|
||||
def _wait_until_ready(self):
|
||||
socket.create_connection((self.host, self.port))
|
||||
|
||||
# Sends SIGTERM to the proxy if it has been started
|
||||
def terminate(self):
|
||||
if self._popen:
|
||||
self._popen.terminate()
|
||||
|
||||
# Waits for proxy to exit if it has been opened with a default timeout of
|
||||
# two seconds. Raises subprocess.TimeoutExpired if the proxy does not exit in time.
|
||||
def wait_for_exit(self, timeout=2):
|
||||
if self._popen:
|
||||
self._popen.wait(timeout=2)
|
||||
|
||||
def __enter__(self) -> "PgSniRouter":
|
||||
return self
|
||||
|
||||
def __exit__(
|
||||
self,
|
||||
exc_type: Optional[Type[BaseException]],
|
||||
exc: Optional[BaseException],
|
||||
tb: Optional[TracebackType],
|
||||
):
|
||||
if self._popen is not None:
|
||||
self._popen.terminate()
|
||||
try:
|
||||
self._popen.wait(timeout=5)
|
||||
except subprocess.TimeoutExpired:
|
||||
log.warning("failed to gracefully terminate pg_sni_router; killing")
|
||||
self._popen.kill()
|
||||
|
||||
|
||||
def test_pg_sni_router(
|
||||
vanilla_pg: VanillaPostgres,
|
||||
port_distributor: PortDistributor,
|
||||
neon_binpath: Path,
|
||||
test_output_dir: Path,
|
||||
):
|
||||
generate_tls_cert(
|
||||
"endpoint.namespace.localtest.me",
|
||||
test_output_dir / "router.crt",
|
||||
test_output_dir / "router.key",
|
||||
)
|
||||
|
||||
# Start a stand-alone Postgres to test with
|
||||
vanilla_pg.start()
|
||||
pg_port = vanilla_pg.default_options["port"]
|
||||
|
||||
router_port = port_distributor.get_port()
|
||||
|
||||
with PgSniRouter(
|
||||
neon_binpath=neon_binpath,
|
||||
port=router_port,
|
||||
destination="localtest.me",
|
||||
tls_cert=test_output_dir / "router.crt",
|
||||
tls_key=test_output_dir / "router.key",
|
||||
) as router:
|
||||
router.start()
|
||||
|
||||
out = router.safe_psql(
|
||||
"select 1",
|
||||
dbname="postgres",
|
||||
sslmode="require",
|
||||
host=f"endpoint--namespace--{pg_port}.localtest.me",
|
||||
hostaddr="127.0.0.1",
|
||||
)
|
||||
assert out[0][0] == 1
|
||||
@@ -1254,3 +1254,98 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
with closing(endpoint_other.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("INSERT INTO t (key) VALUES (123)")
|
||||
|
||||
|
||||
def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
def safekeepers_guc(env: NeonEnv, sk_names: List[int]) -> str:
|
||||
return ",".join([f"localhost:{sk.port.pg}" for sk in env.safekeepers if sk.id in sk_names])
|
||||
|
||||
def execute_payload(endpoint: Endpoint):
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# we rely upon autocommit after each statement
|
||||
# as waiting for acceptors happens there
|
||||
cur.execute("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
cur.execute("INSERT INTO t VALUES (0, 'something')")
|
||||
sum_before = query_scalar(cur, "SELECT SUM(key) FROM t")
|
||||
|
||||
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
|
||||
sum_after = query_scalar(cur, "SELECT SUM(key) FROM t")
|
||||
assert sum_after == sum_before + 5000050000
|
||||
|
||||
def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId):
|
||||
for sk in safekeepers:
|
||||
http_cli = sk.http_client()
|
||||
try:
|
||||
status = http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(f"Safekeeper {sk.id} status: {status}")
|
||||
except Exception as e:
|
||||
log.info(f"Safekeeper {sk.id} status error: {e}")
|
||||
|
||||
neon_env_builder.num_safekeepers = 4
|
||||
env = neon_env_builder.init_start()
|
||||
env.neon_cli.create_branch("test_pull_timeline")
|
||||
|
||||
log.info("Use only first 3 safekeepers")
|
||||
env.safekeepers[3].stop()
|
||||
active_safekeepers = [1, 2, 3]
|
||||
endpoint = env.endpoints.create("test_pull_timeline")
|
||||
endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
|
||||
endpoint.start()
|
||||
|
||||
# learn neon timeline from compute
|
||||
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
|
||||
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
|
||||
|
||||
execute_payload(endpoint)
|
||||
show_statuses(env.safekeepers, tenant_id, timeline_id)
|
||||
|
||||
log.info("Kill safekeeper 2, continue with payload")
|
||||
env.safekeepers[1].stop(immediate=True)
|
||||
execute_payload(endpoint)
|
||||
|
||||
log.info("Initialize new safekeeper 4, pull data from 1 & 3")
|
||||
env.safekeepers[3].start()
|
||||
|
||||
res = (
|
||||
env.safekeepers[3]
|
||||
.http_client()
|
||||
.pull_timeline(
|
||||
{
|
||||
"tenant_id": str(tenant_id),
|
||||
"timeline_id": str(timeline_id),
|
||||
"http_hosts": [
|
||||
f"http://localhost:{env.safekeepers[0].port.http}",
|
||||
f"http://localhost:{env.safekeepers[2].port.http}",
|
||||
],
|
||||
}
|
||||
)
|
||||
)
|
||||
log.info("Finished pulling timeline")
|
||||
log.info(res)
|
||||
|
||||
show_statuses(env.safekeepers, tenant_id, timeline_id)
|
||||
|
||||
log.info("Restarting compute with new config to verify that it works")
|
||||
active_safekeepers = [1, 3, 4]
|
||||
|
||||
endpoint.stop_and_destroy().create("test_pull_timeline")
|
||||
endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
|
||||
endpoint.start()
|
||||
|
||||
execute_payload(endpoint)
|
||||
show_statuses(env.safekeepers, tenant_id, timeline_id)
|
||||
|
||||
log.info("Stop sk1 (simulate failure) and use only quorum of sk3 and sk4")
|
||||
env.safekeepers[0].stop(immediate=True)
|
||||
execute_payload(endpoint)
|
||||
show_statuses(env.safekeepers, tenant_id, timeline_id)
|
||||
|
||||
log.info("Restart sk4 and and use quorum of sk1 and sk4")
|
||||
env.safekeepers[3].stop()
|
||||
env.safekeepers[2].stop()
|
||||
env.safekeepers[0].start()
|
||||
env.safekeepers[3].start()
|
||||
|
||||
execute_payload(endpoint)
|
||||
show_statuses(env.safekeepers, tenant_id, timeline_id)
|
||||
|
||||
Reference in New Issue
Block a user