diff --git a/.github/ansible/staging.eu-west-1.hosts.yaml b/.github/ansible/staging.eu-west-1.hosts.yaml index 39f5613935..a54ced7f3a 100644 --- a/.github/ansible/staging.eu-west-1.hosts.yaml +++ b/.github/ansible/staging.eu-west-1.hosts.yaml @@ -35,6 +35,8 @@ storage: hosts: pageserver-0.eu-west-1.aws.neon.build: ansible_host: i-01d496c5041c7f34c + pageserver-1.eu-west-1.aws.neon.build: + ansible_host: i-0e8013e239ce3928c safekeepers: hosts: @@ -44,3 +46,15 @@ storage: ansible_host: i-06969ee1bf2958bfc safekeeper-2.eu-west-1.aws.neon.build: ansible_host: i-087892e9625984a0b + safekeeper-3.eu-west-1.aws.neon.build: + ansible_host: i-0a6f91660e99e8891 + safekeeper-4.eu-west-1.aws.neon.build: + ansible_host: i-0012e309e28e7c249 + safekeeper-5.eu-west-1.aws.neon.build: + ansible_host: i-085a2b1193287b32e + safekeeper-6.eu-west-1.aws.neon.build: + ansible_host: i-0c713248465ed0fbd + safekeeper-7.eu-west-1.aws.neon.build: + ansible_host: i-02ad231aed2a80b7a + safekeeper-8.eu-west-1.aws.neon.build: + ansible_host: i-0dbbd8ffef66efda8 diff --git a/.github/helm-values/dev-eu-central-1-alpha.pg-sni-router.yaml b/.github/helm-values/dev-eu-central-1-alpha.pg-sni-router.yaml new file mode 100644 index 0000000000..a80423b12d --- /dev/null +++ b/.github/helm-values/dev-eu-central-1-alpha.pg-sni-router.yaml @@ -0,0 +1,19 @@ +useCertManager: true + +replicaCount: 3 + +exposedService: + # exposedService.port -- Exposed Service proxy port + port: 4432 + annotations: + external-dns.alpha.kubernetes.io/hostname: "*.snirouter.alpha.eu-central-1.internal.aws.neon.build" + +settings: + domain: "*.snirouter.alpha.eu-central-1.internal.aws.neon.build" + sentryEnvironment: "staging" + +imagePullSecrets: + - name: docker-hub-neon + +metrics: + enabled: false diff --git a/.github/helm-values/dev-eu-west-1-zeta.pg-sni-router.yaml b/.github/helm-values/dev-eu-west-1-zeta.pg-sni-router.yaml new file mode 100644 index 0000000000..c9c628af0c --- /dev/null +++ b/.github/helm-values/dev-eu-west-1-zeta.pg-sni-router.yaml @@ -0,0 +1,19 @@ +useCertManager: true + +replicaCount: 3 + +exposedService: + # exposedService.port -- Exposed Service proxy port + port: 4432 + annotations: + external-dns.alpha.kubernetes.io/hostname: "*.snirouter.zeta.eu-west-1.internal.aws.neon.build" + +settings: + domain: "*.snirouter.zeta.eu-west-1.internal.aws.neon.build" + sentryEnvironment: "staging" + +imagePullSecrets: + - name: docker-hub-neon + +metrics: + enabled: false diff --git a/.github/helm-values/dev-us-east-2-beta.pg-sni-router.yaml b/.github/helm-values/dev-us-east-2-beta.pg-sni-router.yaml new file mode 100644 index 0000000000..68ad096df7 --- /dev/null +++ b/.github/helm-values/dev-us-east-2-beta.pg-sni-router.yaml @@ -0,0 +1,19 @@ +useCertManager: true + +replicaCount: 3 + +exposedService: + # exposedService.port -- Exposed Service proxy port + port: 4432 + annotations: + external-dns.alpha.kubernetes.io/hostname: "*.snirouter.beta.us-east-2.internal.aws.neon.build" + +settings: + domain: "*.snirouter.beta.us-east-2.internal.aws.neon.build" + sentryEnvironment: "staging" + +imagePullSecrets: + - name: docker-hub-neon + +metrics: + enabled: false diff --git a/.github/helm-values/prod-ap-southeast-1-epsilon.pg-sni-router.yaml b/.github/helm-values/prod-ap-southeast-1-epsilon.pg-sni-router.yaml new file mode 100644 index 0000000000..478ad5631c --- /dev/null +++ b/.github/helm-values/prod-ap-southeast-1-epsilon.pg-sni-router.yaml @@ -0,0 +1,19 @@ +useCertManager: true + +replicaCount: 3 + +exposedService: + # exposedService.port -- Exposed Service proxy port + port: 4432 + annotations: + external-dns.alpha.kubernetes.io/hostname: "*.snirouter.epsilon.ap-southeast-1.internal.aws.neon.tech" + +settings: + domain: "*.snirouter.epsilon.ap-southeast-1.internal.aws.neon.tech" + sentryEnvironment: "production" + +imagePullSecrets: + - name: docker-hub-neon + +metrics: + enabled: false diff --git a/.github/helm-values/prod-eu-central-1-gamma.pg-sni-router.yaml b/.github/helm-values/prod-eu-central-1-gamma.pg-sni-router.yaml new file mode 100644 index 0000000000..08a0a163bc --- /dev/null +++ b/.github/helm-values/prod-eu-central-1-gamma.pg-sni-router.yaml @@ -0,0 +1,19 @@ +useCertManager: true + +replicaCount: 3 + +exposedService: + # exposedService.port -- Exposed Service proxy port + port: 4432 + annotations: + external-dns.alpha.kubernetes.io/hostname: "*.snirouter.gamma.eu-central-1.internal.aws.neon.tech" + +settings: + domain: "*.snirouter.gamma.eu-central-1.internal.aws.neon.tech" + sentryEnvironment: "production" + +imagePullSecrets: + - name: docker-hub-neon + +metrics: + enabled: false diff --git a/.github/helm-values/prod-us-east-1-theta.pg-sni-router.yaml b/.github/helm-values/prod-us-east-1-theta.pg-sni-router.yaml new file mode 100644 index 0000000000..ab308131bc --- /dev/null +++ b/.github/helm-values/prod-us-east-1-theta.pg-sni-router.yaml @@ -0,0 +1,19 @@ +useCertManager: true + +replicaCount: 3 + +exposedService: + # exposedService.port -- Exposed Service proxy port + port: 4432 + annotations: + external-dns.alpha.kubernetes.io/hostname: "*.snirouter.theta.us-east-1.internal.aws.neon.tech" + +settings: + domain: "*.snirouter.theta.us-east-1.internal.aws.neon.tech" + sentryEnvironment: "production" + +imagePullSecrets: + - name: docker-hub-neon + +metrics: + enabled: false diff --git a/.github/helm-values/prod-us-east-2-delta.pg-sni-router.yaml b/.github/helm-values/prod-us-east-2-delta.pg-sni-router.yaml new file mode 100644 index 0000000000..ecb3f156ec --- /dev/null +++ b/.github/helm-values/prod-us-east-2-delta.pg-sni-router.yaml @@ -0,0 +1,19 @@ +useCertManager: true + +replicaCount: 3 + +exposedService: + # exposedService.port -- Exposed Service proxy port + port: 4432 + annotations: + external-dns.alpha.kubernetes.io/hostname: "*.snirouter.delta.us-east-2.internal.aws.neon.tech" + +settings: + domain: "*.snirouter.delta.us-east-2.internal.aws.neon.tech" + sentryEnvironment: "production" + +imagePullSecrets: + - name: docker-hub-neon + +metrics: + enabled: false diff --git a/.github/helm-values/prod-us-west-2-eta.pg-sni-router.yaml b/.github/helm-values/prod-us-west-2-eta.pg-sni-router.yaml new file mode 100644 index 0000000000..942250c419 --- /dev/null +++ b/.github/helm-values/prod-us-west-2-eta.pg-sni-router.yaml @@ -0,0 +1,19 @@ +useCertManager: true + +replicaCount: 3 + +exposedService: + # exposedService.port -- Exposed Service proxy port + port: 4432 + annotations: + external-dns.alpha.kubernetes.io/hostname: "*.snirouter.eta.us-west-2.internal.aws.neon.tech" + +settings: + domain: "*.snirouter.eta.us-west-2.internal.aws.neon.tech" + sentryEnvironment: "production" + +imagePullSecrets: + - name: docker-hub-neon + +metrics: + enabled: false diff --git a/.github/workflows/deploy-dev.yml b/.github/workflows/deploy-dev.yml index 5d1c6e0e16..f37e1b344d 100644 --- a/.github/workflows/deploy-dev.yml +++ b/.github/workflows/deploy-dev.yml @@ -27,6 +27,11 @@ on: required: true type: boolean default: true + deployPgSniRouter: + description: 'Deploy pg-sni-router' + required: true + type: boolean + default: true env: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }} @@ -227,3 +232,49 @@ jobs: - name: Cleanup helm folder run: rm -rf ~/.cache + + deploy-pg-sni-router: + runs-on: [ self-hosted, gen3, small ] + container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned + if: inputs.deployPgSniRouter + defaults: + run: + shell: bash + strategy: + matrix: + include: + - target_region: us-east-2 + target_cluster: dev-us-east-2-beta + - target_region: eu-west-1 + target_cluster: dev-eu-west-1-zeta + - target_region: eu-central-1 + target_cluster: dev-eu-central-1-alpha + environment: + name: dev-${{ matrix.target_region }} + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + submodules: true + fetch-depth: 0 + ref: ${{ inputs.branch }} + + - name: Configure AWS Credentials + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + role-to-assume: arn:aws:iam::369495373322:role/github-runner + aws-region: eu-central-1 + role-skip-session-tagging: true + role-duration-seconds: 1800 + + - name: Configure environment + run: | + helm repo add neondatabase https://neondatabase.github.io/helm-charts + aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }} + + - name: Deploy pg-sni-router + run: + helm upgrade neon-pg-sni-router neondatabase/neon-pg-sni-router --namespace neon-pg-sni-router --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.pg-sni-router.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 15m0s + + - name: Cleanup helm folder + run: rm -rf ~/.cache diff --git a/.github/workflows/deploy-prod.yml b/.github/workflows/deploy-prod.yml index 9fa31b3225..c5d690db3a 100644 --- a/.github/workflows/deploy-prod.yml +++ b/.github/workflows/deploy-prod.yml @@ -27,6 +27,11 @@ on: required: true type: boolean default: true + deployPgSniRouter: + description: 'Deploy pg-sni-router' + required: true + type: boolean + default: true disclamerAcknowledged: description: 'I confirm that there is an emergency and I can not use regular release workflow' required: true @@ -171,3 +176,42 @@ jobs: - name: Deploy storage-broker run: helm upgrade neon-storage-broker-lb neondatabase/neon-storage-broker --namespace neon-storage-broker-lb --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s + + deploy-pg-sni-router: + runs-on: prod + container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest + if: inputs.deployPgSniRouter && inputs.disclamerAcknowledged + defaults: + run: + shell: bash + strategy: + matrix: + include: + - target_region: us-east-2 + target_cluster: prod-us-east-2-delta + - target_region: us-west-2 + target_cluster: prod-us-west-2-eta + - target_region: eu-central-1 + target_cluster: prod-eu-central-1-gamma + - target_region: ap-southeast-1 + target_cluster: prod-ap-southeast-1-epsilon + - target_region: us-east-1 + target_cluster: prod-us-east-1-theta + environment: + name: prod-${{ matrix.target_region }} + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + submodules: true + fetch-depth: 0 + ref: ${{ inputs.branch }} + + - name: Configure environment + run: | + helm repo add neondatabase https://neondatabase.github.io/helm-charts + aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }} + + - name: Deploy pg-sni-router + run: + helm upgrade neon-pg-sni-router neondatabase/neon-pg-sni-router --namespace neon-pg-sni-router --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.pg-sni-router.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 15m0s diff --git a/Cargo.lock b/Cargo.lock index 5f3a83ce2d..bce2d11188 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1574,6 +1574,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.1.0" @@ -2361,6 +2376,24 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nix" version = "0.26.2" @@ -2483,12 +2516,50 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" +[[package]] +name = "openssl" +version = "0.10.52" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01b8574602df80f7b85fdfc5392fa884a4e3b3f4f35402c070ab34c3d3f78d56" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.15", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e17f59264b2809d77ae94f0e1ebabc434773f370d6ca667bd223ea10e06cc7e" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "opentelemetry" version = "0.18.0" @@ -2816,6 +2887,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" + [[package]] name = "plotters" version = "0.3.4" @@ -2847,7 +2924,7 @@ dependencies = [ [[package]] name = "postgres" version = "0.19.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f" dependencies = [ "bytes", "fallible-iterator", @@ -2857,10 +2934,21 @@ dependencies = [ "tokio-postgres", ] +[[package]] +name = "postgres-native-tls" +version = "0.5.0" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f" +dependencies = [ + "native-tls", + "tokio", + "tokio-native-tls", + "tokio-postgres", +] + [[package]] name = "postgres-protocol" version = "0.6.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f" dependencies = [ "base64 0.20.0", "byteorder", @@ -2878,7 +2966,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f" dependencies = [ "bytes", "fallible-iterator", @@ -3109,10 +3197,12 @@ dependencies = [ "itertools", "md5", "metrics", + "native-tls", "once_cell", "opentelemetry", "parking_lot", "pin-project-lite", + "postgres-native-tls", "postgres_backend", "pq_proto", "prometheus", @@ -3567,6 +3657,7 @@ dependencies = [ "const_format", "crc32c", "fs2", + "futures", "git-version", "hex", "humantime", @@ -3581,6 +3672,7 @@ dependencies = [ "pq_proto", "regex", "remote_storage", + "reqwest", "safekeeper_api", "serde", "serde_json", @@ -3868,8 +3960,7 @@ dependencies = [ [[package]] name = "sharded-slab" version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +source = "git+https://github.com/neondatabase/sharded-slab.git?rev=98d16753ab01c61f0a028de44167307a00efea00#98d16753ab01c61f0a028de44167307a00efea00" dependencies = [ "lazy_static", ] @@ -4319,10 +4410,20 @@ dependencies = [ "syn 2.0.15", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-postgres" version = "0.7.7" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f" dependencies = [ "async-trait", "byteorder", @@ -4914,6 +5015,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.4" diff --git a/Cargo.toml b/Cargo.toml index f4872433cd..b73e29ef6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ jsonwebtoken = "8" libc = "0.2" md5 = "0.7.0" memoffset = "0.8" +native-tls = "0.2" nix = "0.26" notify = "5.0.0" num_cpus = "1.15" @@ -124,10 +125,11 @@ env_logger = "0.10" log = "0.4" ## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed -postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" } -postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" } -postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" } -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" } +postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } +postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } +postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } +postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" } ## Other git libraries @@ -159,10 +161,16 @@ rstest = "0.17" tempfile = "3.4" tonic-build = "0.9" +[patch.crates-io] + # This is only needed for proxy's tests. # TODO: we should probably fork `tokio-postgres-rustls` instead. -[patch.crates-io] -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } + +# Changes the MAX_THREADS limit from 4096 to 32768. +# This is a temporary workaround for using tracing from many threads in safekeepers code, +# until async safekeepers patch is merged to the main. +sharded-slab = { git = "https://github.com/neondatabase/sharded-slab.git", rev="98d16753ab01c61f0a028de44167307a00efea00" } ################# Binary contents sections diff --git a/Dockerfile b/Dockerfile index 6f7d2c32a5..f83f3b1c21 100644 --- a/Dockerfile +++ b/Dockerfile @@ -44,7 +44,15 @@ COPY --chown=nonroot . . # Show build caching stats to check if it was used in the end. # Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats. RUN set -e \ -&& mold -run cargo build --bin pageserver --bin pageserver_binutils --bin draw_timeline_dir --bin safekeeper --bin storage_broker --bin proxy --locked --release \ + && mold -run cargo build \ + --bin pg_sni_router \ + --bin pageserver \ + --bin pageserver_binutils \ + --bin draw_timeline_dir \ + --bin safekeeper \ + --bin storage_broker \ + --bin proxy \ + --locked --release \ && cachepot -s # Build final image @@ -63,6 +71,7 @@ RUN set -e \ && useradd -d /data neon \ && chown -R neon:neon /data +COPY --from=build --chown=neon:neon /home/nonroot/target/release/pg_sni_router /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver_binutils /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/draw_timeline_dir /usr/local/bin diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index deb20f21f8..b5d7eb0132 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -287,14 +287,33 @@ impl EvictionsWithLowResidenceDuration { let Some(_counter) = self.counter.take() else { return; }; - EVICTIONS_WITH_LOW_RESIDENCE_DURATION - .remove_label_values(&[ - tenant_id, - timeline_id, - self.data_source, - &Self::threshold_label_value(self.threshold), - ]) - .expect("we own the metric, no-one else should remove it"); + + let threshold = Self::threshold_label_value(self.threshold); + + let removed = EVICTIONS_WITH_LOW_RESIDENCE_DURATION.remove_label_values(&[ + tenant_id, + timeline_id, + self.data_source, + &threshold, + ]); + + match removed { + Err(e) => { + // this has been hit in staging as + // , but we don't know how. + // because we can be in the drop path already, don't risk: + // - "double-panic => illegal instruction" or + // - future "drop panick => abort" + // + // so just nag: (the error has the labels) + tracing::warn!("failed to remove EvictionsWithLowResidenceDuration, it was already removed? {e:#?}"); + } + Ok(()) => { + // to help identify cases where we double-remove the same values, let's log all + // deletions? + tracing::info!("removed EvictionsWithLowResidenceDuration with {tenant_id}, {timeline_id}, {}, {threshold}", self.data_source); + } + } } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 8b0795db3c..a7a0d1a22e 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -352,7 +352,7 @@ impl PageServerHandler { tenant_id: TenantId, timeline_id: TimelineId, ctx: RequestContext, - ) -> anyhow::Result<()> + ) -> Result<(), QueryError> where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, { @@ -398,7 +398,9 @@ impl PageServerHandler { Some(FeMessage::CopyData(bytes)) => bytes, Some(FeMessage::Terminate) => break, Some(m) => { - anyhow::bail!("unexpected message: {m:?} during COPY"); + return Err(QueryError::Other(anyhow::anyhow!( + "unexpected message: {m:?} during COPY" + ))); } None => break, // client disconnected }; diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 9d702b29c3..e7a4fd236e 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -62,6 +62,8 @@ utils.workspace = true uuid.workspace = true webpki-roots.workspace = true x509-parser.workspace = true +native-tls.workspace = true +postgres-native-tls.workspace = true workspace_hack.workspace = true tokio-util.workspace = true diff --git a/proxy/src/auth/backend/link.rs b/proxy/src/auth/backend/link.rs index 7175a23dc1..da43cf11c4 100644 --- a/proxy/src/auth/backend/link.rs +++ b/proxy/src/auth/backend/link.rs @@ -9,6 +9,7 @@ use crate::{ use pq_proto::BeMessage as Be; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_postgres::config::SslMode; use tracing::{info, info_span}; #[derive(Debug, Error)] @@ -87,6 +88,16 @@ pub(super) async fn authenticate( .dbname(&db_info.dbname) .user(&db_info.user); + // Backwards compatibility. pg_sni_proxy uses "--" in domain names + // while direct connections do not. Once we migrate to pg_sni_proxy + // everywhere, we can remove this. + if db_info.host.contains("--") { + // we need TLS connection with SNI info to properly route it + config.ssl_mode(SslMode::Require); + } else { + config.ssl_mode(SslMode::Disable); + } + if let Some(password) = db_info.password { config.password(password.as_ref()); } @@ -96,6 +107,7 @@ pub(super) async fn authenticate( value: NodeInfo { config, aux: db_info.aux.into(), + allow_self_signed_compute: false, // caller may override }, }) } diff --git a/proxy/src/bin/pg_sni_router.rs b/proxy/src/bin/pg_sni_router.rs new file mode 100644 index 0000000000..bba2d51caf --- /dev/null +++ b/proxy/src/bin/pg_sni_router.rs @@ -0,0 +1,250 @@ +/// A stand-alone program that routes connections, e.g. from +/// `aaa--bbb--1234.external.domain` to `aaa.bbb.internal.domain:1234`. +/// +/// This allows connecting to pods/services running in the same Kubernetes cluster from +/// the outside. Similar to an ingress controller for HTTPS. +use std::{net::SocketAddr, sync::Arc}; + +use tokio::net::TcpListener; + +use anyhow::{anyhow, bail, ensure, Context}; +use clap::{self, Arg}; +use futures::TryFutureExt; +use proxy::console::messages::MetricsAuxInfo; +use proxy::stream::{PqStream, Stream}; + +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_util::sync::CancellationToken; +use utils::{project_git_version, sentry_init::init_sentry}; + +use tracing::{error, info, warn}; + +project_git_version!(GIT_VERSION); + +fn cli() -> clap::Command { + clap::Command::new("Neon proxy/router") + .version(GIT_VERSION) + .arg( + Arg::new("listen") + .short('l') + .long("listen") + .help("listen for incoming client connections on ip:port") + .default_value("127.0.0.1:4432"), + ) + .arg( + Arg::new("tls-key") + .short('k') + .long("tls-key") + .help("path to TLS key for client postgres connections") + .required(true), + ) + .arg( + Arg::new("tls-cert") + .short('c') + .long("tls-cert") + .help("path to TLS cert for client postgres connections") + .required(true), + ) + .arg( + Arg::new("dest") + .short('d') + .long("destination") + .help("append this domain zone to the SNI hostname to get the destination address") + .required(true), + ) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let _logging_guard = proxy::logging::init().await?; + let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook(); + let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]); + + let args = cli().get_matches(); + let destination: String = args.get_one::("dest").unwrap().parse()?; + + // Configure TLS + let tls_config: Arc = match ( + args.get_one::("tls-key"), + args.get_one::("tls-cert"), + ) { + (Some(key_path), Some(cert_path)) => { + let key = { + let key_bytes = std::fs::read(key_path).context("TLS key file")?; + let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..]) + .context(format!("Failed to read TLS keys at '{key_path}'"))?; + + ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len()); + keys.pop().map(rustls::PrivateKey).unwrap() + }; + + let cert_chain_bytes = std::fs::read(cert_path) + .context(format!("Failed to read TLS cert file at '{cert_path}.'"))?; + + let cert_chain = { + rustls_pemfile::certs(&mut &cert_chain_bytes[..]) + .context(format!( + "Failed to read TLS certificate chain from bytes from file at '{cert_path}'." + ))? + .into_iter() + .map(rustls::Certificate) + .collect() + }; + + rustls::ServerConfig::builder() + .with_safe_default_cipher_suites() + .with_safe_default_kx_groups() + .with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])? + .with_no_client_auth() + .with_single_cert(cert_chain, key)? + .into() + } + _ => bail!("tls-key and tls-cert must be specified"), + }; + + // Start listening for incoming client connections + let proxy_address: SocketAddr = args.get_one::("listen").unwrap().parse()?; + info!("Starting sni router on {proxy_address}"); + let proxy_listener = TcpListener::bind(proxy_address).await?; + + let cancellation_token = CancellationToken::new(); + + let main = proxy::flatten_err(tokio::spawn(task_main( + Arc::new(destination), + tls_config, + proxy_listener, + cancellation_token.clone(), + ))); + let signals_task = proxy::flatten_err(tokio::spawn(proxy::handle_signals(cancellation_token))); + + tokio::select! { + res = main => { res?; }, + res = signals_task => { res?; }, + } + + Ok(()) +} + +async fn task_main( + dest_suffix: Arc, + tls_config: Arc, + listener: tokio::net::TcpListener, + cancellation_token: CancellationToken, +) -> anyhow::Result<()> { + // When set for the server socket, the keepalive setting + // will be inherited by all accepted client sockets. + socket2::SockRef::from(&listener).set_keepalive(true)?; + + let mut connections = tokio::task::JoinSet::new(); + + loop { + tokio::select! { + accept_result = listener.accept() => { + let (socket, peer_addr) = accept_result?; + info!("accepted postgres client connection from {peer_addr}"); + + let session_id = uuid::Uuid::new_v4(); + let tls_config = Arc::clone(&tls_config); + let dest_suffix = Arc::clone(&dest_suffix); + + connections.spawn( + async move { + info!("spawned a task for {peer_addr}"); + + socket + .set_nodelay(true) + .context("failed to set socket option")?; + + handle_client(dest_suffix, tls_config, session_id, socket).await + } + .unwrap_or_else(|e| { + // Acknowledge that the task has finished with an error. + error!("per-client task finished with an error: {e:#}"); + }), + ); + } + _ = cancellation_token.cancelled() => { + drop(listener); + break; + } + } + } + + // Drain connections + info!("waiting for all client connections to finish"); + while let Some(res) = connections.join_next().await { + if let Err(e) = res { + if !e.is_panic() && !e.is_cancelled() { + warn!("unexpected error from joined connection task: {e:?}"); + } + } + } + info!("all client connections have finished"); + Ok(()) +} + +const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; + +async fn ssl_handshake( + raw_stream: S, + tls_config: Arc, +) -> anyhow::Result> { + let mut stream = PqStream::new(Stream::from_raw(raw_stream)); + + let msg = stream.read_startup_packet().await?; + info!("received {msg:?}"); + use pq_proto::FeStartupPacket::*; + + match msg { + SslRequest => { + stream + .write_message(&pq_proto::BeMessage::EncryptionResponse(true)) + .await?; + // Upgrade raw stream into a secure TLS-backed stream. + // NOTE: We've consumed `tls`; this fact will be used later. + + let (raw, read_buf) = stream.into_inner(); + // TODO: Normally, client doesn't send any data before + // server says TLS handshake is ok and read_buf is empy. + // However, you could imagine pipelining of postgres + // SSLRequest + TLS ClientHello in one hunk similar to + // pipelining in our node js driver. We should probably + // support that by chaining read_buf with the stream. + if !read_buf.is_empty() { + bail!("data is sent before server replied with EncryptionResponse"); + } + Ok(raw.upgrade(tls_config).await?) + } + _ => stream.throw_error_str(ERR_INSECURE_CONNECTION).await?, + } +} + +#[tracing::instrument(fields(session_id = ?session_id), skip_all)] +async fn handle_client( + dest_suffix: Arc, + tls_config: Arc, + session_id: uuid::Uuid, + stream: impl AsyncRead + AsyncWrite + Unpin, +) -> anyhow::Result<()> { + let tls_stream = ssl_handshake(stream, tls_config).await?; + + // Cut off first part of the SNI domain + // We receive required destination details in the format of + // `{k8s_service_name}--{k8s_namespace}--{port}.non-sni-domain` + let sni = tls_stream.sni_hostname().ok_or(anyhow!("SNI missing"))?; + let dest: Vec<&str> = sni + .split_once('.') + .context("invalid SNI")? + .0 + .splitn(3, "--") + .collect(); + let port = dest[2].parse::().context("invalid port")?; + let destination = format!("{}.{}.{}:{}", dest[0], dest[1], dest_suffix, port); + + info!("destination: {}", destination); + + let client = tokio::net::TcpStream::connect(destination).await?; + + let metrics_aux: MetricsAuxInfo = Default::default(); + proxy::proxy::proxy_pass(tls_stream, client, &metrics_aux).await +} diff --git a/proxy/src/main.rs b/proxy/src/bin/proxy.rs similarity index 79% rename from proxy/src/main.rs rename to proxy/src/bin/proxy.rs index 1fd13c9f68..28e6e25317 100644 --- a/proxy/src/main.rs +++ b/proxy/src/bin/proxy.rs @@ -1,49 +1,23 @@ -//! Postgres protocol proxy/router. -//! -//! This service listens psql port and can check auth via external service -//! (control plane API in our case) and can create new databases and accounts -//! in somewhat transparent manner (again via communication with control plane API). +use proxy::auth; +use proxy::console; +use proxy::http; +use proxy::metrics; -mod auth; -mod cache; -mod cancellation; -mod compute; -mod config; -mod console; -mod error; -mod http; -mod logging; -mod metrics; -mod parse; -mod proxy; -mod sasl; -mod scram; -mod stream; -mod url; -mod waiters; - -use anyhow::{bail, Context}; +use anyhow::bail; use clap::{self, Arg}; -use config::ProxyConfig; -use futures::FutureExt; -use std::{borrow::Cow, future::Future, net::SocketAddr}; -use tokio::{net::TcpListener, task::JoinError}; +use proxy::config::{self, ProxyConfig}; +use std::{borrow::Cow, net::SocketAddr}; +use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; -use tracing::{info, warn}; +use tracing::info; +use tracing::warn; use utils::{project_git_version, sentry_init::init_sentry}; project_git_version!(GIT_VERSION); -/// Flattens `Result>` into `Result`. -async fn flatten_err( - f: impl Future, JoinError>>, -) -> anyhow::Result<()> { - f.map(|r| r.context("join error").and_then(|x| x)).await -} - #[tokio::main] async fn main() -> anyhow::Result<()> { - let _logging_guard = logging::init().await?; + let _logging_guard = proxy::logging::init().await?; let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook(); let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]); @@ -69,7 +43,7 @@ async fn main() -> anyhow::Result<()> { let proxy_listener = TcpListener::bind(proxy_address).await?; let cancellation_token = CancellationToken::new(); - let mut client_tasks = vec![tokio::spawn(proxy::task_main( + let mut client_tasks = vec![tokio::spawn(proxy::proxy::task_main( config, proxy_listener, cancellation_token.clone(), @@ -88,7 +62,7 @@ async fn main() -> anyhow::Result<()> { } let mut tasks = vec![ - tokio::spawn(handle_signals(cancellation_token)), + tokio::spawn(proxy::handle_signals(cancellation_token)), tokio::spawn(http::server::task_main(http_listener)), tokio::spawn(console::mgmt::task_main(mgmt_listener)), ]; @@ -97,8 +71,9 @@ async fn main() -> anyhow::Result<()> { tasks.push(tokio::spawn(metrics::task_main(metrics_config))); } - let tasks = futures::future::try_join_all(tasks.into_iter().map(flatten_err)); - let client_tasks = futures::future::try_join_all(client_tasks.into_iter().map(flatten_err)); + let tasks = futures::future::try_join_all(tasks.into_iter().map(proxy::flatten_err)); + let client_tasks = + futures::future::try_join_all(client_tasks.into_iter().map(proxy::flatten_err)); tokio::select! { // We are only expecting an error from these forever tasks res = tasks => { res?; }, @@ -107,33 +82,6 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -/// Handle unix signals appropriately. -async fn handle_signals(token: CancellationToken) -> anyhow::Result<()> { - use tokio::signal::unix::{signal, SignalKind}; - - let mut hangup = signal(SignalKind::hangup())?; - let mut interrupt = signal(SignalKind::interrupt())?; - let mut terminate = signal(SignalKind::terminate())?; - - loop { - tokio::select! { - // Hangup is commonly used for config reload. - _ = hangup.recv() => { - warn!("received SIGHUP; config reload is not supported"); - } - // Shut down the whole application. - _ = interrupt.recv() => { - warn!("received SIGINT, exiting immediately"); - bail!("interrupted"); - } - _ = terminate.recv() => { - warn!("received SIGTERM, shutting down once all existing connections have closed"); - token.cancel(); - } - } - } -} - /// ProxyConfig is created at proxy startup, and lives forever. fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig> { let tls_config = match ( @@ -149,6 +97,14 @@ fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig> _ => bail!("either both or neither tls-key and tls-cert must be specified"), }; + let allow_self_signed_compute: bool = args + .get_one::("allow-self-signed-compute") + .unwrap() + .parse()?; + if allow_self_signed_compute { + warn!("allowing self-signed compute certificates"); + } + let metric_collection = match ( args.get_one::("metric-collection-endpoint"), args.get_one::("metric-collection-interval"), @@ -198,6 +154,7 @@ fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig> tls_config, auth_backend, metric_collection, + allow_self_signed_compute, })); Ok(config) @@ -288,6 +245,12 @@ fn cli() -> clap::Command { .help("cache for `wake_compute` api method (use `size=0` to disable)") .default_value(config::CacheOptions::DEFAULT_OPTIONS_NODE_INFO), ) + .arg( + Arg::new("allow-self-signed-compute") + .long("allow-self-signed-compute") + .help("Allow self-signed certificates for compute nodes (for testing)") + .default_value("false"), + ) } #[cfg(test)] diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 0465703ae6..d277940a12 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -5,7 +5,7 @@ use pq_proto::StartupMessageParams; use std::{io, net::SocketAddr, time::Duration}; use thiserror::Error; use tokio::net::TcpStream; -use tokio_postgres::NoTls; +use tokio_postgres::tls::MakeTlsConnect; use tracing::{error, info, warn}; const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node"; @@ -19,6 +19,9 @@ pub enum ConnectionError { #[error("{COULD_NOT_CONNECT}: {0}")] CouldNotConnect(#[from] io::Error), + + #[error("{COULD_NOT_CONNECT}: {0}")] + TlsError(#[from] native_tls::Error), } impl UserFacingError for ConnectionError { @@ -125,9 +128,15 @@ impl std::ops::DerefMut for ConnCfg { } } +impl Default for ConnCfg { + fn default() -> Self { + Self::new() + } +} + impl ConnCfg { /// Establish a raw TCP connection to the compute node. - async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream)> { + async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream, &str)> { use tokio_postgres::config::Host; // wrap TcpStream::connect with timeout @@ -180,7 +189,7 @@ impl ConnCfg { }; match connect_once(host, *port).await { - Ok(socket) => return Ok(socket), + Ok((sockaddr, stream)) => return Ok((sockaddr, stream, host)), Err(err) => { // We can't throw an error here, as there might be more hosts to try. warn!("couldn't connect to compute node at {host}:{port}: {err}"); @@ -200,7 +209,10 @@ impl ConnCfg { pub struct PostgresConnection { /// Socket connected to a compute node. - pub stream: TcpStream, + pub stream: tokio_postgres::maybe_tls_stream::MaybeTlsStream< + tokio::net::TcpStream, + postgres_native_tls::TlsStream, + >, /// PostgreSQL connection parameters. pub params: std::collections::HashMap, /// Query cancellation token. @@ -208,11 +220,27 @@ pub struct PostgresConnection { } impl ConnCfg { - async fn do_connect(&self) -> Result { - // TODO: establish a secure connection to the DB. - let (socket_addr, mut stream) = self.connect_raw().await?; - let (client, connection) = self.0.connect_raw(&mut stream, NoTls).await?; - info!("connected to compute node at {socket_addr}"); + async fn do_connect( + &self, + allow_self_signed_compute: bool, + ) -> Result { + let (socket_addr, stream, host) = self.connect_raw().await?; + + let tls_connector = native_tls::TlsConnector::builder() + .danger_accept_invalid_certs(allow_self_signed_compute) + .build() + .unwrap(); + let mut mk_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector); + let tls = MakeTlsConnect::::make_tls_connect(&mut mk_tls, host)?; + + // connect_raw() will not use TLS if sslmode is "disable" + let (client, connection) = self.0.connect_raw(stream, tls).await?; + let stream = connection.stream.into_inner(); + + info!( + "connected to compute node at {host} ({socket_addr}) sslmode={:?}", + self.0.get_ssl_mode() + ); // This is very ugly but as of now there's no better way to // extract the connection parameters from tokio-postgres' connection. @@ -233,8 +261,11 @@ impl ConnCfg { } /// Connect to a corresponding compute node. - pub async fn connect(&self) -> Result { - self.do_connect() + pub async fn connect( + &self, + allow_self_signed_compute: bool, + ) -> Result { + self.do_connect(allow_self_signed_compute) .inspect_err(|err| { // Immediately log the error we have at our disposal. error!("couldn't connect to compute node: {err}"); diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 0ceb556ca1..530229b3fd 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -12,6 +12,7 @@ pub struct ProxyConfig { pub tls_config: Option, pub auth_backend: auth::BackendType<'static, ()>, pub metric_collection: Option, + pub allow_self_signed_compute: bool, } #[derive(Debug)] diff --git a/proxy/src/console/provider.rs b/proxy/src/console/provider.rs index 80cd94d483..44e23e0adf 100644 --- a/proxy/src/console/provider.rs +++ b/proxy/src/console/provider.rs @@ -170,6 +170,9 @@ pub struct NodeInfo { /// Labels for proxy's metrics. pub aux: Arc, + + /// Whether we should accept self-signed certificates (for testing) + pub allow_self_signed_compute: bool, } pub type NodeInfoCache = TimedLru, NodeInfo>; diff --git a/proxy/src/console/provider/mock.rs b/proxy/src/console/provider/mock.rs index eaac9c06d9..3b42c73a34 100644 --- a/proxy/src/console/provider/mock.rs +++ b/proxy/src/console/provider/mock.rs @@ -8,6 +8,7 @@ use crate::{auth::ClientCredentials, compute, error::io_error, scram, url::ApiUr use async_trait::async_trait; use futures::TryFutureExt; use thiserror::Error; +use tokio_postgres::config::SslMode; use tracing::{error, info, info_span, warn, Instrument}; #[derive(Debug, Error)] @@ -86,11 +87,13 @@ impl Api { let mut config = compute::ConnCfg::new(); config .host(self.endpoint.host_str().unwrap_or("localhost")) - .port(self.endpoint.port().unwrap_or(5432)); + .port(self.endpoint.port().unwrap_or(5432)) + .ssl_mode(SslMode::Disable); let node = NodeInfo { config, aux: Default::default(), + allow_self_signed_compute: false, }; Ok(node) diff --git a/proxy/src/console/provider/neon.rs b/proxy/src/console/provider/neon.rs index 3644db17f7..a8e855b2c8 100644 --- a/proxy/src/console/provider/neon.rs +++ b/proxy/src/console/provider/neon.rs @@ -8,6 +8,7 @@ use super::{ use crate::{auth::ClientCredentials, compute, http, scram}; use async_trait::async_trait; use futures::TryFutureExt; +use tokio_postgres::config::SslMode; use tracing::{error, info, info_span, warn, Instrument}; #[derive(Clone)] @@ -100,11 +101,12 @@ impl Api { // We'll set username and such later using the startup message. // TODO: add more type safety (in progress). let mut config = compute::ConnCfg::new(); - config.host(host).port(port); + config.host(host).port(port).ssl_mode(SslMode::Disable); // TLS is not configured on compute nodes. let node = NodeInfo { config, aux: body.aux.into(), + allow_self_signed_compute: false, }; Ok(node) diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs new file mode 100644 index 0000000000..148ee67d90 --- /dev/null +++ b/proxy/src/lib.rs @@ -0,0 +1,57 @@ +use anyhow::{bail, Context}; +use futures::{Future, FutureExt}; +use tokio::task::JoinError; +use tokio_util::sync::CancellationToken; +use tracing::warn; + +pub mod auth; +pub mod cache; +pub mod cancellation; +pub mod compute; +pub mod config; +pub mod console; +pub mod error; +pub mod http; +pub mod logging; +pub mod metrics; +pub mod parse; +pub mod proxy; +pub mod sasl; +pub mod scram; +pub mod stream; +pub mod url; +pub mod waiters; + +/// Handle unix signals appropriately. +pub async fn handle_signals(token: CancellationToken) -> anyhow::Result<()> { + use tokio::signal::unix::{signal, SignalKind}; + + let mut hangup = signal(SignalKind::hangup())?; + let mut interrupt = signal(SignalKind::interrupt())?; + let mut terminate = signal(SignalKind::terminate())?; + + loop { + tokio::select! { + // Hangup is commonly used for config reload. + _ = hangup.recv() => { + warn!("received SIGHUP; config reload is not supported"); + } + // Shut down the whole application. + _ = interrupt.recv() => { + warn!("received SIGINT, exiting immediately"); + bail!("interrupted"); + } + _ = terminate.recv() => { + warn!("received SIGTERM, shutting down once all existing connections have closed"); + token.cancel(); + } + } + } +} + +/// Flattens `Result>` into `Result`. +pub async fn flatten_err( + f: impl Future, JoinError>>, +) -> anyhow::Result<()> { + f.map(|r| r.context("join error").and_then(|x| x)).await +} diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 1169d76160..f3d3524d30 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -155,7 +155,7 @@ pub async fn handle_ws_client( async { result }.or_else(|e| stream.throw_error(e)).await? }; - let client = Client::new(stream, creds, ¶ms, session_id); + let client = Client::new(stream, creds, ¶ms, session_id, false); cancel_map .with_session(|session| client.connect_to_db(session, true)) .await @@ -194,7 +194,15 @@ async fn handle_client( async { result }.or_else(|e| stream.throw_error(e)).await? }; - let client = Client::new(stream, creds, ¶ms, session_id); + let allow_self_signed_compute = config.allow_self_signed_compute; + + let client = Client::new( + stream, + creds, + ¶ms, + session_id, + allow_self_signed_compute, + ); cancel_map .with_session(|session| client.connect_to_db(session, false)) .await @@ -297,9 +305,11 @@ async fn connect_to_compute_once( NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc(); }; + let allow_self_signed_compute = node_info.allow_self_signed_compute; + node_info .config - .connect() + .connect(allow_self_signed_compute) .inspect_err(invalidate_cache) .await } @@ -378,7 +388,7 @@ async fn prepare_client_connection( /// Forward bytes in both directions (client <-> compute). #[tracing::instrument(skip_all)] -async fn proxy_pass( +pub async fn proxy_pass( client: impl AsyncRead + AsyncWrite + Unpin, compute: impl AsyncRead + AsyncWrite + Unpin, aux: &MetricsAuxInfo, @@ -420,6 +430,8 @@ struct Client<'a, S> { params: &'a StartupMessageParams, /// Unique connection ID. session_id: uuid::Uuid, + /// Allow self-signed certificates (for testing). + allow_self_signed_compute: bool, } impl<'a, S> Client<'a, S> { @@ -429,12 +441,14 @@ impl<'a, S> Client<'a, S> { creds: auth::BackendType<'a, auth::ClientCredentials<'a>>, params: &'a StartupMessageParams, session_id: uuid::Uuid, + allow_self_signed_compute: bool, ) -> Self { Self { stream, creds, params, session_id, + allow_self_signed_compute, } } } @@ -451,6 +465,7 @@ impl Client<'_, S> { mut creds, params, session_id, + allow_self_signed_compute, } = self; let extra = console::ConsoleReqExtra { @@ -473,6 +488,8 @@ impl Client<'_, S> { value: mut node_info, } = auth_result; + node_info.allow_self_signed_compute = allow_self_signed_compute; + let mut node = connect_to_compute(&mut node_info, params, &extra, &creds) .or_else(|e| stream.throw_error(e)) .await?; diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 00cd111da5..b6e8497809 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -19,11 +19,13 @@ git-version.workspace = true hex.workspace = true humantime.workspace = true hyper.workspace = true +futures.workspace = true once_cell.workspace = true parking_lot.workspace = true postgres.workspace = true postgres-protocol.workspace = true regex.workspace = true +reqwest = { workspace = true, features = ["json"] } serde.workspace = true serde_json.workspace = true serde_with.workspace = true @@ -33,6 +35,7 @@ tokio = { workspace = true, features = ["fs"] } tokio-io-timeout.workspace = true tokio-postgres.workspace = true toml_edit.workspace = true +tempfile.workspace = true tracing.workspace = true url.workspace = true metrics.workspace = true @@ -45,6 +48,3 @@ storage_broker.workspace = true utils.workspace = true workspace_hack.workspace = true - -[dev-dependencies] -tempfile.workspace = true diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index eeb08d2733..a498d868af 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -11,11 +11,13 @@ use std::str::FromStr; use std::sync::Arc; use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; +use tokio::fs::File; +use tokio::io::AsyncReadExt; use tokio::task::JoinError; -use crate::debug_dump; use crate::safekeeper::ServerInfo; use crate::safekeeper::Term; +use crate::{debug_dump, pull_timeline}; use crate::timelines_global_map::TimelineDeleteForceResult; use crate::GlobalTimelines; @@ -177,6 +179,49 @@ async fn timeline_create_handler(mut request: Request) -> Result) -> Result, ApiError> { + check_permission(&request, None)?; + + let data: pull_timeline::Request = json_request(&mut request).await?; + + let resp = pull_timeline::handle_request(data) + .await + .map_err(ApiError::InternalServerError)?; + json_response(StatusCode::OK, resp) +} + +/// Download a file from the timeline directory. +// TODO: figure out a better way to copy files between safekeepers +async fn timeline_files_handler(request: Request) -> Result, ApiError> { + let ttid = TenantTimelineId::new( + parse_request_param(&request, "tenant_id")?, + parse_request_param(&request, "timeline_id")?, + ); + check_permission(&request, Some(ttid.tenant_id))?; + + let filename: String = parse_request_param(&request, "filename")?; + + let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?; + + let filepath = tli.timeline_dir.join(filename); + let mut file = File::open(&filepath) + .await + .map_err(|e| ApiError::InternalServerError(e.into()))?; + + let mut content = Vec::new(); + // TODO: don't store files in memory + file.read_to_end(&mut content) + .await + .map_err(|e| ApiError::InternalServerError(e.into()))?; + + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/octet-stream") + .body(Body::from(content)) + .map_err(|e| ApiError::InternalServerError(e.into())) +} + /// Deactivates the timeline and removes its data directory. async fn timeline_delete_force_handler( mut request: Request, @@ -353,6 +398,11 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder timeline_delete_force_handler, ) .delete("/v1/tenant/:tenant_id", tenant_delete_force_handler) + .post("/v1/pull_timeline", timeline_pull_handler) + .get( + "/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename", + timeline_files_handler, + ) // for tests .post( "/v1/record_safekeeper_info/:tenant_id/:timeline_id", diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 2c28c5218d..ff621fdbc0 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -15,6 +15,7 @@ pub mod handler; pub mod http; pub mod json_ctrl; pub mod metrics; +pub mod pull_timeline; pub mod receive_wal; pub mod remove_wal; pub mod safekeeper; diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs new file mode 100644 index 0000000000..344b760fd3 --- /dev/null +++ b/safekeeper/src/pull_timeline.rs @@ -0,0 +1,240 @@ +use serde::{Deserialize, Serialize}; + +use anyhow::{bail, Context, Result}; +use tokio::io::AsyncWriteExt; +use tracing::info; +use utils::id::{TenantId, TenantTimelineId, TimelineId}; + +use serde_with::{serde_as, DisplayFromStr}; + +use crate::{ + control_file, debug_dump, + http::routes::TimelineStatus, + wal_storage::{self, Storage}, + GlobalTimelines, +}; + +/// Info about timeline on safekeeper ready for reporting. +#[serde_as] +#[derive(Debug, Serialize, Deserialize)] +pub struct Request { + #[serde_as(as = "DisplayFromStr")] + pub tenant_id: TenantId, + #[serde_as(as = "DisplayFromStr")] + pub timeline_id: TimelineId, + pub http_hosts: Vec, +} + +#[derive(Debug, Serialize)] +pub struct Response { + // Donor safekeeper host + pub safekeeper_host: String, + // TODO: add more fields? +} + +/// Find the most advanced safekeeper and pull timeline from it. +pub async fn handle_request(request: Request) -> Result { + let existing_tli = GlobalTimelines::get(TenantTimelineId::new( + request.tenant_id, + request.timeline_id, + )); + if existing_tli.is_ok() { + bail!("Timeline {} already exists", request.timeline_id); + } + + let client = reqwest::Client::new(); + let http_hosts = request.http_hosts.clone(); + + // Send request to /v1/tenant/:tenant_id/timeline/:timeline_id + let responses = futures::future::join_all(http_hosts.iter().map(|url| { + let url = format!( + "{}/v1/tenant/{}/timeline/{}", + url, request.tenant_id, request.timeline_id + ); + client.get(url).send() + })) + .await; + + let mut statuses = Vec::new(); + for (i, response) in responses.into_iter().enumerate() { + let response = response.context(format!("Failed to get status from {}", http_hosts[i]))?; + let status: crate::http::routes::TimelineStatus = response.json().await?; + statuses.push((status, i)); + } + + // Find the most advanced safekeeper + // TODO: current logic may be wrong, fix it later + let (status, i) = statuses + .into_iter() + .max_by_key(|(status, _)| { + ( + status.acceptor_state.epoch, + status.flush_lsn, + status.commit_lsn, + ) + }) + .unwrap(); + let safekeeper_host = http_hosts[i].clone(); + + assert!(status.tenant_id == request.tenant_id); + assert!(status.timeline_id == request.timeline_id); + + pull_timeline(status, safekeeper_host).await +} + +async fn pull_timeline(status: TimelineStatus, host: String) -> Result { + let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id); + info!( + "Pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}", + ttid, + host, + status.commit_lsn, + status.flush_lsn, + status.acceptor_state.term, + status.acceptor_state.epoch + ); + + let conf = &GlobalTimelines::get_global_config(); + + let client = reqwest::Client::new(); + // TODO: don't use debug dump, it should be used only in tests. + // This is a proof of concept, we should figure out a way + // to use scp without implementing it manually. + + // Implementing our own scp over HTTP. + // At first, we need to fetch list of files from safekeeper. + let dump: debug_dump::Response = client + .get(format!( + "{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}", + host, status.tenant_id, status.timeline_id + )) + .send() + .await? + .json() + .await?; + + if dump.timelines.len() != 1 { + bail!( + "Expected to fetch single timeline, got {} timelines", + dump.timelines.len() + ); + } + + let timeline = dump.timelines.into_iter().next().unwrap(); + let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!( + "Timeline {} doesn't have disk content", + ttid + ))?; + + let mut filenames = disk_content + .files + .iter() + .map(|file| file.name.clone()) + .collect::>(); + + // Sort filenames to make sure we pull files in correct order + // After sorting, we should have: + // - 000000010000000000000001 + // - ... + // - 000000010000000000000002.partial + // - safekeeper.control + filenames.sort(); + + // safekeeper.control should be the first file, so we need to move it to the beginning + let control_file_index = filenames + .iter() + .position(|name| name == "safekeeper.control") + .ok_or(anyhow::anyhow!("safekeeper.control not found"))?; + filenames.remove(control_file_index); + filenames.insert(0, "safekeeper.control".to_string()); + + info!( + "Downloading {} files from safekeeper {}", + filenames.len(), + host + ); + + // Creating temp directory for a new timeline. It needs to be + // located on the same filesystem as the rest of the timelines. + + // conf.workdir is usually /storage/safekeeper/data + // will try to transform it into /storage/safekeeper/tmp + let temp_base = conf + .workdir + .parent() + .ok_or(anyhow::anyhow!("workdir has no parent"))? + .join("tmp"); + + tokio::fs::create_dir_all(&temp_base).await?; + + let tli_dir = tempfile::Builder::new() + .suffix("_temptli") + .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id)) + .tempdir_in(temp_base)?; + let tli_dir_path = tli_dir.path().to_owned(); + + // Note: some time happens between fetching list of files and fetching files themselves. + // It's possible that some files will be removed from safekeeper and we will fail to fetch them. + // This function will fail in this case, should be retried by the caller. + for filename in filenames { + let file_path = tli_dir_path.join(&filename); + // /v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename + let http_url = format!( + "{}/v1/tenant/{}/timeline/{}/file/{}", + host, status.tenant_id, status.timeline_id, filename + ); + + let mut file = tokio::fs::File::create(&file_path).await?; + let mut response = client.get(&http_url).send().await?; + while let Some(chunk) = response.chunk().await? { + file.write_all(&chunk).await?; + } + } + + // TODO: fsync? + + // Let's create timeline from temp directory and verify that it's correct + + let control_path = tli_dir_path.join("safekeeper.control"); + + let control_store = control_file::FileStorage::load_control_file(control_path)?; + if control_store.server.wal_seg_size == 0 { + bail!("wal_seg_size is not set"); + } + + let wal_store = + wal_storage::PhysicalStorage::new(&ttid, tli_dir_path.clone(), conf, &control_store)?; + + let commit_lsn = status.commit_lsn; + let flush_lsn = wal_store.flush_lsn(); + + info!( + "Finished downloading timeline {}, commit_lsn={}, flush_lsn={}", + ttid, commit_lsn, flush_lsn + ); + assert!(status.commit_lsn <= status.flush_lsn); + + // Move timeline dir to the correct location + let timeline_path = conf.timeline_dir(&ttid); + + info!( + "Moving timeline {} from {} to {}", + ttid, + tli_dir_path.display(), + timeline_path.display() + ); + tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?; + tokio::fs::rename(tli_dir_path, &timeline_path).await?; + + let tli = GlobalTimelines::load_timeline(ttid).context("Failed to load timeline after copy")?; + + info!( + "Loaded timeline {}, flush_lsn={}", + ttid, + tli.get_flush_lsn() + ); + + Ok(Response { + safekeeper_host: host, + }) +} diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 64ca6967df..2dbf215998 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -129,7 +129,8 @@ impl SharedState { // We don't want to write anything to disk, because we may have existing timeline there. // These functions should not change anything on disk. let control_store = control_file::FileStorage::create_new(ttid, conf, state)?; - let wal_store = wal_storage::PhysicalStorage::new(ttid, conf, &control_store)?; + let wal_store = + wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?; let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?; Ok(Self { @@ -149,7 +150,8 @@ impl SharedState { bail!(TimelineError::UninitializedWalSegSize(*ttid)); } - let wal_store = wal_storage::PhysicalStorage::new(ttid, conf, &control_store)?; + let wal_store = + wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?; Ok(Self { sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?, diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index 868ee97645..41809794dc 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -159,6 +159,26 @@ impl GlobalTimelines { Ok(()) } + /// Load timeline from disk to the memory. + pub fn load_timeline(ttid: TenantTimelineId) -> Result> { + let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies(); + + match Timeline::load_timeline(conf, ttid, wal_backup_launcher_tx) { + Ok(timeline) => { + let tli = Arc::new(timeline); + // TODO: prevent concurrent timeline creation/loading + TIMELINES_STATE + .lock() + .unwrap() + .timelines + .insert(ttid, tli.clone()); + Ok(tli) + } + // If we can't load a timeline, it's bad. Caller will figure it out. + Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e), + } + } + /// Get the number of timelines in the map. pub fn timelines_count() -> usize { TIMELINES_STATE.lock().unwrap().timelines.len() diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 54e27714ea..5ef22b2f6a 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -112,10 +112,10 @@ impl PhysicalStorage { /// the disk. Otherwise, all LSNs are set to zero. pub fn new( ttid: &TenantTimelineId, + timeline_dir: PathBuf, conf: &SafeKeeperConf, state: &SafeKeeperState, ) -> Result { - let timeline_dir = conf.timeline_dir(ttid); let wal_seg_size = state.server.wal_seg_size as usize; // Find out where stored WAL ends, starting at commit_lsn which is a diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index c71a063cb4..79b2e5b290 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1820,6 +1820,36 @@ class VanillaPostgres(PgProtocol): self.pg_bin.run_capture(["initdb", "-D", str(pgdatadir)]) self.configure([f"port = {port}\n"]) + def enable_tls(self): + assert not self.running + # generate self-signed certificate + subprocess.run( + [ + "openssl", + "req", + "-new", + "-x509", + "-days", + "365", + "-nodes", + "-text", + "-out", + self.pgdatadir / "server.crt", + "-keyout", + self.pgdatadir / "server.key", + "-subj", + "/CN=localhost", + ] + ) + # configure postgresql.conf + self.configure( + [ + "ssl = on", + "ssl_cert_file = 'server.crt'", + "ssl_key_file = 'server.key'", + ] + ) + def configure(self, options: List[str]): """Append lines into postgresql.conf file.""" assert not self.running @@ -1992,6 +2022,7 @@ class NeonProxy(PgProtocol): # Link auth backend params *["--auth-backend", "link"], *["--uri", NeonProxy.link_auth_uri], + *["--allow-self-signed-compute", "true"], ] @dataclass(frozen=True) @@ -2012,6 +2043,7 @@ class NeonProxy(PgProtocol): def __init__( self, neon_binpath: Path, + test_output_dir: Path, proxy_port: int, http_port: int, mgmt_port: int, @@ -2025,6 +2057,7 @@ class NeonProxy(PgProtocol): self.host = host self.http_port = http_port self.neon_binpath = neon_binpath + self.test_output_dir = test_output_dir self.proxy_port = proxy_port self.mgmt_port = mgmt_port self.auth_backend = auth_backend @@ -2051,7 +2084,8 @@ class NeonProxy(PgProtocol): *["--metric-collection-interval", self.metric_collection_interval], ] - self._popen = subprocess.Popen(args) + logfile = open(self.test_output_dir / "proxy.log", "w") + self._popen = subprocess.Popen(args, stdout=logfile, stderr=logfile) self._wait_until_ready() return self @@ -2108,7 +2142,7 @@ class NeonProxy(PgProtocol): try: self._popen.wait(timeout=5) except subprocess.TimeoutExpired: - log.warn("failed to gracefully terminate proxy; killing") + log.warning("failed to gracefully terminate proxy; killing") self._popen.kill() @staticmethod @@ -2119,6 +2153,7 @@ class NeonProxy(PgProtocol): if create_user: log.info("creating a new user for link auth test") + local_vanilla_pg.enable_tls() local_vanilla_pg.start() local_vanilla_pg.safe_psql(f"create user {pg_user} with login superuser") @@ -2152,7 +2187,9 @@ class NeonProxy(PgProtocol): @pytest.fixture(scope="function") -def link_proxy(port_distributor: PortDistributor, neon_binpath: Path) -> Iterator[NeonProxy]: +def link_proxy( + port_distributor: PortDistributor, neon_binpath: Path, test_output_dir: Path +) -> Iterator[NeonProxy]: """Neon proxy that routes through link auth.""" http_port = port_distributor.get_port() @@ -2161,6 +2198,7 @@ def link_proxy(port_distributor: PortDistributor, neon_binpath: Path) -> Iterato with NeonProxy( neon_binpath=neon_binpath, + test_output_dir=test_output_dir, proxy_port=proxy_port, http_port=http_port, mgmt_port=mgmt_port, @@ -2172,7 +2210,10 @@ def link_proxy(port_distributor: PortDistributor, neon_binpath: Path) -> Iterato @pytest.fixture(scope="function") def static_proxy( - vanilla_pg: VanillaPostgres, port_distributor: PortDistributor, neon_binpath: Path + vanilla_pg: VanillaPostgres, + port_distributor: PortDistributor, + neon_binpath: Path, + test_output_dir: Path, ) -> Iterator[NeonProxy]: """Neon proxy that routes directly to vanilla postgres.""" @@ -2191,6 +2232,7 @@ def static_proxy( with NeonProxy( neon_binpath=neon_binpath, + test_output_dir=test_output_dir, proxy_port=proxy_port, http_port=http_port, mgmt_port=mgmt_port, @@ -2619,6 +2661,13 @@ class SafekeeperHttpClient(requests.Session): assert isinstance(res_json, dict) return res_json + def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]: + res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body) + res.raise_for_status() + res_json = res.json() + assert isinstance(res_json, dict) + return res_json + def timeline_create( self, tenant_id: TenantId, timeline_id: TimelineId, pg_version: int, commit_lsn: Lsn ): diff --git a/test_runner/regress/test_metric_collection.py b/test_runner/regress/test_metric_collection.py index ecbce1f8f7..df542fb84a 100644 --- a/test_runner/regress/test_metric_collection.py +++ b/test_runner/regress/test_metric_collection.py @@ -199,9 +199,12 @@ def proxy_metrics_handler(request: Request) -> Response: return Response(status=200) -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def proxy_with_metric_collector( - port_distributor: PortDistributor, neon_binpath: Path, httpserver_listen_address + port_distributor: PortDistributor, + neon_binpath: Path, + httpserver_listen_address, + test_output_dir: Path, ) -> Iterator[NeonProxy]: """Neon proxy that routes through link auth and has metric collection enabled.""" @@ -215,6 +218,7 @@ def proxy_with_metric_collector( with NeonProxy( neon_binpath=neon_binpath, + test_output_dir=test_output_dir, proxy_port=proxy_port, http_port=http_port, mgmt_port=mgmt_port, diff --git a/test_runner/regress/test_sni_router.py b/test_runner/regress/test_sni_router.py new file mode 100644 index 0000000000..64cfd017e6 --- /dev/null +++ b/test_runner/regress/test_sni_router.py @@ -0,0 +1,134 @@ +import socket +import subprocess +from pathlib import Path +from types import TracebackType +from typing import Optional, Type + +import backoff # type: ignore +from fixtures.log_helper import log +from fixtures.neon_fixtures import PgProtocol, PortDistributor, VanillaPostgres + + +def generate_tls_cert(cn, certout, keyout): + subprocess.run( + [ + "openssl", + "req", + "-new", + "-x509", + "-days", + "365", + "-nodes", + "-out", + certout, + "-keyout", + keyout, + "-subj", + f"/CN={cn}", + ] + ) + + +class PgSniRouter(PgProtocol): + def __init__( + self, + neon_binpath: Path, + port: int, + destination: str, + tls_cert: Path, + tls_key: Path, + ): + # Must use a hostname rather than IP here, for SNI to work + host = "localhost" + super().__init__(host=host, port=port) + + self.host = host + self.neon_binpath = neon_binpath + self.port = port + self.destination = destination + self.tls_cert = tls_cert + self.tls_key = tls_key + self._popen: Optional[subprocess.Popen[bytes]] = None + + def start(self) -> "PgSniRouter": + assert self._popen is None + args = [ + str(self.neon_binpath / "pg_sni_router"), + *["--listen", f"127.0.0.1:{self.port}"], + *["--tls-cert", str(self.tls_cert)], + *["--tls-key", str(self.tls_key)], + *["--destination", self.destination], + ] + + self._popen = subprocess.Popen(args) + self._wait_until_ready() + return self + + @backoff.on_exception(backoff.expo, OSError, max_time=10) + def _wait_until_ready(self): + socket.create_connection((self.host, self.port)) + + # Sends SIGTERM to the proxy if it has been started + def terminate(self): + if self._popen: + self._popen.terminate() + + # Waits for proxy to exit if it has been opened with a default timeout of + # two seconds. Raises subprocess.TimeoutExpired if the proxy does not exit in time. + def wait_for_exit(self, timeout=2): + if self._popen: + self._popen.wait(timeout=2) + + def __enter__(self) -> "PgSniRouter": + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ): + if self._popen is not None: + self._popen.terminate() + try: + self._popen.wait(timeout=5) + except subprocess.TimeoutExpired: + log.warning("failed to gracefully terminate pg_sni_router; killing") + self._popen.kill() + + +def test_pg_sni_router( + vanilla_pg: VanillaPostgres, + port_distributor: PortDistributor, + neon_binpath: Path, + test_output_dir: Path, +): + generate_tls_cert( + "endpoint.namespace.localtest.me", + test_output_dir / "router.crt", + test_output_dir / "router.key", + ) + + # Start a stand-alone Postgres to test with + vanilla_pg.start() + pg_port = vanilla_pg.default_options["port"] + + router_port = port_distributor.get_port() + + with PgSniRouter( + neon_binpath=neon_binpath, + port=router_port, + destination="localtest.me", + tls_cert=test_output_dir / "router.crt", + tls_key=test_output_dir / "router.key", + ) as router: + router.start() + + out = router.safe_psql( + "select 1", + dbname="postgres", + sslmode="require", + host=f"endpoint--namespace--{pg_port}.localtest.me", + hostaddr="127.0.0.1", + ) + assert out[0][0] == 1 diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index e8cfa4f318..fed5f325ca 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -1254,3 +1254,98 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): with closing(endpoint_other.connect()) as conn: with conn.cursor() as cur: cur.execute("INSERT INTO t (key) VALUES (123)") + + +def test_pull_timeline(neon_env_builder: NeonEnvBuilder): + def safekeepers_guc(env: NeonEnv, sk_names: List[int]) -> str: + return ",".join([f"localhost:{sk.port.pg}" for sk in env.safekeepers if sk.id in sk_names]) + + def execute_payload(endpoint: Endpoint): + with closing(endpoint.connect()) as conn: + with conn.cursor() as cur: + # we rely upon autocommit after each statement + # as waiting for acceptors happens there + cur.execute("CREATE TABLE IF NOT EXISTS t(key int, value text)") + cur.execute("INSERT INTO t VALUES (0, 'something')") + sum_before = query_scalar(cur, "SELECT SUM(key) FROM t") + + cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'") + sum_after = query_scalar(cur, "SELECT SUM(key) FROM t") + assert sum_after == sum_before + 5000050000 + + def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId): + for sk in safekeepers: + http_cli = sk.http_client() + try: + status = http_cli.timeline_status(tenant_id, timeline_id) + log.info(f"Safekeeper {sk.id} status: {status}") + except Exception as e: + log.info(f"Safekeeper {sk.id} status error: {e}") + + neon_env_builder.num_safekeepers = 4 + env = neon_env_builder.init_start() + env.neon_cli.create_branch("test_pull_timeline") + + log.info("Use only first 3 safekeepers") + env.safekeepers[3].stop() + active_safekeepers = [1, 2, 3] + endpoint = env.endpoints.create("test_pull_timeline") + endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers)) + endpoint.start() + + # learn neon timeline from compute + tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0]) + timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0]) + + execute_payload(endpoint) + show_statuses(env.safekeepers, tenant_id, timeline_id) + + log.info("Kill safekeeper 2, continue with payload") + env.safekeepers[1].stop(immediate=True) + execute_payload(endpoint) + + log.info("Initialize new safekeeper 4, pull data from 1 & 3") + env.safekeepers[3].start() + + res = ( + env.safekeepers[3] + .http_client() + .pull_timeline( + { + "tenant_id": str(tenant_id), + "timeline_id": str(timeline_id), + "http_hosts": [ + f"http://localhost:{env.safekeepers[0].port.http}", + f"http://localhost:{env.safekeepers[2].port.http}", + ], + } + ) + ) + log.info("Finished pulling timeline") + log.info(res) + + show_statuses(env.safekeepers, tenant_id, timeline_id) + + log.info("Restarting compute with new config to verify that it works") + active_safekeepers = [1, 3, 4] + + endpoint.stop_and_destroy().create("test_pull_timeline") + endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers)) + endpoint.start() + + execute_payload(endpoint) + show_statuses(env.safekeepers, tenant_id, timeline_id) + + log.info("Stop sk1 (simulate failure) and use only quorum of sk3 and sk4") + env.safekeepers[0].stop(immediate=True) + execute_payload(endpoint) + show_statuses(env.safekeepers, tenant_id, timeline_id) + + log.info("Restart sk4 and and use quorum of sk1 and sk4") + env.safekeepers[3].stop() + env.safekeepers[2].stop() + env.safekeepers[0].start() + env.safekeepers[3].start() + + execute_payload(endpoint) + show_statuses(env.safekeepers, tenant_id, timeline_id)