mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 20:10:38 +00:00
Compare commits
1 Commits
fix-datadi
...
main-broke
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9fd9a15305 |
@@ -3,7 +3,7 @@ storage:
|
||||
bucket_name: neon-prod-storage-ap-southeast-1
|
||||
bucket_region: ap-southeast-1
|
||||
console_mgmt_base_url: http://console-release.local
|
||||
broker_endpoint: http://storage-broker-lb.epsilon.ap-southeast-1.internal.aws.neon.tech:50051
|
||||
broker_endpoint: https://storage-broker.epsilon.ap-southeast-1.internal.aws.neon.tech:443
|
||||
pageserver_config_stub:
|
||||
pg_distrib_dir: /usr/local
|
||||
remote_storage:
|
||||
|
||||
2
.github/ansible/prod.eu-central-1.hosts.yaml
vendored
2
.github/ansible/prod.eu-central-1.hosts.yaml
vendored
@@ -3,7 +3,7 @@ storage:
|
||||
bucket_name: neon-prod-storage-eu-central-1
|
||||
bucket_region: eu-central-1
|
||||
console_mgmt_base_url: http://console-release.local
|
||||
broker_endpoint: http://storage-broker-lb.gamma.eu-central-1.internal.aws.neon.tech:50051
|
||||
broker_endpoint: https://storage-broker.gamma.eu-central-1.internal.aws.neon.tech:443
|
||||
pageserver_config_stub:
|
||||
pg_distrib_dir: /usr/local
|
||||
remote_storage:
|
||||
|
||||
2
.github/ansible/prod.us-east-2.hosts.yaml
vendored
2
.github/ansible/prod.us-east-2.hosts.yaml
vendored
@@ -3,7 +3,7 @@ storage:
|
||||
bucket_name: neon-prod-storage-us-east-2
|
||||
bucket_region: us-east-2
|
||||
console_mgmt_base_url: http://console-release.local
|
||||
broker_endpoint: http://storage-broker-lb.delta.us-east-2.internal.aws.neon.tech:50051
|
||||
broker_endpoint: https://storage-broker.delta.us-east-2.internal.aws.neon.tech:443
|
||||
pageserver_config_stub:
|
||||
pg_distrib_dir: /usr/local
|
||||
remote_storage:
|
||||
|
||||
2
.github/ansible/prod.us-west-2.hosts.yaml
vendored
2
.github/ansible/prod.us-west-2.hosts.yaml
vendored
@@ -3,7 +3,7 @@ storage:
|
||||
bucket_name: neon-prod-storage-us-west-2
|
||||
bucket_region: us-west-2
|
||||
console_mgmt_base_url: http://console-release.local
|
||||
broker_endpoint: http://storage-broker-lb.eta.us-west-2.internal.aws.neon.tech:50051
|
||||
broker_endpoint: https://storage-broker.eta.us-west-2.internal.aws.neon.tech:443
|
||||
pageserver_config_stub:
|
||||
pg_distrib_dir: /usr/local
|
||||
remote_storage:
|
||||
|
||||
2
.github/ansible/staging.eu-west-1.hosts.yaml
vendored
2
.github/ansible/staging.eu-west-1.hosts.yaml
vendored
@@ -3,7 +3,7 @@ storage:
|
||||
bucket_name: neon-dev-storage-eu-west-1
|
||||
bucket_region: eu-west-1
|
||||
console_mgmt_base_url: http://console-staging.local
|
||||
broker_endpoint: http://storage-broker-lb.zeta.eu-west-1.internal.aws.neon.build:50051
|
||||
broker_endpoint: https://storage-broker.zeta.eu-west-1.internal.aws.neon.build:443
|
||||
pageserver_config_stub:
|
||||
pg_distrib_dir: /usr/local
|
||||
remote_storage:
|
||||
|
||||
2
.github/ansible/staging.us-east-2.hosts.yaml
vendored
2
.github/ansible/staging.us-east-2.hosts.yaml
vendored
@@ -3,7 +3,7 @@ storage:
|
||||
bucket_name: neon-staging-storage-us-east-2
|
||||
bucket_region: us-east-2
|
||||
console_mgmt_base_url: http://console-staging.local
|
||||
broker_endpoint: http://storage-broker-lb.beta.us-east-2.internal.aws.neon.build:50051
|
||||
broker_endpoint: https://storage-broker.beta.us-east-2.internal.aws.neon.build:443
|
||||
pageserver_config_stub:
|
||||
pg_distrib_dir: /usr/local
|
||||
remote_storage:
|
||||
|
||||
@@ -3,22 +3,27 @@ podLabels:
|
||||
neon_env: staging
|
||||
neon_service: storage-broker
|
||||
|
||||
# Use L4 LB
|
||||
service:
|
||||
# service.annotations -- Annotations to add to the service
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
|
||||
# assign service to this name at external-dns
|
||||
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.zeta.eu-west-1.internal.aws.neon.build
|
||||
# service.type -- Service type
|
||||
type: LoadBalancer
|
||||
# service.port -- broker listen port
|
||||
port: 50051
|
||||
|
||||
ingress:
|
||||
enabled: false
|
||||
enabled: true
|
||||
annotations:
|
||||
kubernetes.io/ingress.class: nginx-internal
|
||||
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
|
||||
nginx.ingress.kubernetes.io/ssl-redirect: "true"
|
||||
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
|
||||
# we have basically infinite streams, disable body size limit
|
||||
nginx.ingress.kubernetes.io/proxy-body-size: "0"
|
||||
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
|
||||
|
||||
hosts:
|
||||
- host: storage-broker.zeta.eu-west-1.internal.aws.neon.build
|
||||
paths:
|
||||
- path: /
|
||||
pathType: Prefix
|
||||
tls:
|
||||
- hosts:
|
||||
- storage-broker.zeta.eu-west-1.internal.aws.neon.build
|
||||
secretName: storage-broker-tls
|
||||
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
|
||||
@@ -3,22 +3,27 @@ podLabels:
|
||||
neon_env: staging
|
||||
neon_service: storage-broker
|
||||
|
||||
# Use L4 LB
|
||||
service:
|
||||
# service.annotations -- Annotations to add to the service
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
|
||||
# assign service to this name at external-dns
|
||||
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.beta.us-east-2.internal.aws.neon.build
|
||||
# service.type -- Service type
|
||||
type: LoadBalancer
|
||||
# service.port -- broker listen port
|
||||
port: 50051
|
||||
|
||||
ingress:
|
||||
enabled: false
|
||||
enabled: true
|
||||
annotations:
|
||||
kubernetes.io/ingress.class: nginx-internal
|
||||
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
|
||||
nginx.ingress.kubernetes.io/ssl-redirect: "true"
|
||||
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
|
||||
# we have basically infinite streams, disable body size limit
|
||||
nginx.ingress.kubernetes.io/proxy-body-size: "0"
|
||||
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
|
||||
|
||||
hosts:
|
||||
- host: storage-broker.beta.us-east-2.internal.aws.neon.build
|
||||
paths:
|
||||
- path: /
|
||||
pathType: Prefix
|
||||
tls:
|
||||
- hosts:
|
||||
- storage-broker.beta.us-east-2.internal.aws.neon.build
|
||||
secretName: storage-broker-tls
|
||||
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
|
||||
@@ -3,22 +3,27 @@ podLabels:
|
||||
neon_env: production
|
||||
neon_service: storage-broker
|
||||
|
||||
# Use L4 LB
|
||||
service:
|
||||
# service.annotations -- Annotations to add to the service
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
|
||||
# assign service to this name at external-dns
|
||||
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.epsilon.ap-southeast-1.internal.aws.neon.tech
|
||||
# service.type -- Service type
|
||||
type: LoadBalancer
|
||||
# service.port -- broker listen port
|
||||
port: 50051
|
||||
|
||||
ingress:
|
||||
enabled: false
|
||||
enabled: true
|
||||
annotations:
|
||||
kubernetes.io/ingress.class: nginx-internal
|
||||
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
|
||||
nginx.ingress.kubernetes.io/ssl-redirect: "true"
|
||||
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
|
||||
# we have basically infinite streams, disable body size limit
|
||||
nginx.ingress.kubernetes.io/proxy-body-size: "0"
|
||||
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
|
||||
|
||||
hosts:
|
||||
- host: storage-broker.epsilon.ap-southeast-1.internal.aws.neon.tech
|
||||
paths:
|
||||
- path: /
|
||||
pathType: Prefix
|
||||
tls:
|
||||
- hosts:
|
||||
- storage-broker.epsilon.ap-southeast-1.internal.aws.neon.tech
|
||||
secretName: storage-broker-tls
|
||||
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
|
||||
@@ -3,22 +3,27 @@ podLabels:
|
||||
neon_env: production
|
||||
neon_service: storage-broker
|
||||
|
||||
# Use L4 LB
|
||||
service:
|
||||
# service.annotations -- Annotations to add to the service
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
|
||||
# assign service to this name at external-dns
|
||||
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.gamma.eu-central-1.internal.aws.neon.tech
|
||||
# service.type -- Service type
|
||||
type: LoadBalancer
|
||||
# service.port -- broker listen port
|
||||
port: 50051
|
||||
|
||||
ingress:
|
||||
enabled: false
|
||||
enabled: true
|
||||
annotations:
|
||||
kubernetes.io/ingress.class: nginx-internal
|
||||
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
|
||||
nginx.ingress.kubernetes.io/ssl-redirect: "true"
|
||||
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
|
||||
# we have basically infinite streams, disable body size limit
|
||||
nginx.ingress.kubernetes.io/proxy-body-size: "0"
|
||||
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
|
||||
|
||||
hosts:
|
||||
- host: storage-broker.gamma.eu-central-1.internal.aws.neon.tech
|
||||
paths:
|
||||
- path: /
|
||||
pathType: Prefix
|
||||
tls:
|
||||
- hosts:
|
||||
- storage-broker.gamma.eu-central-1.internal.aws.neon.tech
|
||||
secretName: storage-broker-tls
|
||||
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
|
||||
@@ -3,22 +3,27 @@ podLabels:
|
||||
neon_env: production
|
||||
neon_service: storage-broker
|
||||
|
||||
# Use L4 LB
|
||||
service:
|
||||
# service.annotations -- Annotations to add to the service
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
|
||||
# assign service to this name at external-dns
|
||||
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.delta.us-east-2.internal.aws.neon.tech
|
||||
# service.type -- Service type
|
||||
type: LoadBalancer
|
||||
# service.port -- broker listen port
|
||||
port: 50051
|
||||
|
||||
ingress:
|
||||
enabled: false
|
||||
enabled: true
|
||||
annotations:
|
||||
kubernetes.io/ingress.class: nginx-internal
|
||||
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
|
||||
nginx.ingress.kubernetes.io/ssl-redirect: "true"
|
||||
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
|
||||
# we have basically infinite streams, disable body size limit
|
||||
nginx.ingress.kubernetes.io/proxy-body-size: "0"
|
||||
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
|
||||
|
||||
hosts:
|
||||
- host: storage-broker.delta.us-east-2.internal.aws.neon.tech
|
||||
paths:
|
||||
- path: /
|
||||
pathType: Prefix
|
||||
tls:
|
||||
- hosts:
|
||||
- storage-broker.delta.us-east-2.internal.aws.neon.tech
|
||||
secretName: storage-broker-tls
|
||||
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
|
||||
@@ -3,22 +3,27 @@ podLabels:
|
||||
neon_env: production
|
||||
neon_service: storage-broker
|
||||
|
||||
# Use L4 LB
|
||||
service:
|
||||
# service.annotations -- Annotations to add to the service
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
|
||||
# assign service to this name at external-dns
|
||||
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.eta.us-west-2.internal.aws.neon.tech
|
||||
# service.type -- Service type
|
||||
type: LoadBalancer
|
||||
# service.port -- broker listen port
|
||||
port: 50051
|
||||
|
||||
ingress:
|
||||
enabled: false
|
||||
enabled: true
|
||||
annotations:
|
||||
kubernetes.io/ingress.class: nginx-internal
|
||||
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
|
||||
nginx.ingress.kubernetes.io/ssl-redirect: "true"
|
||||
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
|
||||
# we have basically infinite streams, disable body size limit
|
||||
nginx.ingress.kubernetes.io/proxy-body-size: "0"
|
||||
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
|
||||
|
||||
hosts:
|
||||
- host: storage-broker.eta.us-west-2.internal.aws.neon.tech
|
||||
paths:
|
||||
- path: /
|
||||
pathType: Prefix
|
||||
tls:
|
||||
- hosts:
|
||||
- storage-broker.eta.us-west-2.internal.aws.neon.tech
|
||||
secretName: storage-broker-tls
|
||||
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
|
||||
6
.github/workflows/benchmarking.yml
vendored
6
.github/workflows/benchmarking.yml
vendored
@@ -18,7 +18,6 @@ on:
|
||||
region_id:
|
||||
description: 'Use a particular region. If not set the default region will be used'
|
||||
required: false
|
||||
default: 'aws-us-east-2'
|
||||
save_perf_report:
|
||||
type: boolean
|
||||
description: 'Publish perf report or not. If not set, the report is published only for the main branch'
|
||||
@@ -116,10 +115,13 @@ jobs:
|
||||
# neon-captest-prefetch: Same, with prefetching enabled (new project)
|
||||
# rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
|
||||
# rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
|
||||
platform: [ neon-captest-reuse, neon-captest-prefetch, rds-postgres ]
|
||||
platform: [ neon-captest-new, neon-captest-reuse, neon-captest-prefetch, rds-postgres ]
|
||||
db_size: [ 10gb ]
|
||||
runner: [ us-east-2 ]
|
||||
include:
|
||||
- platform: neon-captest-new
|
||||
db_size: 50gb
|
||||
runner: us-east-2
|
||||
- platform: neon-captest-prefetch
|
||||
db_size: 50gb
|
||||
runner: us-east-2
|
||||
|
||||
4
.github/workflows/build_and_test.yml
vendored
4
.github/workflows/build_and_test.yml
vendored
@@ -1072,7 +1072,7 @@ jobs:
|
||||
|
||||
- name: Deploy storage-broker
|
||||
run:
|
||||
helm upgrade neon-storage-broker-lb neondatabase/neon-storage-broker --namespace neon-storage-broker-lb --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ needs.tag.outputs.build-tag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
|
||||
helm upgrade neon-storage-broker neondatabase/neon-storage-broker --namespace neon-storage-broker --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ needs.tag.outputs.build-tag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
|
||||
|
||||
deploy-proxy-prod-new:
|
||||
runs-on: prod
|
||||
@@ -1149,7 +1149,7 @@ jobs:
|
||||
|
||||
- name: Deploy storage-broker
|
||||
run:
|
||||
helm upgrade neon-storage-broker-lb neondatabase/neon-storage-broker --namespace neon-storage-broker-lb --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ needs.tag.outputs.build-tag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
|
||||
helm upgrade neon-storage-broker neondatabase/neon-storage-broker --namespace neon-storage-broker --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ needs.tag.outputs.build-tag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
|
||||
|
||||
promote-compatibility-data:
|
||||
runs-on: [ self-hosted, dev, x64 ]
|
||||
|
||||
53
Cargo.lock
generated
53
Cargo.lock
generated
@@ -563,12 +563,6 @@ version = "0.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.20.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
|
||||
|
||||
[[package]]
|
||||
name = "bincode"
|
||||
version = "1.3.3"
|
||||
@@ -1926,7 +1920,7 @@ version = "8.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09f4f04699947111ec1733e71778d763555737579e44b85844cae8e1940a1828"
|
||||
dependencies = [
|
||||
"base64 0.13.1",
|
||||
"base64",
|
||||
"pem",
|
||||
"ring",
|
||||
"serde",
|
||||
@@ -2513,7 +2507,7 @@ version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "03c64931a1a212348ec4f3b4362585eca7159d0d09cbdf4a7f74f02173596fd4"
|
||||
dependencies = [
|
||||
"base64 0.13.1",
|
||||
"base64",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2534,18 +2528,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "phf"
|
||||
version = "0.11.1"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "928c6535de93548188ef63bb7c4036bd415cd8f36ad25af44b9789b2ee72a48c"
|
||||
checksum = "fabbf1ead8a5bcbc20f5f8b939ee3f5b0f6f281b6ad3468b84656b658b455259"
|
||||
dependencies = [
|
||||
"phf_shared",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "phf_shared"
|
||||
version = "0.11.1"
|
||||
version = "0.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e1fb5f6f826b772a8d4c0394209441e7d37cbbb967ae9c7e0e8134365c9ee676"
|
||||
checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096"
|
||||
dependencies = [
|
||||
"siphasher",
|
||||
]
|
||||
@@ -2618,12 +2612,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "postgres"
|
||||
version = "0.19.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e"
|
||||
version = "0.19.2"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=d052ee8b86fff9897c77b0fe89ea9daba0e1fa38#d052ee8b86fff9897c77b0fe89ea9daba0e1fa38"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
"futures-util",
|
||||
"futures",
|
||||
"log",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
@@ -2632,9 +2626,9 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=d052ee8b86fff9897c77b0fe89ea9daba0e1fa38#d052ee8b86fff9897c77b0fe89ea9daba0e1fa38"
|
||||
dependencies = [
|
||||
"base64 0.20.0",
|
||||
"base64",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -2649,8 +2643,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e"
|
||||
version = "0.2.3"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=d052ee8b86fff9897c77b0fe89ea9daba0e1fa38#d052ee8b86fff9897c77b0fe89ea9daba0e1fa38"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -2874,7 +2868,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"atty",
|
||||
"base64 0.13.1",
|
||||
"base64",
|
||||
"bstr",
|
||||
"bytes",
|
||||
"clap 4.0.29",
|
||||
@@ -3084,7 +3078,7 @@ version = "0.11.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "68cc60575865c7831548863cc02356512e3f1dc2f3f82cb837d7fc4cc8f3c97c"
|
||||
dependencies = [
|
||||
"base64 0.13.1",
|
||||
"base64",
|
||||
"bytes",
|
||||
"encoding_rs",
|
||||
"futures-core",
|
||||
@@ -3267,7 +3261,7 @@ version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55"
|
||||
dependencies = [
|
||||
"base64 0.13.1",
|
||||
"base64",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3548,7 +3542,7 @@ version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "25bf4a5a814902cd1014dbccfa4d4560fb8432c779471e96e035602519f82eef"
|
||||
dependencies = [
|
||||
"base64 0.13.1",
|
||||
"base64",
|
||||
"chrono",
|
||||
"hex",
|
||||
"indexmap",
|
||||
@@ -4015,15 +4009,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.7"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e"
|
||||
version = "0.7.6"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=d052ee8b86fff9897c77b0fe89ea9daba0e1fa38#d052ee8b86fff9897c77b0fe89ea9daba0e1fa38"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"futures",
|
||||
"log",
|
||||
"parking_lot 0.12.1",
|
||||
"percent-encoding",
|
||||
@@ -4116,7 +4109,7 @@ dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"axum",
|
||||
"base64 0.13.1",
|
||||
"base64",
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
@@ -4358,7 +4351,7 @@ version = "2.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b97acb4c28a254fd7a4aeec976c46a7fa404eac4d7c134b30c75144846d7cb8f"
|
||||
dependencies = [
|
||||
"base64 0.13.1",
|
||||
"base64",
|
||||
"chunked_transfer",
|
||||
"log",
|
||||
"native-tls",
|
||||
@@ -4794,7 +4787,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e0ecbeb7b67ce215e40e3cc7f2ff902f94a223acf44995934763467e7b1febc8"
|
||||
dependencies = [
|
||||
"asn1-rs",
|
||||
"base64 0.13.1",
|
||||
"base64",
|
||||
"data-encoding",
|
||||
"der-parser",
|
||||
"lazy_static",
|
||||
|
||||
@@ -86,4 +86,4 @@ lto = true
|
||||
# This is only needed for proxy's tests.
|
||||
# TODO: we should probably fork `tokio-postgres-rustls` instead.
|
||||
[patch.crates-io]
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
|
||||
203
Makefile
203
Makefile
@@ -61,115 +61,146 @@ all: neon postgres neon-pg-ext
|
||||
#
|
||||
# The 'postgres_ffi' depends on the Postgres headers.
|
||||
.PHONY: neon
|
||||
neon: postgres-headers
|
||||
neon: postgres-v14-headers postgres-v15-headers
|
||||
+@echo "Compiling Neon"
|
||||
$(CARGO_CMD_PREFIX) cargo build $(CARGO_BUILD_FLAGS)
|
||||
|
||||
### PostgreSQL parts
|
||||
# Some rules are duplicated for Postgres v14 and 15. We may want to refactor
|
||||
# The rules are duplicated for Postgres v14 and 15. We may want to refactor
|
||||
# to avoid the duplication in the future, but it's tolerable for now.
|
||||
#
|
||||
$(POSTGRES_INSTALL_DIR)/build/%/config.status:
|
||||
+@echo "Configuring Postgres $* build"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/$*
|
||||
(cd $(POSTGRES_INSTALL_DIR)/build/$* && \
|
||||
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-$*/configure \
|
||||
$(POSTGRES_INSTALL_DIR)/build/v14/config.status:
|
||||
+@echo "Configuring Postgres v14 build"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/v14
|
||||
(cd $(POSTGRES_INSTALL_DIR)/build/v14 && \
|
||||
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-v14/configure \
|
||||
CFLAGS='$(PG_CFLAGS)' \
|
||||
$(PG_CONFIGURE_OPTS) \
|
||||
--prefix=$(abspath $(POSTGRES_INSTALL_DIR))/$* > configure.log)
|
||||
--prefix=$(abspath $(POSTGRES_INSTALL_DIR))/v14 > configure.log)
|
||||
|
||||
$(POSTGRES_INSTALL_DIR)/build/v15/config.status:
|
||||
+@echo "Configuring Postgres v15 build"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/v15
|
||||
(cd $(POSTGRES_INSTALL_DIR)/build/v15 && \
|
||||
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-v15/configure \
|
||||
CFLAGS='$(PG_CFLAGS)' \
|
||||
$(PG_CONFIGURE_OPTS) \
|
||||
--prefix=$(abspath $(POSTGRES_INSTALL_DIR))/v15 > configure.log)
|
||||
|
||||
# nicer alias to run 'configure'
|
||||
# Note: I've been unable to use templates for this part of our configuration.
|
||||
# I'm not sure why it wouldn't work, but this is the only place (apart from
|
||||
# the "build-all-versions" entry points) where direct mention of PostgreSQL
|
||||
# versions is used.
|
||||
.PHONY: postgres-configure-v15
|
||||
postgres-configure-v15: $(POSTGRES_INSTALL_DIR)/build/v15/config.status
|
||||
.PHONY: postgres-configure-v14
|
||||
postgres-configure-v14: $(POSTGRES_INSTALL_DIR)/build/v14/config.status
|
||||
.PHONY: postgres-v14-configure
|
||||
postgres-v14-configure: $(POSTGRES_INSTALL_DIR)/build/v14/config.status
|
||||
|
||||
.PHONY: postgres-v15-configure
|
||||
postgres-v15-configure: $(POSTGRES_INSTALL_DIR)/build/v15/config.status
|
||||
|
||||
# Install the PostgreSQL header files into $(POSTGRES_INSTALL_DIR)/<version>/include
|
||||
.PHONY: postgres-headers-%
|
||||
postgres-headers-%: postgres-configure-%
|
||||
+@echo "Installing PostgreSQL $* headers"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/include MAKELEVEL=0 install
|
||||
.PHONY: postgres-v14-headers
|
||||
postgres-v14-headers: postgres-v14-configure
|
||||
+@echo "Installing PostgreSQL v14 headers"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/src/include MAKELEVEL=0 install
|
||||
|
||||
.PHONY: postgres-v15-headers
|
||||
postgres-v15-headers: postgres-v15-configure
|
||||
+@echo "Installing PostgreSQL v15 headers"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15/src/include MAKELEVEL=0 install
|
||||
|
||||
# Compile and install PostgreSQL
|
||||
.PHONY: postgres-%
|
||||
postgres-%: postgres-configure-% \
|
||||
postgres-headers-% # to prevent `make install` conflicts with neon's `postgres-headers`
|
||||
+@echo "Compiling PostgreSQL $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 install
|
||||
+@echo "Compiling libpq $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/interfaces/libpq install
|
||||
+@echo "Compiling pg_prewarm $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_prewarm install
|
||||
+@echo "Compiling pg_buffercache $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_buffercache install
|
||||
+@echo "Compiling pageinspect $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect install
|
||||
.PHONY: postgres-v14
|
||||
postgres-v14: postgres-v14-configure \
|
||||
postgres-v14-headers # to prevent `make install` conflicts with neon's `postgres-headers`
|
||||
+@echo "Compiling PostgreSQL v14"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14 MAKELEVEL=0 install
|
||||
+@echo "Compiling libpq v14"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/src/interfaces/libpq install
|
||||
+@echo "Compiling pg_prewarm v14"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/contrib/pg_prewarm install
|
||||
+@echo "Compiling pg_buffercache v14"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/contrib/pg_buffercache install
|
||||
+@echo "Compiling pageinspect v14"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/contrib/pageinspect install
|
||||
|
||||
.PHONY: postgres-clean-%
|
||||
postgres-clean-%:
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 clean
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_buffercache clean
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect clean
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/interfaces/libpq clean
|
||||
|
||||
.PHONY: neon-pg-ext-%
|
||||
neon-pg-ext-%: postgres-%
|
||||
+@echo "Compiling neon $*"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-$*
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install
|
||||
+@echo "Compiling neon_walredo $*"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$*
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install
|
||||
+@echo "Compiling neon_test_utils $*"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$*
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile install
|
||||
|
||||
.PHONY: neon-pg-ext-clean-%
|
||||
neon-pg-ext-clean-%:
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/pgxn/neon-$* -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/pgxn/neon_walredo-$* -f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile clean
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/pgxn/neon_test_utils-$* -f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile clean
|
||||
|
||||
.PHONY: neon-pg-ext
|
||||
neon-pg-ext: \
|
||||
neon-pg-ext-v14 \
|
||||
neon-pg-ext-v15
|
||||
|
||||
.PHONY: neon-pg-ext-clean
|
||||
neon-pg-ext-clean: \
|
||||
neon-pg-ext-clean-v14 \
|
||||
neon-pg-ext-clean-v15
|
||||
.PHONY: postgres-v15
|
||||
postgres-v15: postgres-v15-configure \
|
||||
postgres-v15-headers # to prevent `make install` conflicts with neon's `postgres-headers`
|
||||
+@echo "Compiling PostgreSQL v15"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15 MAKELEVEL=0 install
|
||||
+@echo "Compiling libpq v15"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15/src/interfaces/libpq install
|
||||
+@echo "Compiling pg_prewarm v15"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15/contrib/pg_prewarm install
|
||||
+@echo "Compiling pg_buffercache v15"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15/contrib/pg_buffercache install
|
||||
+@echo "Compiling pageinspect v15"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15/contrib/pageinspect install
|
||||
|
||||
# shorthand to build all Postgres versions
|
||||
.PHONY: postgres
|
||||
postgres: \
|
||||
postgres-v14 \
|
||||
postgres-v15
|
||||
postgres: postgres-v14 postgres-v15
|
||||
|
||||
.PHONY: postgres-headers
|
||||
postgres-headers: \
|
||||
postgres-headers-v14 \
|
||||
postgres-headers-v15
|
||||
.PHONY: postgres-v14-clean
|
||||
postgres-v14-clean:
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14 MAKELEVEL=0 clean
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/contrib/pg_buffercache clean
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/contrib/pageinspect clean
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/src/interfaces/libpq clean
|
||||
|
||||
.PHONY: postgres-clean
|
||||
postgres-clean: \
|
||||
postgres-clean-v14 \
|
||||
postgres-clean-v15
|
||||
.PHONY: postgres-v15-clean
|
||||
postgres-v15-clean:
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15 MAKELEVEL=0 clean
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15/contrib/pg_buffercache clean
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15/contrib/pageinspect clean
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15/src/interfaces/libpq clean
|
||||
|
||||
neon-pg-ext-v14: postgres-v14
|
||||
+@echo "Compiling neon v14"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-v14
|
||||
(cd $(POSTGRES_INSTALL_DIR)/build/neon-v14 && \
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v14/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install)
|
||||
+@echo "Compiling neon_walredo v14"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-v14
|
||||
(cd $(POSTGRES_INSTALL_DIR)/build/neon-walredo-v14 && \
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v14/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install)
|
||||
+@echo "Compiling neon_test_utils" v14
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-v14
|
||||
(cd $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-v14 && \
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v14/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile install)
|
||||
|
||||
neon-pg-ext-v15: postgres-v15
|
||||
+@echo "Compiling neon v15"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-v15
|
||||
(cd $(POSTGRES_INSTALL_DIR)/build/neon-v15 && \
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v15/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install)
|
||||
+@echo "Compiling neon_walredo v15"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-v15
|
||||
(cd $(POSTGRES_INSTALL_DIR)/build/neon-walredo-v15 && \
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v15/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install)
|
||||
+@echo "Compiling neon_test_utils" v15
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-v15
|
||||
(cd $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-v15 && \
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v15/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile install)
|
||||
|
||||
.PHONY: neon-pg-ext-clean
|
||||
$(MAKE) -C $(ROOT_PROJECT_DIR)/pgxn/neon clean
|
||||
$(MAKE) -C $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils clean
|
||||
|
||||
neon-pg-ext: neon-pg-ext-v14 neon-pg-ext-v15
|
||||
postgres-headers: postgres-v14-headers postgres-v15-headers
|
||||
postgres-clean: postgres-v14-clean postgres-v15-clean
|
||||
|
||||
# This doesn't remove the effects of 'configure'.
|
||||
.PHONY: clean
|
||||
clean: postgres-clean neon-pg-ext-clean
|
||||
clean:
|
||||
cd $(POSTGRES_INSTALL_DIR)/build/v14 && $(MAKE) clean
|
||||
cd $(POSTGRES_INSTALL_DIR)/build/v15 && $(MAKE) clean
|
||||
$(CARGO_CMD_PREFIX) cargo clean
|
||||
cd pgxn/neon && $(MAKE) clean
|
||||
cd pgxn/neon_test_utils && $(MAKE) clean
|
||||
|
||||
# This removes everything
|
||||
.PHONY: distclean
|
||||
|
||||
@@ -12,12 +12,12 @@ futures = "0.3.13"
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
log = { version = "0.4", features = ["std", "serde"] }
|
||||
notify = "5.0.0"
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
regex = "1"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
tar = "0.4"
|
||||
tokio = { version = "1.17", features = ["macros", "rt", "rt-multi-thread"] }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
url = "2.2.2"
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
|
||||
@@ -10,7 +10,7 @@ comfy-table = "6.1"
|
||||
git-version = "0.3.5"
|
||||
nix = "0.25"
|
||||
once_cell = "1.13.0"
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
regex = "1"
|
||||
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
|
||||
@@ -203,6 +203,29 @@ pub struct TimelineInfo {
|
||||
pub pg_version: u32,
|
||||
|
||||
pub state: TimelineState,
|
||||
|
||||
// Some of the above fields are duplicated in 'local' and 'remote', for backwards-
|
||||
// compatility with older clients.
|
||||
pub local: LocalTimelineInfo,
|
||||
pub remote: RemoteTimelineInfo,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct LocalTimelineInfo {
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub ancestor_timeline_id: Option<TimelineId>,
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub ancestor_lsn: Option<Lsn>,
|
||||
pub current_logical_size: Option<u64>, // is None when timeline is Unloaded
|
||||
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct RemoteTimelineInfo {
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub remote_consistent_lsn: Option<Lsn>,
|
||||
}
|
||||
|
||||
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
|
||||
|
||||
@@ -8,8 +8,8 @@ edition = "2021"
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
itertools = "0.10.3"
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
url = "2.2.2"
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
|
||||
@@ -21,7 +21,7 @@ workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
[dev-dependencies]
|
||||
env_logger = "0.9"
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
wal_craft = { path = "wal_craft" }
|
||||
|
||||
[build-dependencies]
|
||||
|
||||
@@ -11,7 +11,7 @@ clap = "4.0"
|
||||
env_logger = "0.9"
|
||||
log = "0.4"
|
||||
once_cell = "1.13.0"
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
postgres_ffi = { path = "../" }
|
||||
tempfile = "3.2"
|
||||
workspace_hack = { version = "0.1", path = "../../../workspace_hack" }
|
||||
|
||||
@@ -7,7 +7,7 @@ edition = "2021"
|
||||
anyhow = "1.0"
|
||||
bytes = "1.0.1"
|
||||
pin-project-lite = "0.2.7"
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
rand = "0.8.3"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
tokio = { version = "1.17", features = ["macros"] }
|
||||
|
||||
@@ -463,10 +463,7 @@ pub enum BeMessage<'a> {
|
||||
EncryptionResponse(bool),
|
||||
NoData,
|
||||
ParameterDescription,
|
||||
ParameterStatus {
|
||||
name: &'a [u8],
|
||||
value: &'a [u8],
|
||||
},
|
||||
ParameterStatus(BeParameterStatusMessage<'a>),
|
||||
ParseComplete,
|
||||
ReadyForQuery,
|
||||
RowDescription(&'a [RowDescriptor<'a>]),
|
||||
@@ -475,28 +472,6 @@ pub enum BeMessage<'a> {
|
||||
KeepAlive(WalSndKeepAlive),
|
||||
}
|
||||
|
||||
/// Common shorthands.
|
||||
impl<'a> BeMessage<'a> {
|
||||
/// A [`BeMessage::ParameterStatus`] holding the client encoding, i.e. UTF-8.
|
||||
/// This is a sensible default, given that:
|
||||
/// * rust strings only support this encoding out of the box.
|
||||
/// * tokio-postgres, postgres-jdbc (and probably more) mandate it.
|
||||
///
|
||||
/// TODO: do we need to report `server_encoding` as well?
|
||||
pub const CLIENT_ENCODING: Self = Self::ParameterStatus {
|
||||
name: b"client_encoding",
|
||||
value: b"UTF8",
|
||||
};
|
||||
|
||||
/// Build a [`BeMessage::ParameterStatus`] holding the server version.
|
||||
pub fn server_version(version: &'a str) -> Self {
|
||||
Self::ParameterStatus {
|
||||
name: b"server_version",
|
||||
value: version.as_bytes(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum BeAuthenticationSaslMessage<'a> {
|
||||
Methods(&'a [&'a str]),
|
||||
@@ -510,6 +485,12 @@ pub enum BeParameterStatusMessage<'a> {
|
||||
ServerVersion(&'a str),
|
||||
}
|
||||
|
||||
impl BeParameterStatusMessage<'static> {
|
||||
pub fn encoding() -> BeMessage<'static> {
|
||||
BeMessage::ParameterStatus(Self::Encoding("UTF8"))
|
||||
}
|
||||
}
|
||||
|
||||
// One row description in RowDescription packet.
|
||||
#[derive(Debug)]
|
||||
pub struct RowDescriptor<'a> {
|
||||
@@ -606,15 +587,14 @@ fn write_body<R>(buf: &mut BytesMut, f: impl FnOnce(&mut BytesMut) -> R) -> R {
|
||||
}
|
||||
|
||||
/// Safe write of s into buf as cstring (String in the protocol).
|
||||
fn write_cstr(s: impl AsRef<[u8]>, buf: &mut BytesMut) -> Result<(), io::Error> {
|
||||
let bytes = s.as_ref();
|
||||
if bytes.contains(&0) {
|
||||
fn write_cstr(s: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> {
|
||||
if s.contains(&0) {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
"string contains embedded null",
|
||||
));
|
||||
}
|
||||
buf.put_slice(bytes);
|
||||
buf.put_slice(s);
|
||||
buf.put_u8(0);
|
||||
Ok(())
|
||||
}
|
||||
@@ -664,7 +644,7 @@ impl<'a> BeMessage<'a> {
|
||||
Methods(methods) => {
|
||||
buf.put_i32(10); // Specifies that SASL auth method is used.
|
||||
for method in methods.iter() {
|
||||
write_cstr(method, buf)?;
|
||||
write_cstr(method.as_bytes(), buf)?;
|
||||
}
|
||||
buf.put_u8(0); // zero terminator for the list
|
||||
}
|
||||
@@ -779,7 +759,7 @@ impl<'a> BeMessage<'a> {
|
||||
buf.put_slice(b"CXX000\0");
|
||||
|
||||
buf.put_u8(b'M'); // the message
|
||||
write_cstr(error_msg, buf)?;
|
||||
write_cstr(error_msg.as_bytes(), buf)?;
|
||||
|
||||
buf.put_u8(0); // terminator
|
||||
Ok::<_, io::Error>(())
|
||||
@@ -819,12 +799,24 @@ impl<'a> BeMessage<'a> {
|
||||
buf.put_u8(response);
|
||||
}
|
||||
|
||||
BeMessage::ParameterStatus { name, value } => {
|
||||
BeMessage::ParameterStatus(param) => {
|
||||
use std::io::{IoSlice, Write};
|
||||
use BeParameterStatusMessage::*;
|
||||
|
||||
let [name, value] = match param {
|
||||
Encoding(name) => [b"client_encoding", name.as_bytes()],
|
||||
ServerVersion(version) => [b"server_version", version.as_bytes()],
|
||||
};
|
||||
|
||||
// Parameter names and values are passed as null-terminated strings
|
||||
let iov = &mut [name, b"\0", value, b"\0"].map(IoSlice::new);
|
||||
let mut buffer = [0u8; 64]; // this should be enough
|
||||
let cnt = buffer.as_mut().write_vectored(iov).unwrap();
|
||||
|
||||
buf.put_u8(b'S');
|
||||
write_body(buf, |buf| {
|
||||
write_cstr(name, buf)?;
|
||||
write_cstr(value, buf)
|
||||
})?;
|
||||
buf.put_slice(&buffer[..cnt]);
|
||||
});
|
||||
}
|
||||
|
||||
BeMessage::ParameterDescription => {
|
||||
|
||||
@@ -4,13 +4,14 @@
|
||||
//! allowing multiple api users to independently work with the same S3 bucket, if
|
||||
//! their bucket prefixes are both specified and different.
|
||||
|
||||
use std::env::var;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use aws_config::{
|
||||
environment::credentials::EnvironmentVariableCredentialsProvider,
|
||||
imds::credentials::ImdsCredentialsProvider,
|
||||
meta::credentials::{CredentialsProviderChain, LazyCachingCredentialsProvider},
|
||||
environment::credentials::EnvironmentVariableCredentialsProvider, imds,
|
||||
imds::credentials::ImdsCredentialsProvider, meta::credentials::provide_credentials_fn,
|
||||
};
|
||||
use aws_sdk_s3::{
|
||||
config::Config,
|
||||
@@ -19,6 +20,7 @@ use aws_sdk_s3::{
|
||||
Client, Endpoint, Region,
|
||||
};
|
||||
use aws_smithy_http::body::SdkBody;
|
||||
use aws_types::credentials::{CredentialsError, ProvideCredentials};
|
||||
use hyper::Body;
|
||||
use tokio::{io, sync::Semaphore};
|
||||
use tokio_util::io::ReaderStream;
|
||||
@@ -29,6 +31,8 @@ use crate::{
|
||||
Download, DownloadError, RemotePath, RemoteStorage, S3Config, REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
};
|
||||
|
||||
const DEFAULT_IMDS_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
pub(super) mod metrics {
|
||||
use metrics::{register_int_counter_vec, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -118,23 +122,30 @@ impl S3Bucket {
|
||||
"Creating s3 remote storage for S3 bucket {}",
|
||||
aws_config.bucket_name
|
||||
);
|
||||
|
||||
let credentials_provider = {
|
||||
// uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
|
||||
let env_creds = EnvironmentVariableCredentialsProvider::new();
|
||||
// uses imds v2
|
||||
let imds = ImdsCredentialsProvider::builder().build();
|
||||
|
||||
// finally add caching.
|
||||
// this might change in future, see https://github.com/awslabs/aws-sdk-rust/issues/629
|
||||
LazyCachingCredentialsProvider::builder()
|
||||
.load(CredentialsProviderChain::first_try("env", env_creds).or_else("imds", imds))
|
||||
.build()
|
||||
};
|
||||
|
||||
let mut config_builder = Config::builder()
|
||||
.region(Region::new(aws_config.bucket_region.clone()))
|
||||
.credentials_provider(credentials_provider);
|
||||
.credentials_provider(provide_credentials_fn(|| async {
|
||||
match var("AWS_ACCESS_KEY_ID").is_ok() && var("AWS_SECRET_ACCESS_KEY").is_ok() {
|
||||
true => {
|
||||
EnvironmentVariableCredentialsProvider::new()
|
||||
.provide_credentials()
|
||||
.await
|
||||
}
|
||||
false => {
|
||||
let imds_client = imds::Client::builder()
|
||||
.connect_timeout(DEFAULT_IMDS_TIMEOUT)
|
||||
.read_timeout(DEFAULT_IMDS_TIMEOUT)
|
||||
.build()
|
||||
.await
|
||||
.map_err(CredentialsError::unhandled)?;
|
||||
ImdsCredentialsProvider::builder()
|
||||
.imds_client(imds_client)
|
||||
.build()
|
||||
.provide_credentials()
|
||||
.await
|
||||
}
|
||||
}
|
||||
}));
|
||||
|
||||
if let Some(custom_endpoint) = aws_config.endpoint.clone() {
|
||||
let endpoint = Endpoint::immutable(
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
use crate::sock_split::{BidiStream, ReadStream, WriteStream};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use pq_proto::{BeMessage, FeMessage, FeStartupPacket};
|
||||
use pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket};
|
||||
use rand::Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
@@ -361,9 +361,11 @@ impl PostgresBackend {
|
||||
match self.auth_type {
|
||||
AuthType::Trust => {
|
||||
self.write_message_noflush(&BeMessage::AuthenticationOk)?
|
||||
.write_message_noflush(&BeMessage::CLIENT_ENCODING)?
|
||||
.write_message_noflush(&BeParameterStatusMessage::encoding())?
|
||||
// The async python driver requires a valid server_version
|
||||
.write_message_noflush(&BeMessage::server_version("14.1"))?
|
||||
.write_message_noflush(&BeMessage::ParameterStatus(
|
||||
BeParameterStatusMessage::ServerVersion("14.1"),
|
||||
))?
|
||||
.write_message(&BeMessage::ReadyForQuery)?;
|
||||
self.state = ProtoState::Established;
|
||||
}
|
||||
@@ -411,7 +413,7 @@ impl PostgresBackend {
|
||||
}
|
||||
}
|
||||
self.write_message_noflush(&BeMessage::AuthenticationOk)?
|
||||
.write_message_noflush(&BeMessage::CLIENT_ENCODING)?
|
||||
.write_message_noflush(&BeParameterStatusMessage::encoding())?
|
||||
.write_message(&BeMessage::ReadyForQuery)?;
|
||||
self.state = ProtoState::Established;
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
use crate::postgres_backend::AuthType;
|
||||
use anyhow::{bail, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use pq_proto::{BeMessage, FeMessage, FeStartupPacket};
|
||||
use pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket};
|
||||
use rand::Rng;
|
||||
use std::future::Future;
|
||||
use std::net::SocketAddr;
|
||||
@@ -331,9 +331,11 @@ impl PostgresBackend {
|
||||
match self.auth_type {
|
||||
AuthType::Trust => {
|
||||
self.write_message(&BeMessage::AuthenticationOk)?
|
||||
.write_message(&BeMessage::CLIENT_ENCODING)?
|
||||
.write_message(&BeParameterStatusMessage::encoding())?
|
||||
// The async python driver requires a valid server_version
|
||||
.write_message(&BeMessage::server_version("14.1"))?
|
||||
.write_message(&BeMessage::ParameterStatus(
|
||||
BeParameterStatusMessage::ServerVersion("14.1"),
|
||||
))?
|
||||
.write_message(&BeMessage::ReadyForQuery)?;
|
||||
self.state = ProtoState::Established;
|
||||
}
|
||||
@@ -382,7 +384,7 @@ impl PostgresBackend {
|
||||
}
|
||||
}
|
||||
self.write_message(&BeMessage::AuthenticationOk)?
|
||||
.write_message(&BeMessage::CLIENT_ENCODING)?
|
||||
.write_message(&BeParameterStatusMessage::encoding())?
|
||||
.write_message(&BeMessage::ReadyForQuery)?;
|
||||
self.state = ProtoState::Established;
|
||||
}
|
||||
|
||||
@@ -11,13 +11,11 @@ use tokio::time::timeout;
|
||||
|
||||
/// An error happened while waiting for a number
|
||||
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
|
||||
#[error("SeqWaitError")]
|
||||
pub enum SeqWaitError {
|
||||
/// The wait timeout was reached
|
||||
#[error("seqwait timeout was reached")]
|
||||
Timeout,
|
||||
|
||||
/// [`SeqWait::shutdown`] was called
|
||||
#[error("SeqWait::shutdown was called")]
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
|
||||
@@ -36,9 +36,9 @@ nix = "0.25"
|
||||
num-traits = "0.2.15"
|
||||
once_cell = "1.13.0"
|
||||
pin-project-lite = "0.2.7"
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
pprof = { git = "https://github.com/neondatabase/pprof-rs.git", branch = "wallclock-profiling", features = ["flamegraph"], optional = true }
|
||||
rand = "0.8.3"
|
||||
regex = "1.4.5"
|
||||
@@ -52,7 +52,7 @@ svg_fmt = "0.4.1"
|
||||
tar = "0.4.33"
|
||||
thiserror = "1.0"
|
||||
tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
tokio-util = { version = "0.7.3", features = ["io", "io-util"] }
|
||||
toml_edit = { version = "0.14", features = ["easy"] }
|
||||
tracing = "0.1.36"
|
||||
|
||||
@@ -201,12 +201,8 @@ fn initialize_config(
|
||||
}
|
||||
|
||||
fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
|
||||
// Initialize logging
|
||||
logging::init(conf.log_format)?;
|
||||
|
||||
// Print version to the log, and expose it as a prometheus metric too.
|
||||
info!("version: {}", version());
|
||||
set_build_info_metric(GIT_VERSION);
|
||||
|
||||
// If any failpoints were set from FAILPOINTS environment variable,
|
||||
// print them to the log for debugging purposes
|
||||
@@ -222,37 +218,38 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
|
||||
)
|
||||
}
|
||||
|
||||
// Create and lock PID file. This ensures that there cannot be more than one
|
||||
// pageserver process running at the same time.
|
||||
let lock_file_path = conf.workdir.join(PID_FILE_NAME);
|
||||
let lock_file =
|
||||
utils::pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
|
||||
info!("Claimed pid file at {lock_file_path:?}");
|
||||
|
||||
// Ensure that the lock file is held even if the main thread of the process panics.
|
||||
// We need to release the lock file only when the process exits.
|
||||
// ensure that the lock file is held even if the main thread of the process is panics
|
||||
// we need to release the lock file only when the current process is gone
|
||||
std::mem::forget(lock_file);
|
||||
|
||||
// Bind the HTTP and libpq ports early, so that if they are in use by some other
|
||||
// process, we error out early.
|
||||
let http_addr = &conf.listen_http_addr;
|
||||
info!("Starting pageserver http handler on {http_addr}");
|
||||
let http_listener = tcp_listener::bind(http_addr)?;
|
||||
// TODO: Check that it looks like a valid repository before going further
|
||||
|
||||
let pg_addr = &conf.listen_pg_addr;
|
||||
info!("Starting pageserver pg protocol handler on {pg_addr}");
|
||||
let pageserver_listener = tcp_listener::bind(pg_addr)?;
|
||||
// bind sockets before daemonizing so we report errors early and do not return until we are listening
|
||||
info!(
|
||||
"Starting pageserver http handler on {}",
|
||||
conf.listen_http_addr
|
||||
);
|
||||
let http_listener = tcp_listener::bind(conf.listen_http_addr.clone())?;
|
||||
|
||||
info!(
|
||||
"Starting pageserver pg protocol handler on {}",
|
||||
conf.listen_pg_addr
|
||||
);
|
||||
let pageserver_listener = tcp_listener::bind(conf.listen_pg_addr.clone())?;
|
||||
|
||||
// Install signal handlers
|
||||
let signals = signals::install_shutdown_handlers()?;
|
||||
|
||||
// Start profiler (if enabled)
|
||||
// start profiler (if enabled)
|
||||
let profiler_guard = profiling::init_profiler(conf);
|
||||
|
||||
// Launch broker client
|
||||
WALRECEIVER_RUNTIME.block_on(pageserver::walreceiver::init_broker_client(conf))?;
|
||||
|
||||
// Initialize authentication for incoming connections
|
||||
// initialize authentication for incoming connections
|
||||
let auth = match &conf.auth_type {
|
||||
AuthType::Trust | AuthType::MD5 => None,
|
||||
AuthType::NeonJWT => {
|
||||
@@ -280,7 +277,6 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
|
||||
}
|
||||
};
|
||||
|
||||
// Set up remote storage client
|
||||
let remote_storage = conf
|
||||
.remote_storage_config
|
||||
.as_ref()
|
||||
@@ -288,18 +284,30 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
|
||||
.transpose()
|
||||
.context("Failed to init generic remote storage")?;
|
||||
|
||||
// Scan the local 'tenants/' directory and start loading the tenants
|
||||
BACKGROUND_RUNTIME.block_on(tenant_mgr::init_tenant_mgr(conf, remote_storage.clone()))?;
|
||||
let (init_result_sender, init_result_receiver) =
|
||||
std::sync::mpsc::channel::<anyhow::Result<()>>();
|
||||
let storage_for_spawn = remote_storage.clone();
|
||||
let _handler = BACKGROUND_RUNTIME.spawn(async move {
|
||||
let result = tenant_mgr::init_tenant_mgr(conf, storage_for_spawn).await;
|
||||
init_result_sender.send(result)
|
||||
});
|
||||
match init_result_receiver.recv() {
|
||||
Ok(init_result) => init_result.context("Failed to init tenant_mgr")?,
|
||||
Err(_sender_dropped_err) => {
|
||||
anyhow::bail!("Failed to init tenant_mgr: no init status was returned");
|
||||
}
|
||||
}
|
||||
|
||||
// Start up the service to handle HTTP mgmt API request. We created the
|
||||
// listener earlier already.
|
||||
// Spawn all HTTP related tasks in the MGMT_REQUEST_RUNTIME.
|
||||
// bind before launching separate thread so the error reported before startup exits
|
||||
|
||||
// Create a Service from the router above to handle incoming requests.
|
||||
{
|
||||
let _rt_guard = MGMT_REQUEST_RUNTIME.enter();
|
||||
|
||||
let router = http::make_router(conf, auth.clone(), remote_storage)?
|
||||
.build()
|
||||
.map_err(|err| anyhow!(err))?;
|
||||
let service = utils::http::RouterService::new(router).unwrap();
|
||||
let router = http::make_router(conf, auth.clone(), remote_storage)?;
|
||||
let service =
|
||||
utils::http::RouterService::new(router.build().map_err(|err| anyhow!(err))?).unwrap();
|
||||
let server = hyper::Server::from_tcp(http_listener)?
|
||||
.serve(service)
|
||||
.with_graceful_shutdown(task_mgr::shutdown_watcher());
|
||||
@@ -319,7 +327,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
// Spawn a task to listen for libpq connections. It will spawn further tasks
|
||||
// for each connection. We created the listener earlier already.
|
||||
// for each connection.
|
||||
task_mgr::spawn(
|
||||
COMPUTE_REQUEST_RUNTIME.handle(),
|
||||
TaskKind::LibpqEndpointListener,
|
||||
@@ -332,6 +340,8 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
|
||||
},
|
||||
);
|
||||
|
||||
set_build_info_metric(GIT_VERSION);
|
||||
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
signals.handle(|signal| match signal {
|
||||
Signal::Quit => {
|
||||
|
||||
@@ -137,7 +137,6 @@ pub struct PageServerConf {
|
||||
|
||||
/// Storage broker endpoints to connect to.
|
||||
pub broker_endpoint: Uri,
|
||||
pub broker_keepalive_interval: Duration,
|
||||
|
||||
pub log_format: LogFormat,
|
||||
|
||||
@@ -216,7 +215,6 @@ struct PageServerConfigBuilder {
|
||||
|
||||
profiling: BuilderValue<ProfilingConfig>,
|
||||
broker_endpoint: BuilderValue<Uri>,
|
||||
broker_keepalive_interval: BuilderValue<Duration>,
|
||||
|
||||
log_format: BuilderValue<LogFormat>,
|
||||
|
||||
@@ -249,10 +247,6 @@ impl Default for PageServerConfigBuilder {
|
||||
broker_endpoint: Set(storage_broker::DEFAULT_ENDPOINT
|
||||
.parse()
|
||||
.expect("failed to parse default broker endpoint")),
|
||||
broker_keepalive_interval: Set(humantime::parse_duration(
|
||||
storage_broker::DEFAULT_KEEPALIVE_INTERVAL,
|
||||
)
|
||||
.expect("cannot parse default keepalive interval")),
|
||||
log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
|
||||
|
||||
concurrent_tenant_size_logical_size_queries: Set(ConfigurableSemaphore::default()),
|
||||
@@ -316,10 +310,6 @@ impl PageServerConfigBuilder {
|
||||
self.broker_endpoint = BuilderValue::Set(broker_endpoint)
|
||||
}
|
||||
|
||||
pub fn broker_keepalive_interval(&mut self, broker_keepalive_interval: Duration) {
|
||||
self.broker_keepalive_interval = BuilderValue::Set(broker_keepalive_interval)
|
||||
}
|
||||
|
||||
pub fn id(&mut self, node_id: NodeId) {
|
||||
self.id = BuilderValue::Set(node_id)
|
||||
}
|
||||
@@ -375,9 +365,6 @@ impl PageServerConfigBuilder {
|
||||
broker_endpoint: self
|
||||
.broker_endpoint
|
||||
.ok_or(anyhow!("No broker endpoints provided"))?,
|
||||
broker_keepalive_interval: self
|
||||
.broker_keepalive_interval
|
||||
.ok_or(anyhow!("No broker keepalive interval provided"))?,
|
||||
log_format: self.log_format.ok_or(anyhow!("missing log_format"))?,
|
||||
concurrent_tenant_size_logical_size_queries: self
|
||||
.concurrent_tenant_size_logical_size_queries
|
||||
@@ -545,7 +532,6 @@ impl PageServerConf {
|
||||
"id" => builder.id(NodeId(parse_toml_u64(key, item)?)),
|
||||
"profiling" => builder.profiling(parse_toml_from_str(key, item)?),
|
||||
"broker_endpoint" => builder.broker_endpoint(parse_toml_string(key, item)?.parse().context("failed to parse broker endpoint")?),
|
||||
"broker_keepalive_interval" => builder.broker_keepalive_interval(parse_toml_duration(key, item)?),
|
||||
"log_format" => builder.log_format(
|
||||
LogFormat::from_config(&parse_toml_string(key, item)?)?
|
||||
),
|
||||
@@ -673,7 +659,6 @@ impl PageServerConf {
|
||||
profiling: ProfilingConfig::Disabled,
|
||||
default_tenant_conf: TenantConf::dummy_conf(),
|
||||
broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
|
||||
broker_keepalive_interval: Duration::from_secs(5000),
|
||||
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
|
||||
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
|
||||
}
|
||||
@@ -844,9 +829,6 @@ log_format = 'json'
|
||||
profiling: ProfilingConfig::Disabled,
|
||||
default_tenant_conf: TenantConf::default(),
|
||||
broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
|
||||
broker_keepalive_interval: humantime::parse_duration(
|
||||
storage_broker::DEFAULT_KEEPALIVE_INTERVAL
|
||||
)?,
|
||||
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
|
||||
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
|
||||
},
|
||||
@@ -890,7 +872,6 @@ log_format = 'json'
|
||||
profiling: ProfilingConfig::Disabled,
|
||||
default_tenant_conf: TenantConf::default(),
|
||||
broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
|
||||
broker_keepalive_interval: Duration::from_secs(5),
|
||||
log_format: LogFormat::Json,
|
||||
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
|
||||
},
|
||||
|
||||
@@ -795,6 +795,37 @@ components:
|
||||
latest_gc_cutoff_lsn:
|
||||
type: string
|
||||
format: hex
|
||||
|
||||
# These 'local' and 'remote' fields just duplicate some of the fields
|
||||
# above. They are kept for backwards-compatibility. They can be removed,
|
||||
# when the control plane has been updated to look at the above fields
|
||||
# directly.
|
||||
local:
|
||||
$ref: "#/components/schemas/LocalTimelineInfo"
|
||||
remote:
|
||||
$ref: "#/components/schemas/RemoteTimelineInfo"
|
||||
|
||||
LocalTimelineInfo:
|
||||
type: object
|
||||
properties:
|
||||
ancestor_timeline_id:
|
||||
type: string
|
||||
format: hex
|
||||
ancestor_lsn:
|
||||
type: string
|
||||
format: hex
|
||||
current_logical_size:
|
||||
type: integer
|
||||
current_physical_size:
|
||||
type: integer
|
||||
RemoteTimelineInfo:
|
||||
type: object
|
||||
required:
|
||||
- remote_consistent_lsn
|
||||
properties:
|
||||
remote_consistent_lsn:
|
||||
type: string
|
||||
format: hex
|
||||
Error:
|
||||
type: object
|
||||
required:
|
||||
|
||||
@@ -7,8 +7,8 @@ use remote_storage::GenericRemoteStorage;
|
||||
use tracing::*;
|
||||
|
||||
use super::models::{
|
||||
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
|
||||
TimelineCreateRequest, TimelineInfo,
|
||||
LocalTimelineInfo, RemoteTimelineInfo, StatusResponse, TenantConfigRequest,
|
||||
TenantCreateRequest, TenantCreateResponse, TenantInfo, TimelineCreateRequest, TimelineInfo,
|
||||
};
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::tenant::Timeline;
|
||||
@@ -147,6 +147,18 @@ fn build_timeline_info_common(timeline: &Arc<Timeline>) -> anyhow::Result<Timeli
|
||||
pg_version: timeline.pg_version,
|
||||
|
||||
state,
|
||||
|
||||
// Duplicate some fields in 'local' and 'remote' fields, for backwards-compatility
|
||||
// with the control plane.
|
||||
local: LocalTimelineInfo {
|
||||
ancestor_timeline_id,
|
||||
ancestor_lsn,
|
||||
current_logical_size,
|
||||
current_physical_size,
|
||||
},
|
||||
remote: RemoteTimelineInfo {
|
||||
remote_consistent_lsn: Some(remote_consistent_lsn),
|
||||
},
|
||||
};
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
//! the current task has been requested to shut down. You can use that with
|
||||
//! Tokio select!().
|
||||
//!
|
||||
//!
|
||||
//! TODO: This would be a good place to also handle panics in a somewhat sane way.
|
||||
//! Depending on what task panics, we might want to kill the whole server, or
|
||||
//! only a single tenant or timeline.
|
||||
@@ -42,9 +43,9 @@ use std::sync::{Arc, Mutex};
|
||||
|
||||
use futures::FutureExt;
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::sync::watch;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::task_local;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
@@ -145,10 +146,11 @@ static TASKS: Lazy<Mutex<HashMap<u64, Arc<PageServerTask>>>> =
|
||||
Lazy::new(|| Mutex::new(HashMap::new()));
|
||||
|
||||
task_local! {
|
||||
// This is a cancellation token which will be cancelled when a task needs to shut down. The
|
||||
// root token is kept in the global registry, so that anyone can send the signal to request
|
||||
// task shutdown.
|
||||
static SHUTDOWN_TOKEN: CancellationToken;
|
||||
// There is a Tokio watch channel for each task, which can be used to signal the
|
||||
// task that it needs to shut down. This task local variable holds the receiving
|
||||
// end of the channel. The sender is kept in the global registry, so that anyone
|
||||
// can send the signal to request task shutdown.
|
||||
static SHUTDOWN_RX: watch::Receiver<bool>;
|
||||
|
||||
// Each task holds reference to its own PageServerTask here.
|
||||
static CURRENT_TASK: Arc<PageServerTask>;
|
||||
@@ -224,8 +226,8 @@ struct PageServerTask {
|
||||
|
||||
name: String,
|
||||
|
||||
// To request task shutdown, just cancel this token.
|
||||
cancel: CancellationToken,
|
||||
// To request task shutdown, send 'true' to the channel to notify the task.
|
||||
shutdown_tx: watch::Sender<bool>,
|
||||
|
||||
mutable: Mutex<MutableTaskState>,
|
||||
}
|
||||
@@ -245,13 +247,13 @@ pub fn spawn<F>(
|
||||
where
|
||||
F: Future<Output = anyhow::Result<()>> + Send + 'static,
|
||||
{
|
||||
let cancel = CancellationToken::new();
|
||||
let (shutdown_tx, shutdown_rx) = watch::channel(false);
|
||||
let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
|
||||
let task = Arc::new(PageServerTask {
|
||||
task_id: PageserverTaskId(task_id),
|
||||
kind,
|
||||
name: name.to_string(),
|
||||
cancel: cancel.clone(),
|
||||
shutdown_tx,
|
||||
mutable: Mutex::new(MutableTaskState {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
@@ -269,7 +271,7 @@ where
|
||||
task_name,
|
||||
task_id,
|
||||
task_cloned,
|
||||
cancel,
|
||||
shutdown_rx,
|
||||
shutdown_process_on_error,
|
||||
future,
|
||||
));
|
||||
@@ -286,7 +288,7 @@ async fn task_wrapper<F>(
|
||||
task_name: String,
|
||||
task_id: u64,
|
||||
task: Arc<PageServerTask>,
|
||||
shutdown_token: CancellationToken,
|
||||
shutdown_rx: watch::Receiver<bool>,
|
||||
shutdown_process_on_error: bool,
|
||||
future: F,
|
||||
) where
|
||||
@@ -294,9 +296,9 @@ async fn task_wrapper<F>(
|
||||
{
|
||||
debug!("Starting task '{}'", task_name);
|
||||
|
||||
let result = SHUTDOWN_TOKEN
|
||||
let result = SHUTDOWN_RX
|
||||
.scope(
|
||||
shutdown_token,
|
||||
shutdown_rx,
|
||||
CURRENT_TASK.scope(task, {
|
||||
// We use AssertUnwindSafe here so that the payload function
|
||||
// doesn't need to be UnwindSafe. We don't do anything after the
|
||||
@@ -406,7 +408,7 @@ pub async fn shutdown_tasks(
|
||||
&& (tenant_id.is_none() || task_mut.tenant_id == tenant_id)
|
||||
&& (timeline_id.is_none() || task_mut.timeline_id == timeline_id)
|
||||
{
|
||||
task.cancel.cancel();
|
||||
let _ = task.shutdown_tx.send_replace(true);
|
||||
victim_tasks.push(Arc::clone(task));
|
||||
}
|
||||
}
|
||||
@@ -437,28 +439,21 @@ pub fn current_task_kind() -> Option<TaskKind> {
|
||||
/// A Future that can be used to check if the current task has been requested to
|
||||
/// shut down.
|
||||
pub async fn shutdown_watcher() {
|
||||
let token = SHUTDOWN_TOKEN
|
||||
.try_with(|t| t.clone())
|
||||
let mut shutdown_rx = SHUTDOWN_RX
|
||||
.try_with(|rx| rx.clone())
|
||||
.expect("shutdown_requested() called in an unexpected task or thread");
|
||||
|
||||
token.cancelled().await;
|
||||
}
|
||||
|
||||
/// Clone the current task's cancellation token, which can be moved across tasks.
|
||||
///
|
||||
/// When the task which is currently executing is shutdown, the cancellation token will be
|
||||
/// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or
|
||||
/// `tokio::task::JoinSet::spawn`.
|
||||
pub fn shutdown_token() -> CancellationToken {
|
||||
SHUTDOWN_TOKEN
|
||||
.try_with(|t| t.clone())
|
||||
.expect("shutdown_token() called in an unexpected task or thread")
|
||||
while !*shutdown_rx.borrow() {
|
||||
if shutdown_rx.changed().await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Has the current task been requested to shut down?
|
||||
pub fn is_shutdown_requested() -> bool {
|
||||
if let Ok(cancel) = SHUTDOWN_TOKEN.try_with(|t| t.clone()) {
|
||||
cancel.is_cancelled()
|
||||
if let Ok(shutdown_rx) = SHUTDOWN_RX.try_with(|rx| rx.clone()) {
|
||||
*shutdown_rx.borrow()
|
||||
} else {
|
||||
if !cfg!(test) {
|
||||
warn!("is_shutdown_requested() called in an unexpected task or thread");
|
||||
|
||||
@@ -1164,6 +1164,7 @@ impl Tenant {
|
||||
target_timeline_id: Option<TimelineId>,
|
||||
horizon: u64,
|
||||
pitr: Duration,
|
||||
checkpoint_before_gc: bool,
|
||||
) -> anyhow::Result<GcResult> {
|
||||
anyhow::ensure!(
|
||||
self.is_active(),
|
||||
@@ -1178,7 +1179,7 @@ impl Tenant {
|
||||
let _timer = STORAGE_TIME
|
||||
.with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str])
|
||||
.start_timer();
|
||||
self.gc_iteration_internal(target_timeline_id, horizon, pitr)
|
||||
self.gc_iteration_internal(target_timeline_id, horizon, pitr, checkpoint_before_gc)
|
||||
.await
|
||||
}
|
||||
}
|
||||
@@ -1777,6 +1778,7 @@ impl Tenant {
|
||||
target_timeline_id: Option<TimelineId>,
|
||||
horizon: u64,
|
||||
pitr: Duration,
|
||||
checkpoint_before_gc: bool,
|
||||
) -> anyhow::Result<GcResult> {
|
||||
let mut totals: GcResult = Default::default();
|
||||
let now = Instant::now();
|
||||
@@ -1803,6 +1805,18 @@ impl Tenant {
|
||||
// made.
|
||||
break;
|
||||
}
|
||||
|
||||
// If requested, force flush all in-memory layers to disk first,
|
||||
// so that they too can be garbage collected. That's
|
||||
// used in tests, so we want as deterministic results as possible.
|
||||
if checkpoint_before_gc {
|
||||
timeline.checkpoint(CheckpointConfig::Forced).await?;
|
||||
info!(
|
||||
"timeline {} checkpoint_before_gc done",
|
||||
timeline.timeline_id
|
||||
);
|
||||
}
|
||||
|
||||
let result = timeline.gc().await?;
|
||||
totals += result;
|
||||
}
|
||||
@@ -2863,7 +2877,7 @@ mod tests {
|
||||
// and compaction works. But it does set the 'cutoff' point so that the cross check
|
||||
// below should fail.
|
||||
tenant
|
||||
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)
|
||||
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)
|
||||
.await?;
|
||||
|
||||
// try to branch at lsn 25, should fail because we already garbage collected the data
|
||||
@@ -2919,7 +2933,7 @@ mod tests {
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
|
||||
let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
|
||||
assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
|
||||
match tline.get(*TEST_KEY, Lsn(0x25)) {
|
||||
@@ -2946,7 +2960,7 @@ mod tests {
|
||||
.expect("Should have a local timeline");
|
||||
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
|
||||
tenant
|
||||
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)
|
||||
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)
|
||||
.await?;
|
||||
assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok());
|
||||
|
||||
@@ -2971,7 +2985,7 @@ mod tests {
|
||||
|
||||
// run gc on parent
|
||||
tenant
|
||||
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)
|
||||
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)
|
||||
.await?;
|
||||
|
||||
// Check that the data is still accessible on the branch.
|
||||
|
||||
@@ -2487,6 +2487,9 @@ impl Timeline {
|
||||
);
|
||||
write_guard.store_and_unlock(new_gc_cutoff).wait();
|
||||
}
|
||||
// Persist the new GC cutoff value in the metadata file, before
|
||||
// we actually remove anything.
|
||||
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?;
|
||||
|
||||
info!("GC starting");
|
||||
|
||||
@@ -2597,33 +2600,19 @@ impl Timeline {
|
||||
layers_to_remove.push(Arc::clone(&l));
|
||||
}
|
||||
|
||||
if !layers_to_remove.is_empty() {
|
||||
// Persist the new GC cutoff value in the metadata file, before
|
||||
// we actually remove anything.
|
||||
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?;
|
||||
|
||||
// Actually delete the layers from disk and remove them from the map.
|
||||
// (couldn't do this in the loop above, because you cannot modify a collection
|
||||
// while iterating it. BTreeMap::retain() would be another option)
|
||||
let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len());
|
||||
for doomed_layer in layers_to_remove {
|
||||
let path = doomed_layer.local_path();
|
||||
self.metrics
|
||||
.current_physical_size_gauge
|
||||
.sub(path.metadata()?.len());
|
||||
layer_names_to_delete.push(doomed_layer.filename());
|
||||
doomed_layer.delete()?;
|
||||
layers.remove_historic(doomed_layer);
|
||||
result.layers_removed += 1;
|
||||
}
|
||||
|
||||
if result.layers_removed != 0 {
|
||||
fail_point!("after-timeline-gc-removed-layers");
|
||||
}
|
||||
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
|
||||
}
|
||||
// Actually delete the layers from disk and remove them from the map.
|
||||
// (couldn't do this in the loop above, because you cannot modify a collection
|
||||
// while iterating it. BTreeMap::retain() would be another option)
|
||||
let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len());
|
||||
for doomed_layer in layers_to_remove {
|
||||
let path = doomed_layer.local_path();
|
||||
self.metrics
|
||||
.current_physical_size_gauge
|
||||
.sub(path.metadata()?.len());
|
||||
layer_names_to_delete.push(doomed_layer.filename());
|
||||
doomed_layer.delete()?;
|
||||
layers.remove_historic(doomed_layer);
|
||||
result.layers_removed += 1;
|
||||
}
|
||||
|
||||
info!(
|
||||
@@ -2631,6 +2620,14 @@ impl Timeline {
|
||||
result.layers_removed, new_gc_cutoff
|
||||
);
|
||||
|
||||
if result.layers_removed != 0 {
|
||||
fail_point!("after-timeline-gc-removed-layers");
|
||||
}
|
||||
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
|
||||
}
|
||||
|
||||
result.elapsed = now.elapsed()?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
@@ -496,7 +496,7 @@ pub async fn immediate_gc(
|
||||
async move {
|
||||
fail::fail_point!("immediate_gc_task_pre");
|
||||
let result = tenant
|
||||
.gc_iteration(Some(timeline_id), gc_horizon, pitr)
|
||||
.gc_iteration(Some(timeline_id), gc_horizon, pitr, true)
|
||||
.instrument(info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id))
|
||||
.await;
|
||||
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
|
||||
|
||||
@@ -127,7 +127,7 @@ async fn gc_loop(tenant_id: TenantId) {
|
||||
} else {
|
||||
// Run gc
|
||||
if gc_horizon > 0 {
|
||||
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval()).await
|
||||
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false).await
|
||||
{
|
||||
sleep_duration = wait_duration;
|
||||
error!("Gc failed, retrying in {:?}: {e:?}", sleep_duration);
|
||||
|
||||
@@ -44,13 +44,10 @@ pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result
|
||||
let broker_endpoint = conf.broker_endpoint.clone();
|
||||
|
||||
// Note: we do not attempt connecting here (but validate endpoints sanity).
|
||||
let broker_client =
|
||||
storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context(
|
||||
format!(
|
||||
"Failed to create broker client to {}",
|
||||
&conf.broker_endpoint
|
||||
),
|
||||
)?;
|
||||
let broker_client = storage_broker::connect(broker_endpoint.clone()).context(format!(
|
||||
"Failed to create broker client to {}",
|
||||
&conf.broker_endpoint
|
||||
))?;
|
||||
|
||||
if BROKER_CLIENT.set(broker_client).is_err() {
|
||||
panic!("broker already initialized");
|
||||
@@ -129,21 +126,15 @@ impl<E: Clone> TaskHandle<E> {
|
||||
match self.events_receiver.changed().await {
|
||||
Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),
|
||||
Err(_task_channel_part_dropped) => {
|
||||
TaskEvent::End(match self.join_handle.as_mut() {
|
||||
TaskEvent::End(match self.join_handle.take() {
|
||||
Some(jh) => {
|
||||
if !jh.is_finished() {
|
||||
warn!("sender is dropped while join handle is still alive");
|
||||
}
|
||||
|
||||
let res = jh
|
||||
.await
|
||||
jh.await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to join task: {e}"))
|
||||
.and_then(|x| x);
|
||||
|
||||
// For cancellation-safety, drop join_handle only after successful .await.
|
||||
self.join_handle = None;
|
||||
|
||||
res
|
||||
.and_then(|x| x)
|
||||
}
|
||||
None => {
|
||||
// Another option is to have an enum, join handle or result and give away the reference to it
|
||||
|
||||
@@ -1669,7 +1669,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
|
||||
* (leaving holes). But this rule is violated in PG-15 where CreateAndCopyRelationData
|
||||
* call smgrextend for destination relation n using size of source relation
|
||||
*/
|
||||
n_blocks = neon_nblocks(reln, forkNum);
|
||||
get_cached_relsize(reln->smgr_rnode.node, forkNum, &n_blocks);
|
||||
while (n_blocks < blkno)
|
||||
neon_wallog_page(reln, forkNum, n_blocks++, buffer, true);
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ sha2 = "0.10.2"
|
||||
socket2 = "0.4.4"
|
||||
thiserror = "1.0.30"
|
||||
tokio = { version = "1.17", features = ["macros"] }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
tokio-rustls = "0.23.0"
|
||||
tracing = "0.1.36"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use super::{AuthSuccess, NodeInfo};
|
||||
use crate::{auth, compute, error::UserFacingError, stream::PqStream, waiters};
|
||||
use pq_proto::BeMessage as Be;
|
||||
use pq_proto::{BeMessage as Be, BeParameterStatusMessage};
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{info, info_span};
|
||||
@@ -60,7 +60,7 @@ pub async fn handle_user(
|
||||
info!(parent: &span, "sending the auth URL to the user");
|
||||
client
|
||||
.write_message_noflush(&Be::AuthenticationOk)?
|
||||
.write_message_noflush(&Be::CLIENT_ENCODING)?
|
||||
.write_message_noflush(&BeParameterStatusMessage::encoding())?
|
||||
.write_message(&Be::NoticeResponse(&greeting))
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -8,17 +8,18 @@ use tokio::net::TcpStream;
|
||||
use tokio_postgres::NoTls;
|
||||
use tracing::{error, info};
|
||||
|
||||
const COULD_NOT_CONNECT: &str = "Could not connect to compute node";
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ConnectionError {
|
||||
/// This error doesn't seem to reveal any secrets; for instance,
|
||||
/// [`tokio_postgres::error::Kind`] doesn't contain ip addresses and such.
|
||||
#[error("{COULD_NOT_CONNECT}: {0}")]
|
||||
#[error("Failed to connect to the compute node: {0}")]
|
||||
Postgres(#[from] tokio_postgres::Error),
|
||||
|
||||
#[error("{COULD_NOT_CONNECT}: {0}")]
|
||||
CouldNotConnect(#[from] io::Error),
|
||||
#[error("Failed to connect to the compute node")]
|
||||
FailedToConnectToCompute,
|
||||
|
||||
#[error("Failed to fetch compute node version")]
|
||||
FailedToFetchPgVersion,
|
||||
}
|
||||
|
||||
impl UserFacingError for ConnectionError {
|
||||
@@ -28,10 +29,10 @@ impl UserFacingError for ConnectionError {
|
||||
// This helps us drop irrelevant library-specific prefixes.
|
||||
// TODO: propagate severity level and other parameters.
|
||||
Postgres(err) => match err.as_db_error() {
|
||||
Some(err) => err.message().to_owned(),
|
||||
Some(err) => err.message().to_string(),
|
||||
None => err.to_string(),
|
||||
},
|
||||
_ => COULD_NOT_CONNECT.to_owned(),
|
||||
other => other.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -48,7 +49,7 @@ pub struct ConnCfg(pub tokio_postgres::Config);
|
||||
impl ConnCfg {
|
||||
/// Construct a new connection config.
|
||||
pub fn new() -> Self {
|
||||
Self(Default::default())
|
||||
Self(tokio_postgres::Config::new())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,7 +95,7 @@ impl ConnCfg {
|
||||
io::ErrorKind::Other,
|
||||
format!(
|
||||
"couldn't connect: bad compute config, \
|
||||
ports and hosts entries' count does not match: {:?}",
|
||||
ports and hosts entries' count does not match: {:?}",
|
||||
self.0
|
||||
),
|
||||
));
|
||||
@@ -130,8 +131,8 @@ impl ConnCfg {
|
||||
pub struct PostgresConnection {
|
||||
/// Socket connected to a compute node.
|
||||
pub stream: TcpStream,
|
||||
/// PostgreSQL connection parameters.
|
||||
pub params: std::collections::HashMap<String, String>,
|
||||
/// PostgreSQL version of this instance.
|
||||
pub version: String,
|
||||
}
|
||||
|
||||
impl ConnCfg {
|
||||
@@ -155,7 +156,6 @@ impl ConnCfg {
|
||||
self.0.application_name(app_name);
|
||||
}
|
||||
|
||||
// TODO: This is especially ugly...
|
||||
if let Some(replication) = params.get("replication") {
|
||||
use tokio_postgres::config::ReplicationMode;
|
||||
match replication {
|
||||
@@ -172,24 +172,22 @@ impl ConnCfg {
|
||||
// TODO: extend the list of the forwarded startup parameters.
|
||||
// Currently, tokio-postgres doesn't allow us to pass
|
||||
// arbitrary parameters, but the ones above are a good start.
|
||||
//
|
||||
// This and the reverse params problem can be better addressed
|
||||
// in a bespoke connection machinery (a new library for that sake).
|
||||
|
||||
// TODO: establish a secure connection to the DB.
|
||||
let (socket_addr, mut stream) = self.connect_raw().await?;
|
||||
let (client, connection) = self.0.connect_raw(&mut stream, NoTls).await?;
|
||||
let (socket_addr, mut stream) = self
|
||||
.connect_raw()
|
||||
.await
|
||||
.map_err(|_| ConnectionError::FailedToConnectToCompute)?;
|
||||
|
||||
// TODO: establish a secure connection to the DB
|
||||
let (client, conn) = self.0.connect_raw(&mut stream, NoTls).await?;
|
||||
let version = conn
|
||||
.parameter("server_version")
|
||||
.ok_or(ConnectionError::FailedToFetchPgVersion)?
|
||||
.into();
|
||||
|
||||
info!("connected to user's compute node at {socket_addr}");
|
||||
|
||||
// This is very ugly but as of now there's no better way to
|
||||
// extract the connection parameters from tokio-postgres' connection.
|
||||
// TODO: solve this problem in a more elegant manner (e.g. the new library).
|
||||
let params = connection.parameters;
|
||||
|
||||
// NB: CancelToken is supposed to hold socket_addr, but we use connect_raw.
|
||||
// Yet another reason to rework the connection establishing code.
|
||||
let cancel_closure = CancelClosure::new(socket_addr, client.cancel_token());
|
||||
let db = PostgresConnection { stream, params };
|
||||
let db = PostgresConnection { stream, version };
|
||||
|
||||
Ok((db, cancel_closure))
|
||||
}
|
||||
|
||||
@@ -255,21 +255,15 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
|
||||
// Note that we do this only (for the most part) after we've connected
|
||||
// to a compute (see above) which performs its own authentication.
|
||||
if !auth_result.reported_auth_ok {
|
||||
stream.write_message_noflush(&Be::AuthenticationOk)?;
|
||||
}
|
||||
|
||||
// Forward all postgres connection params to the client.
|
||||
// Right now the implementation is very hacky and inefficent (ideally,
|
||||
// we don't need an intermediate hashmap), but at least it should be correct.
|
||||
for (name, value) in &db.params {
|
||||
// TODO: Theoretically, this could result in a big pile of params...
|
||||
stream.write_message_noflush(&Be::ParameterStatus {
|
||||
name: name.as_bytes(),
|
||||
value: value.as_bytes(),
|
||||
})?;
|
||||
stream
|
||||
.write_message_noflush(&Be::AuthenticationOk)?
|
||||
.write_message_noflush(&BeParameterStatusMessage::encoding())?;
|
||||
}
|
||||
|
||||
stream
|
||||
.write_message_noflush(&BeMessage::ParameterStatus(
|
||||
BeParameterStatusMessage::ServerVersion(&db.version),
|
||||
))?
|
||||
.write_message_noflush(&Be::BackendKeyData(cancel_key_data))?
|
||||
.write_message(&BeMessage::ReadyForQuery)
|
||||
.await?;
|
||||
|
||||
@@ -139,7 +139,7 @@ async fn dummy_proxy(
|
||||
|
||||
stream
|
||||
.write_message_noflush(&Be::AuthenticationOk)?
|
||||
.write_message_noflush(&Be::CLIENT_ENCODING)?
|
||||
.write_message_noflush(&BeParameterStatusMessage::encoding())?
|
||||
.write_message(&BeMessage::ReadyForQuery)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -20,8 +20,8 @@ hyper = "0.14"
|
||||
nix = "0.25"
|
||||
once_cell = "1.13.0"
|
||||
parking_lot = "0.12.1"
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
regex = "1.4.5"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
@@ -29,7 +29,7 @@ serde_with = "2.0"
|
||||
signal-hook = "0.3.10"
|
||||
thiserror = "1"
|
||||
tokio = { version = "1.17", features = ["macros", "fs"] }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
toml_edit = { version = "0.14", features = ["easy"] }
|
||||
tracing = "0.1.27"
|
||||
url = "2.2.2"
|
||||
|
||||
@@ -82,9 +82,6 @@ struct Args {
|
||||
/// established; plaintext otherwise.
|
||||
#[arg(long, default_value = DEFAULT_ENDPOINT, verbatim_doc_comment)]
|
||||
broker_endpoint: Uri,
|
||||
/// Broker keepalive interval.
|
||||
#[arg(long, value_parser= humantime::parse_duration, default_value = storage_broker::DEFAULT_KEEPALIVE_INTERVAL)]
|
||||
broker_keepalive_interval: Duration,
|
||||
/// Peer safekeeper is considered dead after not receiving heartbeats from
|
||||
/// it during this period passed as a human readable duration.
|
||||
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT)]
|
||||
@@ -129,6 +126,9 @@ fn main() -> anyhow::Result<()> {
|
||||
logging::init(LogFormat::from_config(&args.log_format)?)?;
|
||||
info!("version: {GIT_VERSION}");
|
||||
|
||||
// Change into the data directory.
|
||||
std::env::set_current_dir(&args.datadir)?;
|
||||
|
||||
// Set or read our ID.
|
||||
let id = set_id(&args.datadir, args.id.map(NodeId))?;
|
||||
if args.init {
|
||||
@@ -142,7 +142,6 @@ fn main() -> anyhow::Result<()> {
|
||||
listen_http_addr: args.listen_http,
|
||||
no_sync: args.no_sync,
|
||||
broker_endpoint: args.broker_endpoint,
|
||||
broker_keepalive_interval: args.broker_keepalive_interval,
|
||||
heartbeat_timeout: args.heartbeat_timeout,
|
||||
remote_storage: args.remote_storage,
|
||||
max_offloader_lag_bytes: args.max_offloader_lag,
|
||||
|
||||
@@ -66,7 +66,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
|
||||
/// Subscribe and fetch all the interesting data from the broker.
|
||||
async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
|
||||
let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
|
||||
let mut client = storage_broker::connect(conf.broker_endpoint)?;
|
||||
|
||||
// TODO: subscribe only to local timelines instead of all
|
||||
let request = SubscribeSafekeeperInfoRequest {
|
||||
|
||||
@@ -51,7 +51,6 @@ pub struct SafeKeeperConf {
|
||||
pub listen_http_addr: String,
|
||||
pub no_sync: bool,
|
||||
pub broker_endpoint: Uri,
|
||||
pub broker_keepalive_interval: Duration,
|
||||
pub heartbeat_timeout: Duration,
|
||||
pub remote_storage: Option<RemoteStorageConfig>,
|
||||
pub max_offloader_lag_bytes: u64,
|
||||
@@ -84,7 +83,6 @@ impl SafeKeeperConf {
|
||||
broker_endpoint: storage_broker::DEFAULT_ENDPOINT
|
||||
.parse()
|
||||
.expect("failed to parse default broker endpoint"),
|
||||
broker_keepalive_interval: Duration::from_secs(5),
|
||||
backup_runtime_threads: None,
|
||||
wal_backup_enabled: true,
|
||||
auth_validation_public_key_path: None,
|
||||
|
||||
@@ -88,7 +88,7 @@ fn tli_from_u64(i: u64) -> Vec<u8> {
|
||||
async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>, i: u64) {
|
||||
let mut client = match client {
|
||||
Some(c) => c,
|
||||
None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
|
||||
None => storage_broker::connect(DEFAULT_ENDPOINT).unwrap(),
|
||||
};
|
||||
|
||||
let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
|
||||
@@ -112,7 +112,7 @@ async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>,
|
||||
async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
let mut client = match client {
|
||||
Some(c) => c,
|
||||
None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
|
||||
None => storage_broker::connect(DEFAULT_ENDPOINT).unwrap(),
|
||||
};
|
||||
let mut counter: u64 = 0;
|
||||
|
||||
@@ -152,7 +152,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
}
|
||||
let h = tokio::spawn(progress_reporter(counters.clone()));
|
||||
|
||||
let c = storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap();
|
||||
let c = storage_broker::connect(DEFAULT_ENDPOINT).unwrap();
|
||||
|
||||
for i in 0..args.num_subs {
|
||||
let c = Some(c.clone());
|
||||
|
||||
@@ -39,9 +39,7 @@ use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
|
||||
use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
|
||||
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
|
||||
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
|
||||
use storage_broker::{
|
||||
parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
|
||||
};
|
||||
use storage_broker::{parse_proto_ttid, EitherBody, DEFAULT_LISTEN_ADDR};
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::logging::{self, LogFormat};
|
||||
use utils::project_git_version;
|
||||
@@ -49,8 +47,8 @@ use utils::sentry_init::{init_sentry, release_name};
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
const DEFAULT_CHAN_SIZE: usize = 32;
|
||||
const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384;
|
||||
const DEFAULT_CHAN_SIZE: usize = 128;
|
||||
const DEFAULT_HTTP2_KEEPALIVE_INTERVAL: &str = "5000ms";
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
|
||||
@@ -58,14 +56,11 @@ struct Args {
|
||||
/// Endpoint to listen on.
|
||||
#[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)]
|
||||
listen_addr: SocketAddr,
|
||||
/// Size of the queue to the per timeline subscriber.
|
||||
/// Size of the queue to the subscriber.
|
||||
#[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
|
||||
timeline_chan_size: usize,
|
||||
/// Size of the queue to the all keys subscriber.
|
||||
#[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)]
|
||||
all_keys_chan_size: usize,
|
||||
chan_size: usize,
|
||||
/// HTTP/2 keepalive interval.
|
||||
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
|
||||
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HTTP2_KEEPALIVE_INTERVAL)]
|
||||
http2_keepalive_interval: Duration,
|
||||
/// Format for logging, either 'plain' or 'json'.
|
||||
#[arg(long, default_value = "plain")]
|
||||
@@ -113,7 +108,7 @@ struct SharedState {
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
pub fn new(all_keys_chan_size: usize) -> Self {
|
||||
pub fn new(chan_size: usize) -> Self {
|
||||
SharedState {
|
||||
next_pub_id: 0,
|
||||
num_pubs: 0,
|
||||
@@ -121,7 +116,7 @@ impl SharedState {
|
||||
num_subs_to_timelines: 0,
|
||||
chans_to_timeline_subs: HashMap::new(),
|
||||
num_subs_to_all: 0,
|
||||
chan_to_all_subs: broadcast::channel(all_keys_chan_size).0,
|
||||
chan_to_all_subs: broadcast::channel(chan_size).0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -144,7 +139,7 @@ impl SharedState {
|
||||
pub fn register_subscriber(
|
||||
&mut self,
|
||||
sub_key: SubscriptionKey,
|
||||
timeline_chan_size: usize,
|
||||
chan_size: usize,
|
||||
) -> (SubId, broadcast::Receiver<SafekeeperTimelineInfo>) {
|
||||
let sub_id = self.next_sub_id;
|
||||
self.next_sub_id += 1;
|
||||
@@ -163,7 +158,7 @@ impl SharedState {
|
||||
self.chans_to_timeline_subs
|
||||
.entry(ttid)
|
||||
.or_insert(ChanToTimelineSub {
|
||||
chan: broadcast::channel(timeline_chan_size).0,
|
||||
chan: broadcast::channel(chan_size).0,
|
||||
num_subscribers: 0,
|
||||
});
|
||||
chan_to_timeline_sub.num_subscribers += 1;
|
||||
@@ -205,7 +200,7 @@ impl SharedState {
|
||||
#[derive(Clone)]
|
||||
struct Registry {
|
||||
shared_state: Arc<RwLock<SharedState>>,
|
||||
timeline_chan_size: usize,
|
||||
chan_size: usize,
|
||||
}
|
||||
|
||||
impl Registry {
|
||||
@@ -237,7 +232,7 @@ impl Registry {
|
||||
let (sub_id, sub_rx) = self
|
||||
.shared_state
|
||||
.write()
|
||||
.register_subscriber(sub_key, self.timeline_chan_size);
|
||||
.register_subscriber(sub_key, self.chan_size);
|
||||
info!(
|
||||
"subscription started id={}, key={:?}, addr={:?}",
|
||||
sub_id, sub_key, remote_addr
|
||||
@@ -374,8 +369,9 @@ impl BrokerService for Broker {
|
||||
Err(RecvError::Lagged(skipped_msg)) => {
|
||||
missed_msgs += skipped_msg;
|
||||
if let Poll::Ready(_) = futures::poll!(Box::pin(warn_interval.tick())) {
|
||||
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
|
||||
subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
|
||||
warn!("subscription id={}, key={:?}, addr={:?} dropped {} messages, channel is full",
|
||||
subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
|
||||
Err(Status::new(Code::Internal, "full channel"))?;
|
||||
missed_msgs = 0;
|
||||
}
|
||||
}
|
||||
@@ -433,8 +429,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
info!("version: {GIT_VERSION}");
|
||||
|
||||
let registry = Registry {
|
||||
shared_state: Arc::new(RwLock::new(SharedState::new(args.all_keys_chan_size))),
|
||||
timeline_chan_size: args.timeline_chan_size,
|
||||
shared_state: Arc::new(RwLock::new(SharedState::new(args.chan_size))),
|
||||
chan_size: args.chan_size,
|
||||
};
|
||||
let storage_broker_impl = Broker {
|
||||
registry: registry.clone(),
|
||||
@@ -528,7 +524,7 @@ mod tests {
|
||||
async fn test_registry() {
|
||||
let registry = Registry {
|
||||
shared_state: Arc::new(RwLock::new(SharedState::new(16))),
|
||||
timeline_chan_size: 16,
|
||||
chan_size: 16,
|
||||
};
|
||||
|
||||
// subscribe to timeline 2
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use hyper::body::HttpBody;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
use tonic::codegen::StdError;
|
||||
use tonic::transport::{ClientTlsConfig, Endpoint};
|
||||
use tonic::{transport::Channel, Code, Status};
|
||||
@@ -27,8 +26,6 @@ pub use hyper::Uri;
|
||||
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
|
||||
pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}");
|
||||
|
||||
pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms";
|
||||
|
||||
// BrokerServiceClient charged with tonic provided Channel transport; helps to
|
||||
// avoid depending on tonic directly in user crates.
|
||||
pub type BrokerClientChannel = BrokerServiceClient<Channel>;
|
||||
@@ -36,7 +33,7 @@ pub type BrokerClientChannel = BrokerServiceClient<Channel>;
|
||||
// Create connection object configured to run TLS if schema starts with https://
|
||||
// and plain text otherwise. Connection is lazy, only endpoint sanity is
|
||||
// validated here.
|
||||
pub fn connect<U>(endpoint: U, keepalive_interval: Duration) -> anyhow::Result<BrokerClientChannel>
|
||||
pub fn connect<U>(endpoint: U) -> anyhow::Result<BrokerClientChannel>
|
||||
where
|
||||
U: std::convert::TryInto<Uri>,
|
||||
U::Error: std::error::Error + Send + Sync + 'static,
|
||||
@@ -49,10 +46,6 @@ where
|
||||
let tls = ClientTlsConfig::new();
|
||||
tonic_endpoint = tonic_endpoint.tls_config(tls)?;
|
||||
}
|
||||
tonic_endpoint = tonic_endpoint
|
||||
.http2_keep_alive_interval(keepalive_interval)
|
||||
.keep_alive_while_idle(true);
|
||||
// keep_alive_timeout is 20s by default on both client and server side
|
||||
let channel = tonic_endpoint.connect_lazy();
|
||||
Ok(BrokerClientChannel::new(channel))
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
# Type-related stuff
|
||||
from typing import Callable, ClassVar, Dict, Iterator, Optional
|
||||
from typing import Callable, ClassVar, Iterator, Optional
|
||||
|
||||
import pytest
|
||||
from _pytest.config import Config
|
||||
@@ -135,26 +135,23 @@ class PgBenchRunResult:
|
||||
|
||||
@dataclasses.dataclass
|
||||
class PgBenchInitResult:
|
||||
# Taken from https://github.com/postgres/postgres/blob/REL_15_1/src/bin/pgbench/pgbench.c#L5144-L5171
|
||||
EXTRACTORS: ClassVar[Dict[str, re.Pattern]] = { # type: ignore[type-arg]
|
||||
"drop_tables": re.compile(r"drop tables (\d+\.\d+) s"),
|
||||
"create_tables": re.compile(r"create tables (\d+\.\d+) s"),
|
||||
"client_side_generate": re.compile(r"client-side generate (\d+\.\d+) s"),
|
||||
"server_side_generate": re.compile(r"server-side generate (\d+\.\d+) s"),
|
||||
"vacuum": re.compile(r"vacuum (\d+\.\d+) s"),
|
||||
"primary_keys": re.compile(r"primary keys (\d+\.\d+) s"),
|
||||
"foreign_keys": re.compile(r"foreign keys (\d+\.\d+) s"),
|
||||
"total": re.compile(r"done in (\d+\.\d+) s"), # Total time printed by pgbench
|
||||
}
|
||||
REGEX: ClassVar[re.Pattern] = re.compile( # type: ignore[type-arg]
|
||||
r"done in (\d+\.\d+) s "
|
||||
r"\("
|
||||
r"(?:drop tables (\d+\.\d+) s)?(?:, )?"
|
||||
r"(?:create tables (\d+\.\d+) s)?(?:, )?"
|
||||
r"(?:client-side generate (\d+\.\d+) s)?(?:, )?"
|
||||
r"(?:vacuum (\d+\.\d+) s)?(?:, )?"
|
||||
r"(?:primary keys (\d+\.\d+) s)?(?:, )?"
|
||||
r"\)\."
|
||||
)
|
||||
|
||||
total: Optional[float]
|
||||
total: float
|
||||
drop_tables: Optional[float]
|
||||
create_tables: Optional[float]
|
||||
client_side_generate: Optional[float]
|
||||
server_side_generate: Optional[float]
|
||||
vacuum: Optional[float]
|
||||
primary_keys: Optional[float]
|
||||
foreign_keys: Optional[float]
|
||||
duration: float
|
||||
start_timestamp: int
|
||||
end_timestamp: int
|
||||
@@ -167,35 +164,25 @@ class PgBenchInitResult:
|
||||
start_timestamp: int,
|
||||
end_timestamp: int,
|
||||
):
|
||||
# Parses pgbench initialize output
|
||||
# Parses pgbench initialize output for default initialization steps (dtgvp)
|
||||
# Example: done in 5.66 s (drop tables 0.05 s, create tables 0.31 s, client-side generate 2.01 s, vacuum 0.53 s, primary keys 0.38 s).
|
||||
|
||||
last_line = stderr.splitlines()[-1]
|
||||
|
||||
timings: Dict[str, Optional[float]] = {}
|
||||
last_line_items = re.split(r"\(|\)|,", last_line)
|
||||
for item in last_line_items:
|
||||
for key, regex in cls.EXTRACTORS.items():
|
||||
if (m := regex.match(item.strip())) is not None:
|
||||
if key in timings:
|
||||
raise RuntimeError(
|
||||
f"can't store pgbench results for repeated action `{key}`"
|
||||
)
|
||||
|
||||
timings[key] = float(m.group(1))
|
||||
|
||||
if not timings or "total" not in timings:
|
||||
if (m := cls.REGEX.match(last_line)) is not None:
|
||||
total, drop_tables, create_tables, client_side_generate, vacuum, primary_keys = [
|
||||
float(v) for v in m.groups() if v is not None
|
||||
]
|
||||
else:
|
||||
raise RuntimeError(f"can't parse pgbench initialize results from `{last_line}`")
|
||||
|
||||
return cls(
|
||||
total=timings["total"],
|
||||
drop_tables=timings.get("drop_tables", 0.0),
|
||||
create_tables=timings.get("create_tables", 0.0),
|
||||
client_side_generate=timings.get("client_side_generate", 0.0),
|
||||
server_side_generate=timings.get("server_side_generate", 0.0),
|
||||
vacuum=timings.get("vacuum", 0.0),
|
||||
primary_keys=timings.get("primary_keys", 0.0),
|
||||
foreign_keys=timings.get("foreign_keys", 0.0),
|
||||
total=total,
|
||||
drop_tables=drop_tables,
|
||||
create_tables=create_tables,
|
||||
client_side_generate=client_side_generate,
|
||||
vacuum=vacuum,
|
||||
primary_keys=primary_keys,
|
||||
duration=duration,
|
||||
start_timestamp=start_timestamp,
|
||||
end_timestamp=end_timestamp,
|
||||
@@ -339,10 +326,8 @@ class NeonBenchmarker:
|
||||
"drop_tables",
|
||||
"create_tables",
|
||||
"client_side_generate",
|
||||
"server_side_generate",
|
||||
"vacuum",
|
||||
"primary_keys",
|
||||
"foreign_keys",
|
||||
]
|
||||
for metric in metrics:
|
||||
if (value := getattr(result, metric)) is not None:
|
||||
|
||||
@@ -115,7 +115,6 @@ class NeonCompare(PgCompare):
|
||||
return self._pg_bin
|
||||
|
||||
def flush(self):
|
||||
self.pageserver_http_client.timeline_checkpoint(self.env.initial_tenant, self.timeline)
|
||||
self.pageserver_http_client.timeline_gc(self.env.initial_tenant, self.timeline, 0)
|
||||
|
||||
def compact(self):
|
||||
|
||||
@@ -3084,34 +3084,3 @@ def fork_at_current_lsn(
|
||||
"""
|
||||
current_lsn = pg.safe_psql("SELECT pg_current_wal_lsn()")[0][0]
|
||||
return env.neon_cli.create_branch(new_branch_name, ancestor_branch_name, tenant_id, current_lsn)
|
||||
|
||||
|
||||
def wait_for_sk_commit_lsn_to_arrive_at_pageserver_last_record_lsn(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
safekeepers: List[Safekeeper],
|
||||
pageserver: NeonPageserver,
|
||||
):
|
||||
sk_commit_lsns = [
|
||||
sk.http_client().timeline_status(tenant_id, timeline_id).commit_lsn for sk in safekeepers
|
||||
]
|
||||
lsn = max(sk_commit_lsns)
|
||||
ps_http = pageserver.http_client()
|
||||
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, lsn)
|
||||
return lsn
|
||||
|
||||
|
||||
def wait_for_sk_commit_lsn_to_reach_remote_storage(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
safekeepers: List[Safekeeper],
|
||||
pageserver: NeonPageserver,
|
||||
):
|
||||
lsn = wait_for_sk_commit_lsn_to_arrive_at_pageserver_last_record_lsn(
|
||||
tenant_id, timeline_id, safekeepers, pageserver
|
||||
)
|
||||
ps_http = pageserver.http_client()
|
||||
# force a checkpoint to trigger upload
|
||||
ps_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
wait_for_upload(ps_http, tenant_id, timeline_id, lsn)
|
||||
return lsn
|
||||
|
||||
@@ -15,7 +15,7 @@ from fixtures.utils import get_scale_for_db
|
||||
@enum.unique
|
||||
class PgBenchLoadType(enum.Enum):
|
||||
INIT = "init"
|
||||
SIMPLE_UPDATE = "simple-update"
|
||||
SIMPLE_UPDATE = "simple_update"
|
||||
SELECT_ONLY = "select-only"
|
||||
|
||||
|
||||
@@ -94,9 +94,7 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
|
||||
|
||||
if workload_type == PgBenchLoadType.INIT:
|
||||
# Run initialize
|
||||
init_pgbench(
|
||||
env, ["pgbench", f"-s{scale}", "-i", "-I", "dtGvp", connstr], password=password
|
||||
)
|
||||
init_pgbench(env, ["pgbench", f"-s{scale}", "-i", connstr], password=password)
|
||||
|
||||
if workload_type == PgBenchLoadType.SIMPLE_UPDATE:
|
||||
# Run simple-update workload
|
||||
|
||||
@@ -84,7 +84,6 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
|
||||
|
||||
# Set the GC horizon so that lsn1 is inside the horizon, which means
|
||||
# we can create a new branch starting from lsn1.
|
||||
pageserver_http_client.timeline_checkpoint(tenant, timeline_main)
|
||||
pageserver_http_client.timeline_gc(tenant, timeline_main, lsn2 - lsn1 + 1024)
|
||||
|
||||
env.neon_cli.create_branch(
|
||||
@@ -157,7 +156,6 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
|
||||
# branch creation task but the individual timeline GC iteration happens *after*
|
||||
# the branch creation task.
|
||||
pageserver_http_client.configure_failpoints(("before-timeline-gc", "sleep(2000)"))
|
||||
pageserver_http_client.timeline_checkpoint(tenant, b0)
|
||||
|
||||
def do_gc():
|
||||
pageserver_http_client.timeline_gc(tenant, b0, 0)
|
||||
|
||||
@@ -109,7 +109,6 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# check that we cannot create branch based on garbage collected data
|
||||
with env.pageserver.http_client() as pageserver_http:
|
||||
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
|
||||
gc_result = pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
|
||||
print_gc_result(gc_result)
|
||||
|
||||
|
||||
@@ -2,17 +2,9 @@ import asyncio
|
||||
import concurrent.futures
|
||||
import random
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import parse_metrics
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
Postgres,
|
||||
RemoteStorageKind,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres
|
||||
from fixtures.types import TimelineId
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
# Test configuration
|
||||
@@ -43,13 +35,11 @@ async def gc(env: NeonEnv, timeline: TimelineId):
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
def do_gc():
|
||||
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
|
||||
pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor() as pool:
|
||||
while updates_performed < updates_to_perform:
|
||||
await loop.run_in_executor(pool, do_gc)
|
||||
await loop.run_in_executor(
|
||||
pool, lambda: pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
|
||||
)
|
||||
|
||||
|
||||
# At the same time, run UPDATEs and GC
|
||||
@@ -97,76 +87,3 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
|
||||
r = cur.fetchone()
|
||||
assert r is not None
|
||||
assert r == (num_rows, updates_to_perform)
|
||||
|
||||
|
||||
#
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
|
||||
def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
|
||||
|
||||
# Disable time-based pitr, we will use LSN-based thresholds in the manual GC calls
|
||||
neon_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
|
||||
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_gc_index_upload",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
env.neon_cli.create_branch("test_gc_index_upload", "main")
|
||||
pg = env.postgres.create_start("test_gc_index_upload")
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
pg_conn = pg.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id"))
|
||||
timeline_id = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
|
||||
|
||||
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO foo
|
||||
SELECT g, 0, 'long string to consume some space' || g
|
||||
FROM generate_series(1, 100000) g
|
||||
"""
|
||||
)
|
||||
|
||||
# Helper function that gets the number of given kind of remote ops from the metrics
|
||||
def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
|
||||
ps_metrics = parse_metrics(env.pageserver.http_client().get_metrics(), "pageserver")
|
||||
total = 0.0
|
||||
for sample in ps_metrics.query_all(
|
||||
name="pageserver_remote_operation_seconds_count",
|
||||
filter={
|
||||
"tenant_id": str(tenant_id),
|
||||
"timeline_id": str(timeline_id),
|
||||
"file_kind": str(file_kind),
|
||||
"op_kind": str(op_kind),
|
||||
},
|
||||
):
|
||||
total += sample[2]
|
||||
return int(total)
|
||||
|
||||
# Sanity check that the metric works
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
|
||||
before = get_num_remote_ops("index", "upload")
|
||||
assert before > 0
|
||||
|
||||
# Run many cycles of GC. Then check that the number of index files
|
||||
# uploads didn't grow much. In particular we don't want to re-upload the
|
||||
# index file on every GC iteration, when it has no work to do.
|
||||
#
|
||||
# On each iteration, we use a slightly smaller GC horizon, so that the GC
|
||||
# at least needs to check if it has work to do.
|
||||
for i in range(100):
|
||||
cur.execute("INSERT INTO foo VALUES (0, 0, 'foo')")
|
||||
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000 - i * 32)
|
||||
num_index_uploads = get_num_remote_ops("index", "upload")
|
||||
log.info(f"{num_index_uploads} index uploads after GC iteration {i}")
|
||||
|
||||
after = num_index_uploads
|
||||
log.info(f"{after-before} new index uploads during test")
|
||||
assert after - before < 5
|
||||
|
||||
@@ -306,7 +306,6 @@ def _import(
|
||||
|
||||
# Check that gc works
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
pageserver_http.timeline_checkpoint(tenant, timeline)
|
||||
pageserver_http.timeline_gc(tenant, timeline, 0)
|
||||
|
||||
return tar_output_file
|
||||
|
||||
@@ -59,7 +59,6 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder):
|
||||
# Make a lot of updates on a single row, generating a lot of WAL. Trigger
|
||||
# garbage collections so that the page server will remove old page versions.
|
||||
for i in range(10):
|
||||
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
|
||||
gc_result = pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
|
||||
print_gc_result(gc_result)
|
||||
|
||||
|
||||
@@ -52,7 +52,6 @@ def test_pitr_gc(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# run GC
|
||||
with env.pageserver.http_client() as pageserver_http:
|
||||
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
|
||||
pageserver_http.timeline_compact(env.initial_tenant, timeline)
|
||||
# perform aggressive GC. Data still should be kept because of the PITR setting.
|
||||
gc_result = pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
|
||||
|
||||
@@ -122,33 +122,3 @@ def test_auth_errors(static_proxy: NeonProxy):
|
||||
# Finally, check that the user can connect
|
||||
with static_proxy.connect(user="pinocchio", password="magic", options="project=irrelevant"):
|
||||
pass
|
||||
|
||||
|
||||
def test_forward_params_to_client(static_proxy: NeonProxy):
|
||||
# A subset of parameters (GUCs) which postgres
|
||||
# sends to the client during connection setup.
|
||||
# Unfortunately, `GUC_REPORT` can't be queried.
|
||||
# Proxy *should* forward them, otherwise client library
|
||||
# might misbehave (e.g. parse timestamps incorrectly).
|
||||
reported_params_subset = [
|
||||
"client_encoding",
|
||||
"integer_datetimes",
|
||||
"is_superuser",
|
||||
"server_encoding",
|
||||
"server_version",
|
||||
"session_authorization",
|
||||
"standard_conforming_strings",
|
||||
]
|
||||
|
||||
query = """
|
||||
select name, setting
|
||||
from pg_catalog.pg_settings
|
||||
where name = any(%s)
|
||||
"""
|
||||
|
||||
with static_proxy.connect(options="project=irrelevant") as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(query, (reported_params_subset,))
|
||||
for name, value in cur.fetchall():
|
||||
# Check that proxy has forwarded this parameter.
|
||||
assert conn.get_parameter_status(name) == value
|
||||
|
||||
@@ -24,7 +24,6 @@ def do_gc_target(
|
||||
"""Hack to unblock main, see https://github.com/neondatabase/neon/issues/2211"""
|
||||
try:
|
||||
log.info("sending gc http request")
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
pageserver_http.timeline_gc(tenant_id, timeline_id, 0)
|
||||
except Exception as e:
|
||||
log.error("do_gc failed: %s", e)
|
||||
|
||||
@@ -24,7 +24,6 @@ from fixtures.neon_fixtures import (
|
||||
assert_no_in_progress_downloads_for_tenant,
|
||||
available_remote_storages,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_sk_commit_lsn_to_reach_remote_storage,
|
||||
wait_for_upload,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
@@ -162,9 +161,16 @@ def test_tenants_attached_after_download(
|
||||
##### Stop the pageserver, erase its layer file to force it being downloaded from S3
|
||||
env.postgres.stop_all()
|
||||
|
||||
wait_for_sk_commit_lsn_to_reach_remote_storage(
|
||||
tenant_id, timeline_id, env.safekeepers, env.pageserver
|
||||
)
|
||||
sk_commit_lsns = [
|
||||
sk.http_client().timeline_status(tenant_id, timeline_id).commit_lsn
|
||||
for sk in env.safekeepers
|
||||
]
|
||||
log.info("wait for pageserver to process all the WAL")
|
||||
wait_for_last_record_lsn(client, tenant_id, timeline_id, max(sk_commit_lsns))
|
||||
log.info("wait for it to reach remote storage")
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
wait_for_upload(client, tenant_id, timeline_id, max(sk_commit_lsns))
|
||||
log.info("latest safekeeper_commit_lsn reached remote storage")
|
||||
|
||||
detail_before = client.timeline_detail(
|
||||
tenant_id, timeline_id, include_non_incremental_physical_size=True
|
||||
|
||||
@@ -326,6 +326,7 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
|
||||
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
|
||||
|
||||
pageserver_http.timeline_gc(env.initial_tenant, new_timeline_id, gc_horizon=None)
|
||||
|
||||
assert_physical_size(env, env.initial_tenant, new_timeline_id)
|
||||
|
||||
Reference in New Issue
Block a user