Compare commits

..

1 Commits

Author SHA1 Message Date
Arseny Sher
9fd9a15305 Terminate subscription if channel is full.
Might help as a hotfix, but need to understand root better.
2022-12-15 12:53:59 +04:00
67 changed files with 647 additions and 716 deletions

View File

@@ -3,7 +3,7 @@ storage:
bucket_name: neon-prod-storage-ap-southeast-1
bucket_region: ap-southeast-1
console_mgmt_base_url: http://console-release.local
broker_endpoint: http://storage-broker-lb.epsilon.ap-southeast-1.internal.aws.neon.tech:50051
broker_endpoint: https://storage-broker.epsilon.ap-southeast-1.internal.aws.neon.tech:443
pageserver_config_stub:
pg_distrib_dir: /usr/local
remote_storage:

View File

@@ -3,7 +3,7 @@ storage:
bucket_name: neon-prod-storage-eu-central-1
bucket_region: eu-central-1
console_mgmt_base_url: http://console-release.local
broker_endpoint: http://storage-broker-lb.gamma.eu-central-1.internal.aws.neon.tech:50051
broker_endpoint: https://storage-broker.gamma.eu-central-1.internal.aws.neon.tech:443
pageserver_config_stub:
pg_distrib_dir: /usr/local
remote_storage:

View File

@@ -3,7 +3,7 @@ storage:
bucket_name: neon-prod-storage-us-east-2
bucket_region: us-east-2
console_mgmt_base_url: http://console-release.local
broker_endpoint: http://storage-broker-lb.delta.us-east-2.internal.aws.neon.tech:50051
broker_endpoint: https://storage-broker.delta.us-east-2.internal.aws.neon.tech:443
pageserver_config_stub:
pg_distrib_dir: /usr/local
remote_storage:

View File

@@ -3,7 +3,7 @@ storage:
bucket_name: neon-prod-storage-us-west-2
bucket_region: us-west-2
console_mgmt_base_url: http://console-release.local
broker_endpoint: http://storage-broker-lb.eta.us-west-2.internal.aws.neon.tech:50051
broker_endpoint: https://storage-broker.eta.us-west-2.internal.aws.neon.tech:443
pageserver_config_stub:
pg_distrib_dir: /usr/local
remote_storage:

View File

@@ -3,7 +3,7 @@ storage:
bucket_name: neon-dev-storage-eu-west-1
bucket_region: eu-west-1
console_mgmt_base_url: http://console-staging.local
broker_endpoint: http://storage-broker-lb.zeta.eu-west-1.internal.aws.neon.build:50051
broker_endpoint: https://storage-broker.zeta.eu-west-1.internal.aws.neon.build:443
pageserver_config_stub:
pg_distrib_dir: /usr/local
remote_storage:

View File

@@ -3,7 +3,7 @@ storage:
bucket_name: neon-staging-storage-us-east-2
bucket_region: us-east-2
console_mgmt_base_url: http://console-staging.local
broker_endpoint: http://storage-broker-lb.beta.us-east-2.internal.aws.neon.build:50051
broker_endpoint: https://storage-broker.beta.us-east-2.internal.aws.neon.build:443
pageserver_config_stub:
pg_distrib_dir: /usr/local
remote_storage:

View File

@@ -3,22 +3,27 @@ podLabels:
neon_env: staging
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.zeta.eu-west-1.internal.aws.neon.build
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
enabled: true
annotations:
kubernetes.io/ingress.class: nginx-internal
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
# we have basically infinite streams, disable body size limit
nginx.ingress.kubernetes.io/proxy-body-size: "0"
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
hosts:
- host: storage-broker.zeta.eu-west-1.internal.aws.neon.build
paths:
- path: /
pathType: Prefix
tls:
- hosts:
- storage-broker.zeta.eu-west-1.internal.aws.neon.build
secretName: storage-broker-tls
metrics:
enabled: false

View File

@@ -3,22 +3,27 @@ podLabels:
neon_env: staging
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.beta.us-east-2.internal.aws.neon.build
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
enabled: true
annotations:
kubernetes.io/ingress.class: nginx-internal
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
# we have basically infinite streams, disable body size limit
nginx.ingress.kubernetes.io/proxy-body-size: "0"
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
hosts:
- host: storage-broker.beta.us-east-2.internal.aws.neon.build
paths:
- path: /
pathType: Prefix
tls:
- hosts:
- storage-broker.beta.us-east-2.internal.aws.neon.build
secretName: storage-broker-tls
metrics:
enabled: false

View File

@@ -3,22 +3,27 @@ podLabels:
neon_env: production
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.epsilon.ap-southeast-1.internal.aws.neon.tech
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
enabled: true
annotations:
kubernetes.io/ingress.class: nginx-internal
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
# we have basically infinite streams, disable body size limit
nginx.ingress.kubernetes.io/proxy-body-size: "0"
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
hosts:
- host: storage-broker.epsilon.ap-southeast-1.internal.aws.neon.tech
paths:
- path: /
pathType: Prefix
tls:
- hosts:
- storage-broker.epsilon.ap-southeast-1.internal.aws.neon.tech
secretName: storage-broker-tls
metrics:
enabled: false

View File

@@ -3,22 +3,27 @@ podLabels:
neon_env: production
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.gamma.eu-central-1.internal.aws.neon.tech
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
enabled: true
annotations:
kubernetes.io/ingress.class: nginx-internal
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
# we have basically infinite streams, disable body size limit
nginx.ingress.kubernetes.io/proxy-body-size: "0"
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
hosts:
- host: storage-broker.gamma.eu-central-1.internal.aws.neon.tech
paths:
- path: /
pathType: Prefix
tls:
- hosts:
- storage-broker.gamma.eu-central-1.internal.aws.neon.tech
secretName: storage-broker-tls
metrics:
enabled: false

View File

@@ -3,22 +3,27 @@ podLabels:
neon_env: production
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.delta.us-east-2.internal.aws.neon.tech
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
enabled: true
annotations:
kubernetes.io/ingress.class: nginx-internal
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
# we have basically infinite streams, disable body size limit
nginx.ingress.kubernetes.io/proxy-body-size: "0"
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
hosts:
- host: storage-broker.delta.us-east-2.internal.aws.neon.tech
paths:
- path: /
pathType: Prefix
tls:
- hosts:
- storage-broker.delta.us-east-2.internal.aws.neon.tech
secretName: storage-broker-tls
metrics:
enabled: false

View File

@@ -3,22 +3,27 @@ podLabels:
neon_env: production
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.eta.us-west-2.internal.aws.neon.tech
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
enabled: true
annotations:
kubernetes.io/ingress.class: nginx-internal
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
# we have basically infinite streams, disable body size limit
nginx.ingress.kubernetes.io/proxy-body-size: "0"
cert-manager.io/cluster-issuer: "cert-manager-clusterissuer"
hosts:
- host: storage-broker.eta.us-west-2.internal.aws.neon.tech
paths:
- path: /
pathType: Prefix
tls:
- hosts:
- storage-broker.eta.us-west-2.internal.aws.neon.tech
secretName: storage-broker-tls
metrics:
enabled: false

View File

@@ -18,7 +18,6 @@ on:
region_id:
description: 'Use a particular region. If not set the default region will be used'
required: false
default: 'aws-us-east-2'
save_perf_report:
type: boolean
description: 'Publish perf report or not. If not set, the report is published only for the main branch'
@@ -116,10 +115,13 @@ jobs:
# neon-captest-prefetch: Same, with prefetching enabled (new project)
# rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
# rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
platform: [ neon-captest-reuse, neon-captest-prefetch, rds-postgres ]
platform: [ neon-captest-new, neon-captest-reuse, neon-captest-prefetch, rds-postgres ]
db_size: [ 10gb ]
runner: [ us-east-2 ]
include:
- platform: neon-captest-new
db_size: 50gb
runner: us-east-2
- platform: neon-captest-prefetch
db_size: 50gb
runner: us-east-2

View File

@@ -1072,7 +1072,7 @@ jobs:
- name: Deploy storage-broker
run:
helm upgrade neon-storage-broker-lb neondatabase/neon-storage-broker --namespace neon-storage-broker-lb --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ needs.tag.outputs.build-tag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
helm upgrade neon-storage-broker neondatabase/neon-storage-broker --namespace neon-storage-broker --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ needs.tag.outputs.build-tag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
deploy-proxy-prod-new:
runs-on: prod
@@ -1149,7 +1149,7 @@ jobs:
- name: Deploy storage-broker
run:
helm upgrade neon-storage-broker-lb neondatabase/neon-storage-broker --namespace neon-storage-broker-lb --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ needs.tag.outputs.build-tag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
helm upgrade neon-storage-broker neondatabase/neon-storage-broker --namespace neon-storage-broker --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ needs.tag.outputs.build-tag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s
promote-compatibility-data:
runs-on: [ self-hosted, dev, x64 ]

53
Cargo.lock generated
View File

@@ -563,12 +563,6 @@ version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
[[package]]
name = "bincode"
version = "1.3.3"
@@ -1926,7 +1920,7 @@ version = "8.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09f4f04699947111ec1733e71778d763555737579e44b85844cae8e1940a1828"
dependencies = [
"base64 0.13.1",
"base64",
"pem",
"ring",
"serde",
@@ -2513,7 +2507,7 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03c64931a1a212348ec4f3b4362585eca7159d0d09cbdf4a7f74f02173596fd4"
dependencies = [
"base64 0.13.1",
"base64",
]
[[package]]
@@ -2534,18 +2528,18 @@ dependencies = [
[[package]]
name = "phf"
version = "0.11.1"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "928c6535de93548188ef63bb7c4036bd415cd8f36ad25af44b9789b2ee72a48c"
checksum = "fabbf1ead8a5bcbc20f5f8b939ee3f5b0f6f281b6ad3468b84656b658b455259"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_shared"
version = "0.11.1"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1fb5f6f826b772a8d4c0394209441e7d37cbbb967ae9c7e0e8134365c9ee676"
checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096"
dependencies = [
"siphasher",
]
@@ -2618,12 +2612,12 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e"
version = "0.19.2"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=d052ee8b86fff9897c77b0fe89ea9daba0e1fa38#d052ee8b86fff9897c77b0fe89ea9daba0e1fa38"
dependencies = [
"bytes",
"fallible-iterator",
"futures-util",
"futures",
"log",
"tokio",
"tokio-postgres",
@@ -2632,9 +2626,9 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=d052ee8b86fff9897c77b0fe89ea9daba0e1fa38#d052ee8b86fff9897c77b0fe89ea9daba0e1fa38"
dependencies = [
"base64 0.20.0",
"base64",
"byteorder",
"bytes",
"fallible-iterator",
@@ -2649,8 +2643,8 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e"
version = "0.2.3"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=d052ee8b86fff9897c77b0fe89ea9daba0e1fa38#d052ee8b86fff9897c77b0fe89ea9daba0e1fa38"
dependencies = [
"bytes",
"fallible-iterator",
@@ -2874,7 +2868,7 @@ dependencies = [
"anyhow",
"async-trait",
"atty",
"base64 0.13.1",
"base64",
"bstr",
"bytes",
"clap 4.0.29",
@@ -3084,7 +3078,7 @@ version = "0.11.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68cc60575865c7831548863cc02356512e3f1dc2f3f82cb837d7fc4cc8f3c97c"
dependencies = [
"base64 0.13.1",
"base64",
"bytes",
"encoding_rs",
"futures-core",
@@ -3267,7 +3261,7 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55"
dependencies = [
"base64 0.13.1",
"base64",
]
[[package]]
@@ -3548,7 +3542,7 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25bf4a5a814902cd1014dbccfa4d4560fb8432c779471e96e035602519f82eef"
dependencies = [
"base64 0.13.1",
"base64",
"chrono",
"hex",
"indexmap",
@@ -4015,15 +4009,14 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e"
version = "0.7.6"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=d052ee8b86fff9897c77b0fe89ea9daba0e1fa38#d052ee8b86fff9897c77b0fe89ea9daba0e1fa38"
dependencies = [
"async-trait",
"byteorder",
"bytes",
"fallible-iterator",
"futures-channel",
"futures-util",
"futures",
"log",
"parking_lot 0.12.1",
"percent-encoding",
@@ -4116,7 +4109,7 @@ dependencies = [
"async-stream",
"async-trait",
"axum",
"base64 0.13.1",
"base64",
"bytes",
"futures-core",
"futures-util",
@@ -4358,7 +4351,7 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b97acb4c28a254fd7a4aeec976c46a7fa404eac4d7c134b30c75144846d7cb8f"
dependencies = [
"base64 0.13.1",
"base64",
"chunked_transfer",
"log",
"native-tls",
@@ -4794,7 +4787,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0ecbeb7b67ce215e40e3cc7f2ff902f94a223acf44995934763467e7b1febc8"
dependencies = [
"asn1-rs",
"base64 0.13.1",
"base64",
"data-encoding",
"der-parser",
"lazy_static",

View File

@@ -86,4 +86,4 @@ lto = true
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
[patch.crates-io]
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }

203
Makefile
View File

@@ -61,115 +61,146 @@ all: neon postgres neon-pg-ext
#
# The 'postgres_ffi' depends on the Postgres headers.
.PHONY: neon
neon: postgres-headers
neon: postgres-v14-headers postgres-v15-headers
+@echo "Compiling Neon"
$(CARGO_CMD_PREFIX) cargo build $(CARGO_BUILD_FLAGS)
### PostgreSQL parts
# Some rules are duplicated for Postgres v14 and 15. We may want to refactor
# The rules are duplicated for Postgres v14 and 15. We may want to refactor
# to avoid the duplication in the future, but it's tolerable for now.
#
$(POSTGRES_INSTALL_DIR)/build/%/config.status:
+@echo "Configuring Postgres $* build"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/$*
(cd $(POSTGRES_INSTALL_DIR)/build/$* && \
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-$*/configure \
$(POSTGRES_INSTALL_DIR)/build/v14/config.status:
+@echo "Configuring Postgres v14 build"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/v14
(cd $(POSTGRES_INSTALL_DIR)/build/v14 && \
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-v14/configure \
CFLAGS='$(PG_CFLAGS)' \
$(PG_CONFIGURE_OPTS) \
--prefix=$(abspath $(POSTGRES_INSTALL_DIR))/$* > configure.log)
--prefix=$(abspath $(POSTGRES_INSTALL_DIR))/v14 > configure.log)
$(POSTGRES_INSTALL_DIR)/build/v15/config.status:
+@echo "Configuring Postgres v15 build"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/v15
(cd $(POSTGRES_INSTALL_DIR)/build/v15 && \
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-v15/configure \
CFLAGS='$(PG_CFLAGS)' \
$(PG_CONFIGURE_OPTS) \
--prefix=$(abspath $(POSTGRES_INSTALL_DIR))/v15 > configure.log)
# nicer alias to run 'configure'
# Note: I've been unable to use templates for this part of our configuration.
# I'm not sure why it wouldn't work, but this is the only place (apart from
# the "build-all-versions" entry points) where direct mention of PostgreSQL
# versions is used.
.PHONY: postgres-configure-v15
postgres-configure-v15: $(POSTGRES_INSTALL_DIR)/build/v15/config.status
.PHONY: postgres-configure-v14
postgres-configure-v14: $(POSTGRES_INSTALL_DIR)/build/v14/config.status
.PHONY: postgres-v14-configure
postgres-v14-configure: $(POSTGRES_INSTALL_DIR)/build/v14/config.status
.PHONY: postgres-v15-configure
postgres-v15-configure: $(POSTGRES_INSTALL_DIR)/build/v15/config.status
# Install the PostgreSQL header files into $(POSTGRES_INSTALL_DIR)/<version>/include
.PHONY: postgres-headers-%
postgres-headers-%: postgres-configure-%
+@echo "Installing PostgreSQL $* headers"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/include MAKELEVEL=0 install
.PHONY: postgres-v14-headers
postgres-v14-headers: postgres-v14-configure
+@echo "Installing PostgreSQL v14 headers"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/src/include MAKELEVEL=0 install
.PHONY: postgres-v15-headers
postgres-v15-headers: postgres-v15-configure
+@echo "Installing PostgreSQL v15 headers"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15/src/include MAKELEVEL=0 install
# Compile and install PostgreSQL
.PHONY: postgres-%
postgres-%: postgres-configure-% \
postgres-headers-% # to prevent `make install` conflicts with neon's `postgres-headers`
+@echo "Compiling PostgreSQL $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 install
+@echo "Compiling libpq $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/interfaces/libpq install
+@echo "Compiling pg_prewarm $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_prewarm install
+@echo "Compiling pg_buffercache $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_buffercache install
+@echo "Compiling pageinspect $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect install
.PHONY: postgres-v14
postgres-v14: postgres-v14-configure \
postgres-v14-headers # to prevent `make install` conflicts with neon's `postgres-headers`
+@echo "Compiling PostgreSQL v14"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14 MAKELEVEL=0 install
+@echo "Compiling libpq v14"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/src/interfaces/libpq install
+@echo "Compiling pg_prewarm v14"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/contrib/pg_prewarm install
+@echo "Compiling pg_buffercache v14"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/contrib/pg_buffercache install
+@echo "Compiling pageinspect v14"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/contrib/pageinspect install
.PHONY: postgres-clean-%
postgres-clean-%:
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_buffercache clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/interfaces/libpq clean
.PHONY: neon-pg-ext-%
neon-pg-ext-%: postgres-%
+@echo "Compiling neon $*"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/neon-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install
+@echo "Compiling neon_walredo $*"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install
+@echo "Compiling neon_test_utils $*"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile install
.PHONY: neon-pg-ext-clean-%
neon-pg-ext-clean-%:
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/pgxn/neon-$* -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/pgxn/neon_walredo-$* -f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/pgxn/neon_test_utils-$* -f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile clean
.PHONY: neon-pg-ext
neon-pg-ext: \
neon-pg-ext-v14 \
neon-pg-ext-v15
.PHONY: neon-pg-ext-clean
neon-pg-ext-clean: \
neon-pg-ext-clean-v14 \
neon-pg-ext-clean-v15
.PHONY: postgres-v15
postgres-v15: postgres-v15-configure \
postgres-v15-headers # to prevent `make install` conflicts with neon's `postgres-headers`
+@echo "Compiling PostgreSQL v15"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15 MAKELEVEL=0 install
+@echo "Compiling libpq v15"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15/src/interfaces/libpq install
+@echo "Compiling pg_prewarm v15"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15/contrib/pg_prewarm install
+@echo "Compiling pg_buffercache v15"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15/contrib/pg_buffercache install
+@echo "Compiling pageinspect v15"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15/contrib/pageinspect install
# shorthand to build all Postgres versions
.PHONY: postgres
postgres: \
postgres-v14 \
postgres-v15
postgres: postgres-v14 postgres-v15
.PHONY: postgres-headers
postgres-headers: \
postgres-headers-v14 \
postgres-headers-v15
.PHONY: postgres-v14-clean
postgres-v14-clean:
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14 MAKELEVEL=0 clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/contrib/pg_buffercache clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/contrib/pageinspect clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/src/interfaces/libpq clean
.PHONY: postgres-clean
postgres-clean: \
postgres-clean-v14 \
postgres-clean-v15
.PHONY: postgres-v15-clean
postgres-v15-clean:
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15 MAKELEVEL=0 clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15/contrib/pg_buffercache clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15/contrib/pageinspect clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v15/src/interfaces/libpq clean
neon-pg-ext-v14: postgres-v14
+@echo "Compiling neon v14"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-v14
(cd $(POSTGRES_INSTALL_DIR)/build/neon-v14 && \
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v14/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install)
+@echo "Compiling neon_walredo v14"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-v14
(cd $(POSTGRES_INSTALL_DIR)/build/neon-walredo-v14 && \
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v14/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install)
+@echo "Compiling neon_test_utils" v14
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-v14
(cd $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-v14 && \
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v14/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile install)
neon-pg-ext-v15: postgres-v15
+@echo "Compiling neon v15"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-v15
(cd $(POSTGRES_INSTALL_DIR)/build/neon-v15 && \
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v15/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install)
+@echo "Compiling neon_walredo v15"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-v15
(cd $(POSTGRES_INSTALL_DIR)/build/neon-walredo-v15 && \
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v15/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install)
+@echo "Compiling neon_test_utils" v15
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-v15
(cd $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-v15 && \
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v15/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile install)
.PHONY: neon-pg-ext-clean
$(MAKE) -C $(ROOT_PROJECT_DIR)/pgxn/neon clean
$(MAKE) -C $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils clean
neon-pg-ext: neon-pg-ext-v14 neon-pg-ext-v15
postgres-headers: postgres-v14-headers postgres-v15-headers
postgres-clean: postgres-v14-clean postgres-v15-clean
# This doesn't remove the effects of 'configure'.
.PHONY: clean
clean: postgres-clean neon-pg-ext-clean
clean:
cd $(POSTGRES_INSTALL_DIR)/build/v14 && $(MAKE) clean
cd $(POSTGRES_INSTALL_DIR)/build/v15 && $(MAKE) clean
$(CARGO_CMD_PREFIX) cargo clean
cd pgxn/neon && $(MAKE) clean
cd pgxn/neon_test_utils && $(MAKE) clean
# This removes everything
.PHONY: distclean

View File

@@ -12,12 +12,12 @@ futures = "0.3.13"
hyper = { version = "0.14", features = ["full"] }
log = { version = "0.4", features = ["std", "serde"] }
notify = "5.0.0"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
regex = "1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
tar = "0.4"
tokio = { version = "1.17", features = ["macros", "rt", "rt-multi-thread"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
url = "2.2.2"
workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -10,7 +10,7 @@ comfy-table = "6.1"
git-version = "0.3.5"
nix = "0.25"
once_cell = "1.13.0"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
regex = "1"
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }

View File

@@ -203,6 +203,29 @@ pub struct TimelineInfo {
pub pg_version: u32,
pub state: TimelineState,
// Some of the above fields are duplicated in 'local' and 'remote', for backwards-
// compatility with older clients.
pub local: LocalTimelineInfo,
pub remote: RemoteTimelineInfo,
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LocalTimelineInfo {
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<TimelineId>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_lsn: Option<Lsn>,
pub current_logical_size: Option<u64>, // is None when timeline is Unloaded
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RemoteTimelineInfo {
#[serde_as(as = "Option<DisplayFromStr>")]
pub remote_consistent_lsn: Option<Lsn>,
}
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;

View File

@@ -8,8 +8,8 @@ edition = "2021"
[dependencies]
anyhow = "1.0"
itertools = "0.10.3"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "43e6db254a97fdecbce33d8bc0890accfd74495e" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
url = "2.2.2"
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -21,7 +21,7 @@ workspace_hack = { version = "0.1", path = "../../workspace_hack" }
[dev-dependencies]
env_logger = "0.9"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
wal_craft = { path = "wal_craft" }
[build-dependencies]

View File

@@ -11,7 +11,7 @@ clap = "4.0"
env_logger = "0.9"
log = "0.4"
once_cell = "1.13.0"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres_ffi = { path = "../" }
tempfile = "3.2"
workspace_hack = { version = "0.1", path = "../../../workspace_hack" }

View File

@@ -7,7 +7,7 @@ edition = "2021"
anyhow = "1.0"
bytes = "1.0.1"
pin-project-lite = "0.2.7"
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
rand = "0.8.3"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.17", features = ["macros"] }

View File

@@ -463,10 +463,7 @@ pub enum BeMessage<'a> {
EncryptionResponse(bool),
NoData,
ParameterDescription,
ParameterStatus {
name: &'a [u8],
value: &'a [u8],
},
ParameterStatus(BeParameterStatusMessage<'a>),
ParseComplete,
ReadyForQuery,
RowDescription(&'a [RowDescriptor<'a>]),
@@ -475,28 +472,6 @@ pub enum BeMessage<'a> {
KeepAlive(WalSndKeepAlive),
}
/// Common shorthands.
impl<'a> BeMessage<'a> {
/// A [`BeMessage::ParameterStatus`] holding the client encoding, i.e. UTF-8.
/// This is a sensible default, given that:
/// * rust strings only support this encoding out of the box.
/// * tokio-postgres, postgres-jdbc (and probably more) mandate it.
///
/// TODO: do we need to report `server_encoding` as well?
pub const CLIENT_ENCODING: Self = Self::ParameterStatus {
name: b"client_encoding",
value: b"UTF8",
};
/// Build a [`BeMessage::ParameterStatus`] holding the server version.
pub fn server_version(version: &'a str) -> Self {
Self::ParameterStatus {
name: b"server_version",
value: version.as_bytes(),
}
}
}
#[derive(Debug)]
pub enum BeAuthenticationSaslMessage<'a> {
Methods(&'a [&'a str]),
@@ -510,6 +485,12 @@ pub enum BeParameterStatusMessage<'a> {
ServerVersion(&'a str),
}
impl BeParameterStatusMessage<'static> {
pub fn encoding() -> BeMessage<'static> {
BeMessage::ParameterStatus(Self::Encoding("UTF8"))
}
}
// One row description in RowDescription packet.
#[derive(Debug)]
pub struct RowDescriptor<'a> {
@@ -606,15 +587,14 @@ fn write_body<R>(buf: &mut BytesMut, f: impl FnOnce(&mut BytesMut) -> R) -> R {
}
/// Safe write of s into buf as cstring (String in the protocol).
fn write_cstr(s: impl AsRef<[u8]>, buf: &mut BytesMut) -> Result<(), io::Error> {
let bytes = s.as_ref();
if bytes.contains(&0) {
fn write_cstr(s: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> {
if s.contains(&0) {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"string contains embedded null",
));
}
buf.put_slice(bytes);
buf.put_slice(s);
buf.put_u8(0);
Ok(())
}
@@ -664,7 +644,7 @@ impl<'a> BeMessage<'a> {
Methods(methods) => {
buf.put_i32(10); // Specifies that SASL auth method is used.
for method in methods.iter() {
write_cstr(method, buf)?;
write_cstr(method.as_bytes(), buf)?;
}
buf.put_u8(0); // zero terminator for the list
}
@@ -779,7 +759,7 @@ impl<'a> BeMessage<'a> {
buf.put_slice(b"CXX000\0");
buf.put_u8(b'M'); // the message
write_cstr(error_msg, buf)?;
write_cstr(error_msg.as_bytes(), buf)?;
buf.put_u8(0); // terminator
Ok::<_, io::Error>(())
@@ -819,12 +799,24 @@ impl<'a> BeMessage<'a> {
buf.put_u8(response);
}
BeMessage::ParameterStatus { name, value } => {
BeMessage::ParameterStatus(param) => {
use std::io::{IoSlice, Write};
use BeParameterStatusMessage::*;
let [name, value] = match param {
Encoding(name) => [b"client_encoding", name.as_bytes()],
ServerVersion(version) => [b"server_version", version.as_bytes()],
};
// Parameter names and values are passed as null-terminated strings
let iov = &mut [name, b"\0", value, b"\0"].map(IoSlice::new);
let mut buffer = [0u8; 64]; // this should be enough
let cnt = buffer.as_mut().write_vectored(iov).unwrap();
buf.put_u8(b'S');
write_body(buf, |buf| {
write_cstr(name, buf)?;
write_cstr(value, buf)
})?;
buf.put_slice(&buffer[..cnt]);
});
}
BeMessage::ParameterDescription => {

View File

@@ -4,13 +4,14 @@
//! allowing multiple api users to independently work with the same S3 bucket, if
//! their bucket prefixes are both specified and different.
use std::env::var;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use aws_config::{
environment::credentials::EnvironmentVariableCredentialsProvider,
imds::credentials::ImdsCredentialsProvider,
meta::credentials::{CredentialsProviderChain, LazyCachingCredentialsProvider},
environment::credentials::EnvironmentVariableCredentialsProvider, imds,
imds::credentials::ImdsCredentialsProvider, meta::credentials::provide_credentials_fn,
};
use aws_sdk_s3::{
config::Config,
@@ -19,6 +20,7 @@ use aws_sdk_s3::{
Client, Endpoint, Region,
};
use aws_smithy_http::body::SdkBody;
use aws_types::credentials::{CredentialsError, ProvideCredentials};
use hyper::Body;
use tokio::{io, sync::Semaphore};
use tokio_util::io::ReaderStream;
@@ -29,6 +31,8 @@ use crate::{
Download, DownloadError, RemotePath, RemoteStorage, S3Config, REMOTE_STORAGE_PREFIX_SEPARATOR,
};
const DEFAULT_IMDS_TIMEOUT: Duration = Duration::from_secs(10);
pub(super) mod metrics {
use metrics::{register_int_counter_vec, IntCounterVec};
use once_cell::sync::Lazy;
@@ -118,23 +122,30 @@ impl S3Bucket {
"Creating s3 remote storage for S3 bucket {}",
aws_config.bucket_name
);
let credentials_provider = {
// uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
let env_creds = EnvironmentVariableCredentialsProvider::new();
// uses imds v2
let imds = ImdsCredentialsProvider::builder().build();
// finally add caching.
// this might change in future, see https://github.com/awslabs/aws-sdk-rust/issues/629
LazyCachingCredentialsProvider::builder()
.load(CredentialsProviderChain::first_try("env", env_creds).or_else("imds", imds))
.build()
};
let mut config_builder = Config::builder()
.region(Region::new(aws_config.bucket_region.clone()))
.credentials_provider(credentials_provider);
.credentials_provider(provide_credentials_fn(|| async {
match var("AWS_ACCESS_KEY_ID").is_ok() && var("AWS_SECRET_ACCESS_KEY").is_ok() {
true => {
EnvironmentVariableCredentialsProvider::new()
.provide_credentials()
.await
}
false => {
let imds_client = imds::Client::builder()
.connect_timeout(DEFAULT_IMDS_TIMEOUT)
.read_timeout(DEFAULT_IMDS_TIMEOUT)
.build()
.await
.map_err(CredentialsError::unhandled)?;
ImdsCredentialsProvider::builder()
.imds_client(imds_client)
.build()
.provide_credentials()
.await
}
}
}));
if let Some(custom_endpoint) = aws_config.endpoint.clone() {
let endpoint = Endpoint::immutable(

View File

@@ -6,7 +6,7 @@
use crate::sock_split::{BidiStream, ReadStream, WriteStream};
use anyhow::{bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use pq_proto::{BeMessage, FeMessage, FeStartupPacket};
use pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket};
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::fmt;
@@ -361,9 +361,11 @@ impl PostgresBackend {
match self.auth_type {
AuthType::Trust => {
self.write_message_noflush(&BeMessage::AuthenticationOk)?
.write_message_noflush(&BeMessage::CLIENT_ENCODING)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?
// The async python driver requires a valid server_version
.write_message_noflush(&BeMessage::server_version("14.1"))?
.write_message_noflush(&BeMessage::ParameterStatus(
BeParameterStatusMessage::ServerVersion("14.1"),
))?
.write_message(&BeMessage::ReadyForQuery)?;
self.state = ProtoState::Established;
}
@@ -411,7 +413,7 @@ impl PostgresBackend {
}
}
self.write_message_noflush(&BeMessage::AuthenticationOk)?
.write_message_noflush(&BeMessage::CLIENT_ENCODING)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?
.write_message(&BeMessage::ReadyForQuery)?;
self.state = ProtoState::Established;
}

View File

@@ -6,7 +6,7 @@
use crate::postgres_backend::AuthType;
use anyhow::{bail, Context, Result};
use bytes::{Bytes, BytesMut};
use pq_proto::{BeMessage, FeMessage, FeStartupPacket};
use pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket};
use rand::Rng;
use std::future::Future;
use std::net::SocketAddr;
@@ -331,9 +331,11 @@ impl PostgresBackend {
match self.auth_type {
AuthType::Trust => {
self.write_message(&BeMessage::AuthenticationOk)?
.write_message(&BeMessage::CLIENT_ENCODING)?
.write_message(&BeParameterStatusMessage::encoding())?
// The async python driver requires a valid server_version
.write_message(&BeMessage::server_version("14.1"))?
.write_message(&BeMessage::ParameterStatus(
BeParameterStatusMessage::ServerVersion("14.1"),
))?
.write_message(&BeMessage::ReadyForQuery)?;
self.state = ProtoState::Established;
}
@@ -382,7 +384,7 @@ impl PostgresBackend {
}
}
self.write_message(&BeMessage::AuthenticationOk)?
.write_message(&BeMessage::CLIENT_ENCODING)?
.write_message(&BeParameterStatusMessage::encoding())?
.write_message(&BeMessage::ReadyForQuery)?;
self.state = ProtoState::Established;
}

View File

@@ -11,13 +11,11 @@ use tokio::time::timeout;
/// An error happened while waiting for a number
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
#[error("SeqWaitError")]
pub enum SeqWaitError {
/// The wait timeout was reached
#[error("seqwait timeout was reached")]
Timeout,
/// [`SeqWait::shutdown`] was called
#[error("SeqWait::shutdown was called")]
Shutdown,
}

View File

@@ -36,9 +36,9 @@ nix = "0.25"
num-traits = "0.2.15"
once_cell = "1.13.0"
pin-project-lite = "0.2.7"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
pprof = { git = "https://github.com/neondatabase/pprof-rs.git", branch = "wallclock-profiling", features = ["flamegraph"], optional = true }
rand = "0.8.3"
regex = "1.4.5"
@@ -52,7 +52,7 @@ svg_fmt = "0.4.1"
tar = "0.4.33"
thiserror = "1.0"
tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-util = { version = "0.7.3", features = ["io", "io-util"] }
toml_edit = { version = "0.14", features = ["easy"] }
tracing = "0.1.36"

View File

@@ -201,12 +201,8 @@ fn initialize_config(
}
fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
// Initialize logging
logging::init(conf.log_format)?;
// Print version to the log, and expose it as a prometheus metric too.
info!("version: {}", version());
set_build_info_metric(GIT_VERSION);
// If any failpoints were set from FAILPOINTS environment variable,
// print them to the log for debugging purposes
@@ -222,37 +218,38 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
)
}
// Create and lock PID file. This ensures that there cannot be more than one
// pageserver process running at the same time.
let lock_file_path = conf.workdir.join(PID_FILE_NAME);
let lock_file =
utils::pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
info!("Claimed pid file at {lock_file_path:?}");
// Ensure that the lock file is held even if the main thread of the process panics.
// We need to release the lock file only when the process exits.
// ensure that the lock file is held even if the main thread of the process is panics
// we need to release the lock file only when the current process is gone
std::mem::forget(lock_file);
// Bind the HTTP and libpq ports early, so that if they are in use by some other
// process, we error out early.
let http_addr = &conf.listen_http_addr;
info!("Starting pageserver http handler on {http_addr}");
let http_listener = tcp_listener::bind(http_addr)?;
// TODO: Check that it looks like a valid repository before going further
let pg_addr = &conf.listen_pg_addr;
info!("Starting pageserver pg protocol handler on {pg_addr}");
let pageserver_listener = tcp_listener::bind(pg_addr)?;
// bind sockets before daemonizing so we report errors early and do not return until we are listening
info!(
"Starting pageserver http handler on {}",
conf.listen_http_addr
);
let http_listener = tcp_listener::bind(conf.listen_http_addr.clone())?;
info!(
"Starting pageserver pg protocol handler on {}",
conf.listen_pg_addr
);
let pageserver_listener = tcp_listener::bind(conf.listen_pg_addr.clone())?;
// Install signal handlers
let signals = signals::install_shutdown_handlers()?;
// Start profiler (if enabled)
// start profiler (if enabled)
let profiler_guard = profiling::init_profiler(conf);
// Launch broker client
WALRECEIVER_RUNTIME.block_on(pageserver::walreceiver::init_broker_client(conf))?;
// Initialize authentication for incoming connections
// initialize authentication for incoming connections
let auth = match &conf.auth_type {
AuthType::Trust | AuthType::MD5 => None,
AuthType::NeonJWT => {
@@ -280,7 +277,6 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
}
};
// Set up remote storage client
let remote_storage = conf
.remote_storage_config
.as_ref()
@@ -288,18 +284,30 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
.transpose()
.context("Failed to init generic remote storage")?;
// Scan the local 'tenants/' directory and start loading the tenants
BACKGROUND_RUNTIME.block_on(tenant_mgr::init_tenant_mgr(conf, remote_storage.clone()))?;
let (init_result_sender, init_result_receiver) =
std::sync::mpsc::channel::<anyhow::Result<()>>();
let storage_for_spawn = remote_storage.clone();
let _handler = BACKGROUND_RUNTIME.spawn(async move {
let result = tenant_mgr::init_tenant_mgr(conf, storage_for_spawn).await;
init_result_sender.send(result)
});
match init_result_receiver.recv() {
Ok(init_result) => init_result.context("Failed to init tenant_mgr")?,
Err(_sender_dropped_err) => {
anyhow::bail!("Failed to init tenant_mgr: no init status was returned");
}
}
// Start up the service to handle HTTP mgmt API request. We created the
// listener earlier already.
// Spawn all HTTP related tasks in the MGMT_REQUEST_RUNTIME.
// bind before launching separate thread so the error reported before startup exits
// Create a Service from the router above to handle incoming requests.
{
let _rt_guard = MGMT_REQUEST_RUNTIME.enter();
let router = http::make_router(conf, auth.clone(), remote_storage)?
.build()
.map_err(|err| anyhow!(err))?;
let service = utils::http::RouterService::new(router).unwrap();
let router = http::make_router(conf, auth.clone(), remote_storage)?;
let service =
utils::http::RouterService::new(router.build().map_err(|err| anyhow!(err))?).unwrap();
let server = hyper::Server::from_tcp(http_listener)?
.serve(service)
.with_graceful_shutdown(task_mgr::shutdown_watcher());
@@ -319,7 +327,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
}
// Spawn a task to listen for libpq connections. It will spawn further tasks
// for each connection. We created the listener earlier already.
// for each connection.
task_mgr::spawn(
COMPUTE_REQUEST_RUNTIME.handle(),
TaskKind::LibpqEndpointListener,
@@ -332,6 +340,8 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
},
);
set_build_info_metric(GIT_VERSION);
// All started up! Now just sit and wait for shutdown signal.
signals.handle(|signal| match signal {
Signal::Quit => {

View File

@@ -137,7 +137,6 @@ pub struct PageServerConf {
/// Storage broker endpoints to connect to.
pub broker_endpoint: Uri,
pub broker_keepalive_interval: Duration,
pub log_format: LogFormat,
@@ -216,7 +215,6 @@ struct PageServerConfigBuilder {
profiling: BuilderValue<ProfilingConfig>,
broker_endpoint: BuilderValue<Uri>,
broker_keepalive_interval: BuilderValue<Duration>,
log_format: BuilderValue<LogFormat>,
@@ -249,10 +247,6 @@ impl Default for PageServerConfigBuilder {
broker_endpoint: Set(storage_broker::DEFAULT_ENDPOINT
.parse()
.expect("failed to parse default broker endpoint")),
broker_keepalive_interval: Set(humantime::parse_duration(
storage_broker::DEFAULT_KEEPALIVE_INTERVAL,
)
.expect("cannot parse default keepalive interval")),
log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
concurrent_tenant_size_logical_size_queries: Set(ConfigurableSemaphore::default()),
@@ -316,10 +310,6 @@ impl PageServerConfigBuilder {
self.broker_endpoint = BuilderValue::Set(broker_endpoint)
}
pub fn broker_keepalive_interval(&mut self, broker_keepalive_interval: Duration) {
self.broker_keepalive_interval = BuilderValue::Set(broker_keepalive_interval)
}
pub fn id(&mut self, node_id: NodeId) {
self.id = BuilderValue::Set(node_id)
}
@@ -375,9 +365,6 @@ impl PageServerConfigBuilder {
broker_endpoint: self
.broker_endpoint
.ok_or(anyhow!("No broker endpoints provided"))?,
broker_keepalive_interval: self
.broker_keepalive_interval
.ok_or(anyhow!("No broker keepalive interval provided"))?,
log_format: self.log_format.ok_or(anyhow!("missing log_format"))?,
concurrent_tenant_size_logical_size_queries: self
.concurrent_tenant_size_logical_size_queries
@@ -545,7 +532,6 @@ impl PageServerConf {
"id" => builder.id(NodeId(parse_toml_u64(key, item)?)),
"profiling" => builder.profiling(parse_toml_from_str(key, item)?),
"broker_endpoint" => builder.broker_endpoint(parse_toml_string(key, item)?.parse().context("failed to parse broker endpoint")?),
"broker_keepalive_interval" => builder.broker_keepalive_interval(parse_toml_duration(key, item)?),
"log_format" => builder.log_format(
LogFormat::from_config(&parse_toml_string(key, item)?)?
),
@@ -673,7 +659,6 @@ impl PageServerConf {
profiling: ProfilingConfig::Disabled,
default_tenant_conf: TenantConf::dummy_conf(),
broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
broker_keepalive_interval: Duration::from_secs(5000),
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
}
@@ -844,9 +829,6 @@ log_format = 'json'
profiling: ProfilingConfig::Disabled,
default_tenant_conf: TenantConf::default(),
broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
broker_keepalive_interval: humantime::parse_duration(
storage_broker::DEFAULT_KEEPALIVE_INTERVAL
)?,
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
},
@@ -890,7 +872,6 @@ log_format = 'json'
profiling: ProfilingConfig::Disabled,
default_tenant_conf: TenantConf::default(),
broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
broker_keepalive_interval: Duration::from_secs(5),
log_format: LogFormat::Json,
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
},

View File

@@ -795,6 +795,37 @@ components:
latest_gc_cutoff_lsn:
type: string
format: hex
# These 'local' and 'remote' fields just duplicate some of the fields
# above. They are kept for backwards-compatibility. They can be removed,
# when the control plane has been updated to look at the above fields
# directly.
local:
$ref: "#/components/schemas/LocalTimelineInfo"
remote:
$ref: "#/components/schemas/RemoteTimelineInfo"
LocalTimelineInfo:
type: object
properties:
ancestor_timeline_id:
type: string
format: hex
ancestor_lsn:
type: string
format: hex
current_logical_size:
type: integer
current_physical_size:
type: integer
RemoteTimelineInfo:
type: object
required:
- remote_consistent_lsn
properties:
remote_consistent_lsn:
type: string
format: hex
Error:
type: object
required:

View File

@@ -7,8 +7,8 @@ use remote_storage::GenericRemoteStorage;
use tracing::*;
use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest, TimelineInfo,
LocalTimelineInfo, RemoteTimelineInfo, StatusResponse, TenantConfigRequest,
TenantCreateRequest, TenantCreateResponse, TenantInfo, TimelineCreateRequest, TimelineInfo,
};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::tenant::Timeline;
@@ -147,6 +147,18 @@ fn build_timeline_info_common(timeline: &Arc<Timeline>) -> anyhow::Result<Timeli
pg_version: timeline.pg_version,
state,
// Duplicate some fields in 'local' and 'remote' fields, for backwards-compatility
// with the control plane.
local: LocalTimelineInfo {
ancestor_timeline_id,
ancestor_lsn,
current_logical_size,
current_physical_size,
},
remote: RemoteTimelineInfo {
remote_consistent_lsn: Some(remote_consistent_lsn),
},
};
Ok(info)
}

View File

@@ -25,6 +25,7 @@
//! the current task has been requested to shut down. You can use that with
//! Tokio select!().
//!
//!
//! TODO: This would be a good place to also handle panics in a somewhat sane way.
//! Depending on what task panics, we might want to kill the whole server, or
//! only a single tenant or timeline.
@@ -42,9 +43,9 @@ use std::sync::{Arc, Mutex};
use futures::FutureExt;
use tokio::runtime::Runtime;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::task_local;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
@@ -145,10 +146,11 @@ static TASKS: Lazy<Mutex<HashMap<u64, Arc<PageServerTask>>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
task_local! {
// This is a cancellation token which will be cancelled when a task needs to shut down. The
// root token is kept in the global registry, so that anyone can send the signal to request
// task shutdown.
static SHUTDOWN_TOKEN: CancellationToken;
// There is a Tokio watch channel for each task, which can be used to signal the
// task that it needs to shut down. This task local variable holds the receiving
// end of the channel. The sender is kept in the global registry, so that anyone
// can send the signal to request task shutdown.
static SHUTDOWN_RX: watch::Receiver<bool>;
// Each task holds reference to its own PageServerTask here.
static CURRENT_TASK: Arc<PageServerTask>;
@@ -224,8 +226,8 @@ struct PageServerTask {
name: String,
// To request task shutdown, just cancel this token.
cancel: CancellationToken,
// To request task shutdown, send 'true' to the channel to notify the task.
shutdown_tx: watch::Sender<bool>,
mutable: Mutex<MutableTaskState>,
}
@@ -245,13 +247,13 @@ pub fn spawn<F>(
where
F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
let cancel = CancellationToken::new();
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
let task = Arc::new(PageServerTask {
task_id: PageserverTaskId(task_id),
kind,
name: name.to_string(),
cancel: cancel.clone(),
shutdown_tx,
mutable: Mutex::new(MutableTaskState {
tenant_id,
timeline_id,
@@ -269,7 +271,7 @@ where
task_name,
task_id,
task_cloned,
cancel,
shutdown_rx,
shutdown_process_on_error,
future,
));
@@ -286,7 +288,7 @@ async fn task_wrapper<F>(
task_name: String,
task_id: u64,
task: Arc<PageServerTask>,
shutdown_token: CancellationToken,
shutdown_rx: watch::Receiver<bool>,
shutdown_process_on_error: bool,
future: F,
) where
@@ -294,9 +296,9 @@ async fn task_wrapper<F>(
{
debug!("Starting task '{}'", task_name);
let result = SHUTDOWN_TOKEN
let result = SHUTDOWN_RX
.scope(
shutdown_token,
shutdown_rx,
CURRENT_TASK.scope(task, {
// We use AssertUnwindSafe here so that the payload function
// doesn't need to be UnwindSafe. We don't do anything after the
@@ -406,7 +408,7 @@ pub async fn shutdown_tasks(
&& (tenant_id.is_none() || task_mut.tenant_id == tenant_id)
&& (timeline_id.is_none() || task_mut.timeline_id == timeline_id)
{
task.cancel.cancel();
let _ = task.shutdown_tx.send_replace(true);
victim_tasks.push(Arc::clone(task));
}
}
@@ -437,28 +439,21 @@ pub fn current_task_kind() -> Option<TaskKind> {
/// A Future that can be used to check if the current task has been requested to
/// shut down.
pub async fn shutdown_watcher() {
let token = SHUTDOWN_TOKEN
.try_with(|t| t.clone())
let mut shutdown_rx = SHUTDOWN_RX
.try_with(|rx| rx.clone())
.expect("shutdown_requested() called in an unexpected task or thread");
token.cancelled().await;
}
/// Clone the current task's cancellation token, which can be moved across tasks.
///
/// When the task which is currently executing is shutdown, the cancellation token will be
/// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or
/// `tokio::task::JoinSet::spawn`.
pub fn shutdown_token() -> CancellationToken {
SHUTDOWN_TOKEN
.try_with(|t| t.clone())
.expect("shutdown_token() called in an unexpected task or thread")
while !*shutdown_rx.borrow() {
if shutdown_rx.changed().await.is_err() {
break;
}
}
}
/// Has the current task been requested to shut down?
pub fn is_shutdown_requested() -> bool {
if let Ok(cancel) = SHUTDOWN_TOKEN.try_with(|t| t.clone()) {
cancel.is_cancelled()
if let Ok(shutdown_rx) = SHUTDOWN_RX.try_with(|rx| rx.clone()) {
*shutdown_rx.borrow()
} else {
if !cfg!(test) {
warn!("is_shutdown_requested() called in an unexpected task or thread");

View File

@@ -1164,6 +1164,7 @@ impl Tenant {
target_timeline_id: Option<TimelineId>,
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> anyhow::Result<GcResult> {
anyhow::ensure!(
self.is_active(),
@@ -1178,7 +1179,7 @@ impl Tenant {
let _timer = STORAGE_TIME
.with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str])
.start_timer();
self.gc_iteration_internal(target_timeline_id, horizon, pitr)
self.gc_iteration_internal(target_timeline_id, horizon, pitr, checkpoint_before_gc)
.await
}
}
@@ -1777,6 +1778,7 @@ impl Tenant {
target_timeline_id: Option<TimelineId>,
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> anyhow::Result<GcResult> {
let mut totals: GcResult = Default::default();
let now = Instant::now();
@@ -1803,6 +1805,18 @@ impl Tenant {
// made.
break;
}
// If requested, force flush all in-memory layers to disk first,
// so that they too can be garbage collected. That's
// used in tests, so we want as deterministic results as possible.
if checkpoint_before_gc {
timeline.checkpoint(CheckpointConfig::Forced).await?;
info!(
"timeline {} checkpoint_before_gc done",
timeline.timeline_id
);
}
let result = timeline.gc().await?;
totals += result;
}
@@ -2863,7 +2877,7 @@ mod tests {
// and compaction works. But it does set the 'cutoff' point so that the cross check
// below should fail.
tenant
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)
.await?;
// try to branch at lsn 25, should fail because we already garbage collected the data
@@ -2919,7 +2933,7 @@ mod tests {
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
match tline.get(*TEST_KEY, Lsn(0x25)) {
@@ -2946,7 +2960,7 @@ mod tests {
.expect("Should have a local timeline");
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
tenant
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)
.await?;
assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok());
@@ -2971,7 +2985,7 @@ mod tests {
// run gc on parent
tenant
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)
.await?;
// Check that the data is still accessible on the branch.

View File

@@ -2487,6 +2487,9 @@ impl Timeline {
);
write_guard.store_and_unlock(new_gc_cutoff).wait();
}
// Persist the new GC cutoff value in the metadata file, before
// we actually remove anything.
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?;
info!("GC starting");
@@ -2597,33 +2600,19 @@ impl Timeline {
layers_to_remove.push(Arc::clone(&l));
}
if !layers_to_remove.is_empty() {
// Persist the new GC cutoff value in the metadata file, before
// we actually remove anything.
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?;
// Actually delete the layers from disk and remove them from the map.
// (couldn't do this in the loop above, because you cannot modify a collection
// while iterating it. BTreeMap::retain() would be another option)
let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len());
for doomed_layer in layers_to_remove {
let path = doomed_layer.local_path();
self.metrics
.current_physical_size_gauge
.sub(path.metadata()?.len());
layer_names_to_delete.push(doomed_layer.filename());
doomed_layer.delete()?;
layers.remove_historic(doomed_layer);
result.layers_removed += 1;
}
if result.layers_removed != 0 {
fail_point!("after-timeline-gc-removed-layers");
}
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
}
// Actually delete the layers from disk and remove them from the map.
// (couldn't do this in the loop above, because you cannot modify a collection
// while iterating it. BTreeMap::retain() would be another option)
let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len());
for doomed_layer in layers_to_remove {
let path = doomed_layer.local_path();
self.metrics
.current_physical_size_gauge
.sub(path.metadata()?.len());
layer_names_to_delete.push(doomed_layer.filename());
doomed_layer.delete()?;
layers.remove_historic(doomed_layer);
result.layers_removed += 1;
}
info!(
@@ -2631,6 +2620,14 @@ impl Timeline {
result.layers_removed, new_gc_cutoff
);
if result.layers_removed != 0 {
fail_point!("after-timeline-gc-removed-layers");
}
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
}
result.elapsed = now.elapsed()?;
Ok(result)
}

View File

@@ -496,7 +496,7 @@ pub async fn immediate_gc(
async move {
fail::fail_point!("immediate_gc_task_pre");
let result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr)
.gc_iteration(Some(timeline_id), gc_horizon, pitr, true)
.instrument(info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id))
.await;
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it

View File

@@ -127,7 +127,7 @@ async fn gc_loop(tenant_id: TenantId) {
} else {
// Run gc
if gc_horizon > 0 {
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval()).await
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false).await
{
sleep_duration = wait_duration;
error!("Gc failed, retrying in {:?}: {e:?}", sleep_duration);

View File

@@ -44,13 +44,10 @@ pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result
let broker_endpoint = conf.broker_endpoint.clone();
// Note: we do not attempt connecting here (but validate endpoints sanity).
let broker_client =
storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context(
format!(
"Failed to create broker client to {}",
&conf.broker_endpoint
),
)?;
let broker_client = storage_broker::connect(broker_endpoint.clone()).context(format!(
"Failed to create broker client to {}",
&conf.broker_endpoint
))?;
if BROKER_CLIENT.set(broker_client).is_err() {
panic!("broker already initialized");
@@ -129,21 +126,15 @@ impl<E: Clone> TaskHandle<E> {
match self.events_receiver.changed().await {
Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),
Err(_task_channel_part_dropped) => {
TaskEvent::End(match self.join_handle.as_mut() {
TaskEvent::End(match self.join_handle.take() {
Some(jh) => {
if !jh.is_finished() {
warn!("sender is dropped while join handle is still alive");
}
let res = jh
.await
jh.await
.map_err(|e| anyhow::anyhow!("Failed to join task: {e}"))
.and_then(|x| x);
// For cancellation-safety, drop join_handle only after successful .await.
self.join_handle = None;
res
.and_then(|x| x)
}
None => {
// Another option is to have an enum, join handle or result and give away the reference to it

View File

@@ -1669,7 +1669,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
* (leaving holes). But this rule is violated in PG-15 where CreateAndCopyRelationData
* call smgrextend for destination relation n using size of source relation
*/
n_blocks = neon_nblocks(reln, forkNum);
get_cached_relsize(reln->smgr_rnode.node, forkNum, &n_blocks);
while (n_blocks < blkno)
neon_wallog_page(reln, forkNum, n_blocks++, buffer, true);

View File

@@ -33,7 +33,7 @@ sha2 = "0.10.2"
socket2 = "0.4.4"
thiserror = "1.0.30"
tokio = { version = "1.17", features = ["macros"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-rustls = "0.23.0"
tracing = "0.1.36"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@@ -1,6 +1,6 @@
use super::{AuthSuccess, NodeInfo};
use crate::{auth, compute, error::UserFacingError, stream::PqStream, waiters};
use pq_proto::BeMessage as Be;
use pq_proto::{BeMessage as Be, BeParameterStatusMessage};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, info_span};
@@ -60,7 +60,7 @@ pub async fn handle_user(
info!(parent: &span, "sending the auth URL to the user");
client
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&Be::CLIENT_ENCODING)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?
.write_message(&Be::NoticeResponse(&greeting))
.await?;

View File

@@ -8,17 +8,18 @@ use tokio::net::TcpStream;
use tokio_postgres::NoTls;
use tracing::{error, info};
const COULD_NOT_CONNECT: &str = "Could not connect to compute node";
#[derive(Debug, Error)]
pub enum ConnectionError {
/// This error doesn't seem to reveal any secrets; for instance,
/// [`tokio_postgres::error::Kind`] doesn't contain ip addresses and such.
#[error("{COULD_NOT_CONNECT}: {0}")]
#[error("Failed to connect to the compute node: {0}")]
Postgres(#[from] tokio_postgres::Error),
#[error("{COULD_NOT_CONNECT}: {0}")]
CouldNotConnect(#[from] io::Error),
#[error("Failed to connect to the compute node")]
FailedToConnectToCompute,
#[error("Failed to fetch compute node version")]
FailedToFetchPgVersion,
}
impl UserFacingError for ConnectionError {
@@ -28,10 +29,10 @@ impl UserFacingError for ConnectionError {
// This helps us drop irrelevant library-specific prefixes.
// TODO: propagate severity level and other parameters.
Postgres(err) => match err.as_db_error() {
Some(err) => err.message().to_owned(),
Some(err) => err.message().to_string(),
None => err.to_string(),
},
_ => COULD_NOT_CONNECT.to_owned(),
other => other.to_string(),
}
}
}
@@ -48,7 +49,7 @@ pub struct ConnCfg(pub tokio_postgres::Config);
impl ConnCfg {
/// Construct a new connection config.
pub fn new() -> Self {
Self(Default::default())
Self(tokio_postgres::Config::new())
}
}
@@ -94,7 +95,7 @@ impl ConnCfg {
io::ErrorKind::Other,
format!(
"couldn't connect: bad compute config, \
ports and hosts entries' count does not match: {:?}",
ports and hosts entries' count does not match: {:?}",
self.0
),
));
@@ -130,8 +131,8 @@ impl ConnCfg {
pub struct PostgresConnection {
/// Socket connected to a compute node.
pub stream: TcpStream,
/// PostgreSQL connection parameters.
pub params: std::collections::HashMap<String, String>,
/// PostgreSQL version of this instance.
pub version: String,
}
impl ConnCfg {
@@ -155,7 +156,6 @@ impl ConnCfg {
self.0.application_name(app_name);
}
// TODO: This is especially ugly...
if let Some(replication) = params.get("replication") {
use tokio_postgres::config::ReplicationMode;
match replication {
@@ -172,24 +172,22 @@ impl ConnCfg {
// TODO: extend the list of the forwarded startup parameters.
// Currently, tokio-postgres doesn't allow us to pass
// arbitrary parameters, but the ones above are a good start.
//
// This and the reverse params problem can be better addressed
// in a bespoke connection machinery (a new library for that sake).
// TODO: establish a secure connection to the DB.
let (socket_addr, mut stream) = self.connect_raw().await?;
let (client, connection) = self.0.connect_raw(&mut stream, NoTls).await?;
let (socket_addr, mut stream) = self
.connect_raw()
.await
.map_err(|_| ConnectionError::FailedToConnectToCompute)?;
// TODO: establish a secure connection to the DB
let (client, conn) = self.0.connect_raw(&mut stream, NoTls).await?;
let version = conn
.parameter("server_version")
.ok_or(ConnectionError::FailedToFetchPgVersion)?
.into();
info!("connected to user's compute node at {socket_addr}");
// This is very ugly but as of now there's no better way to
// extract the connection parameters from tokio-postgres' connection.
// TODO: solve this problem in a more elegant manner (e.g. the new library).
let params = connection.parameters;
// NB: CancelToken is supposed to hold socket_addr, but we use connect_raw.
// Yet another reason to rework the connection establishing code.
let cancel_closure = CancelClosure::new(socket_addr, client.cancel_token());
let db = PostgresConnection { stream, params };
let db = PostgresConnection { stream, version };
Ok((db, cancel_closure))
}

View File

@@ -255,21 +255,15 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
// Note that we do this only (for the most part) after we've connected
// to a compute (see above) which performs its own authentication.
if !auth_result.reported_auth_ok {
stream.write_message_noflush(&Be::AuthenticationOk)?;
}
// Forward all postgres connection params to the client.
// Right now the implementation is very hacky and inefficent (ideally,
// we don't need an intermediate hashmap), but at least it should be correct.
for (name, value) in &db.params {
// TODO: Theoretically, this could result in a big pile of params...
stream.write_message_noflush(&Be::ParameterStatus {
name: name.as_bytes(),
value: value.as_bytes(),
})?;
stream
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?;
}
stream
.write_message_noflush(&BeMessage::ParameterStatus(
BeParameterStatusMessage::ServerVersion(&db.version),
))?
.write_message_noflush(&Be::BackendKeyData(cancel_key_data))?
.write_message(&BeMessage::ReadyForQuery)
.await?;

View File

@@ -139,7 +139,7 @@ async fn dummy_proxy(
stream
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&Be::CLIENT_ENCODING)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?
.write_message(&BeMessage::ReadyForQuery)
.await?;

View File

@@ -20,8 +20,8 @@ hyper = "0.14"
nix = "0.25"
once_cell = "1.13.0"
parking_lot = "0.12.1"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
regex = "1.4.5"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
@@ -29,7 +29,7 @@ serde_with = "2.0"
signal-hook = "0.3.10"
thiserror = "1"
tokio = { version = "1.17", features = ["macros", "fs"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
toml_edit = { version = "0.14", features = ["easy"] }
tracing = "0.1.27"
url = "2.2.2"

View File

@@ -82,9 +82,6 @@ struct Args {
/// established; plaintext otherwise.
#[arg(long, default_value = DEFAULT_ENDPOINT, verbatim_doc_comment)]
broker_endpoint: Uri,
/// Broker keepalive interval.
#[arg(long, value_parser= humantime::parse_duration, default_value = storage_broker::DEFAULT_KEEPALIVE_INTERVAL)]
broker_keepalive_interval: Duration,
/// Peer safekeeper is considered dead after not receiving heartbeats from
/// it during this period passed as a human readable duration.
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT)]
@@ -129,6 +126,9 @@ fn main() -> anyhow::Result<()> {
logging::init(LogFormat::from_config(&args.log_format)?)?;
info!("version: {GIT_VERSION}");
// Change into the data directory.
std::env::set_current_dir(&args.datadir)?;
// Set or read our ID.
let id = set_id(&args.datadir, args.id.map(NodeId))?;
if args.init {
@@ -142,7 +142,6 @@ fn main() -> anyhow::Result<()> {
listen_http_addr: args.listen_http,
no_sync: args.no_sync,
broker_endpoint: args.broker_endpoint,
broker_keepalive_interval: args.broker_keepalive_interval,
heartbeat_timeout: args.heartbeat_timeout,
remote_storage: args.remote_storage,
max_offloader_lag_bytes: args.max_offloader_lag,

View File

@@ -66,7 +66,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
/// Subscribe and fetch all the interesting data from the broker.
async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
let mut client = storage_broker::connect(conf.broker_endpoint)?;
// TODO: subscribe only to local timelines instead of all
let request = SubscribeSafekeeperInfoRequest {

View File

@@ -51,7 +51,6 @@ pub struct SafeKeeperConf {
pub listen_http_addr: String,
pub no_sync: bool,
pub broker_endpoint: Uri,
pub broker_keepalive_interval: Duration,
pub heartbeat_timeout: Duration,
pub remote_storage: Option<RemoteStorageConfig>,
pub max_offloader_lag_bytes: u64,
@@ -84,7 +83,6 @@ impl SafeKeeperConf {
broker_endpoint: storage_broker::DEFAULT_ENDPOINT
.parse()
.expect("failed to parse default broker endpoint"),
broker_keepalive_interval: Duration::from_secs(5),
backup_runtime_threads: None,
wal_backup_enabled: true,
auth_validation_public_key_path: None,

View File

@@ -88,7 +88,7 @@ fn tli_from_u64(i: u64) -> Vec<u8> {
async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>, i: u64) {
let mut client = match client {
Some(c) => c,
None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
None => storage_broker::connect(DEFAULT_ENDPOINT).unwrap(),
};
let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
@@ -112,7 +112,7 @@ async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>,
async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
let mut client = match client {
Some(c) => c,
None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
None => storage_broker::connect(DEFAULT_ENDPOINT).unwrap(),
};
let mut counter: u64 = 0;
@@ -152,7 +152,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
let h = tokio::spawn(progress_reporter(counters.clone()));
let c = storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap();
let c = storage_broker::connect(DEFAULT_ENDPOINT).unwrap();
for i in 0..args.num_subs {
let c = Some(c.clone());

View File

@@ -39,9 +39,7 @@ use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
use storage_broker::{
parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
};
use storage_broker::{parse_proto_ttid, EitherBody, DEFAULT_LISTEN_ADDR};
use utils::id::TenantTimelineId;
use utils::logging::{self, LogFormat};
use utils::project_git_version;
@@ -49,8 +47,8 @@ use utils::sentry_init::{init_sentry, release_name};
project_git_version!(GIT_VERSION);
const DEFAULT_CHAN_SIZE: usize = 32;
const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384;
const DEFAULT_CHAN_SIZE: usize = 128;
const DEFAULT_HTTP2_KEEPALIVE_INTERVAL: &str = "5000ms";
#[derive(Parser, Debug)]
#[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
@@ -58,14 +56,11 @@ struct Args {
/// Endpoint to listen on.
#[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)]
listen_addr: SocketAddr,
/// Size of the queue to the per timeline subscriber.
/// Size of the queue to the subscriber.
#[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
timeline_chan_size: usize,
/// Size of the queue to the all keys subscriber.
#[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)]
all_keys_chan_size: usize,
chan_size: usize,
/// HTTP/2 keepalive interval.
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HTTP2_KEEPALIVE_INTERVAL)]
http2_keepalive_interval: Duration,
/// Format for logging, either 'plain' or 'json'.
#[arg(long, default_value = "plain")]
@@ -113,7 +108,7 @@ struct SharedState {
}
impl SharedState {
pub fn new(all_keys_chan_size: usize) -> Self {
pub fn new(chan_size: usize) -> Self {
SharedState {
next_pub_id: 0,
num_pubs: 0,
@@ -121,7 +116,7 @@ impl SharedState {
num_subs_to_timelines: 0,
chans_to_timeline_subs: HashMap::new(),
num_subs_to_all: 0,
chan_to_all_subs: broadcast::channel(all_keys_chan_size).0,
chan_to_all_subs: broadcast::channel(chan_size).0,
}
}
@@ -144,7 +139,7 @@ impl SharedState {
pub fn register_subscriber(
&mut self,
sub_key: SubscriptionKey,
timeline_chan_size: usize,
chan_size: usize,
) -> (SubId, broadcast::Receiver<SafekeeperTimelineInfo>) {
let sub_id = self.next_sub_id;
self.next_sub_id += 1;
@@ -163,7 +158,7 @@ impl SharedState {
self.chans_to_timeline_subs
.entry(ttid)
.or_insert(ChanToTimelineSub {
chan: broadcast::channel(timeline_chan_size).0,
chan: broadcast::channel(chan_size).0,
num_subscribers: 0,
});
chan_to_timeline_sub.num_subscribers += 1;
@@ -205,7 +200,7 @@ impl SharedState {
#[derive(Clone)]
struct Registry {
shared_state: Arc<RwLock<SharedState>>,
timeline_chan_size: usize,
chan_size: usize,
}
impl Registry {
@@ -237,7 +232,7 @@ impl Registry {
let (sub_id, sub_rx) = self
.shared_state
.write()
.register_subscriber(sub_key, self.timeline_chan_size);
.register_subscriber(sub_key, self.chan_size);
info!(
"subscription started id={}, key={:?}, addr={:?}",
sub_id, sub_key, remote_addr
@@ -374,8 +369,9 @@ impl BrokerService for Broker {
Err(RecvError::Lagged(skipped_msg)) => {
missed_msgs += skipped_msg;
if let Poll::Ready(_) = futures::poll!(Box::pin(warn_interval.tick())) {
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
warn!("subscription id={}, key={:?}, addr={:?} dropped {} messages, channel is full",
subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
Err(Status::new(Code::Internal, "full channel"))?;
missed_msgs = 0;
}
}
@@ -433,8 +429,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("version: {GIT_VERSION}");
let registry = Registry {
shared_state: Arc::new(RwLock::new(SharedState::new(args.all_keys_chan_size))),
timeline_chan_size: args.timeline_chan_size,
shared_state: Arc::new(RwLock::new(SharedState::new(args.chan_size))),
chan_size: args.chan_size,
};
let storage_broker_impl = Broker {
registry: registry.clone(),
@@ -528,7 +524,7 @@ mod tests {
async fn test_registry() {
let registry = Registry {
shared_state: Arc::new(RwLock::new(SharedState::new(16))),
timeline_chan_size: 16,
chan_size: 16,
};
// subscribe to timeline 2

View File

@@ -1,7 +1,6 @@
use hyper::body::HttpBody;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tonic::codegen::StdError;
use tonic::transport::{ClientTlsConfig, Endpoint};
use tonic::{transport::Channel, Code, Status};
@@ -27,8 +26,6 @@ pub use hyper::Uri;
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}");
pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms";
// BrokerServiceClient charged with tonic provided Channel transport; helps to
// avoid depending on tonic directly in user crates.
pub type BrokerClientChannel = BrokerServiceClient<Channel>;
@@ -36,7 +33,7 @@ pub type BrokerClientChannel = BrokerServiceClient<Channel>;
// Create connection object configured to run TLS if schema starts with https://
// and plain text otherwise. Connection is lazy, only endpoint sanity is
// validated here.
pub fn connect<U>(endpoint: U, keepalive_interval: Duration) -> anyhow::Result<BrokerClientChannel>
pub fn connect<U>(endpoint: U) -> anyhow::Result<BrokerClientChannel>
where
U: std::convert::TryInto<Uri>,
U::Error: std::error::Error + Send + Sync + 'static,
@@ -49,10 +46,6 @@ where
let tls = ClientTlsConfig::new();
tonic_endpoint = tonic_endpoint.tls_config(tls)?;
}
tonic_endpoint = tonic_endpoint
.http2_keep_alive_interval(keepalive_interval)
.keep_alive_while_idle(true);
// keep_alive_timeout is 20s by default on both client and server side
let channel = tonic_endpoint.connect_lazy();
Ok(BrokerClientChannel::new(channel))
}

View File

@@ -11,7 +11,7 @@ from datetime import datetime
from pathlib import Path
# Type-related stuff
from typing import Callable, ClassVar, Dict, Iterator, Optional
from typing import Callable, ClassVar, Iterator, Optional
import pytest
from _pytest.config import Config
@@ -135,26 +135,23 @@ class PgBenchRunResult:
@dataclasses.dataclass
class PgBenchInitResult:
# Taken from https://github.com/postgres/postgres/blob/REL_15_1/src/bin/pgbench/pgbench.c#L5144-L5171
EXTRACTORS: ClassVar[Dict[str, re.Pattern]] = { # type: ignore[type-arg]
"drop_tables": re.compile(r"drop tables (\d+\.\d+) s"),
"create_tables": re.compile(r"create tables (\d+\.\d+) s"),
"client_side_generate": re.compile(r"client-side generate (\d+\.\d+) s"),
"server_side_generate": re.compile(r"server-side generate (\d+\.\d+) s"),
"vacuum": re.compile(r"vacuum (\d+\.\d+) s"),
"primary_keys": re.compile(r"primary keys (\d+\.\d+) s"),
"foreign_keys": re.compile(r"foreign keys (\d+\.\d+) s"),
"total": re.compile(r"done in (\d+\.\d+) s"), # Total time printed by pgbench
}
REGEX: ClassVar[re.Pattern] = re.compile( # type: ignore[type-arg]
r"done in (\d+\.\d+) s "
r"\("
r"(?:drop tables (\d+\.\d+) s)?(?:, )?"
r"(?:create tables (\d+\.\d+) s)?(?:, )?"
r"(?:client-side generate (\d+\.\d+) s)?(?:, )?"
r"(?:vacuum (\d+\.\d+) s)?(?:, )?"
r"(?:primary keys (\d+\.\d+) s)?(?:, )?"
r"\)\."
)
total: Optional[float]
total: float
drop_tables: Optional[float]
create_tables: Optional[float]
client_side_generate: Optional[float]
server_side_generate: Optional[float]
vacuum: Optional[float]
primary_keys: Optional[float]
foreign_keys: Optional[float]
duration: float
start_timestamp: int
end_timestamp: int
@@ -167,35 +164,25 @@ class PgBenchInitResult:
start_timestamp: int,
end_timestamp: int,
):
# Parses pgbench initialize output
# Parses pgbench initialize output for default initialization steps (dtgvp)
# Example: done in 5.66 s (drop tables 0.05 s, create tables 0.31 s, client-side generate 2.01 s, vacuum 0.53 s, primary keys 0.38 s).
last_line = stderr.splitlines()[-1]
timings: Dict[str, Optional[float]] = {}
last_line_items = re.split(r"\(|\)|,", last_line)
for item in last_line_items:
for key, regex in cls.EXTRACTORS.items():
if (m := regex.match(item.strip())) is not None:
if key in timings:
raise RuntimeError(
f"can't store pgbench results for repeated action `{key}`"
)
timings[key] = float(m.group(1))
if not timings or "total" not in timings:
if (m := cls.REGEX.match(last_line)) is not None:
total, drop_tables, create_tables, client_side_generate, vacuum, primary_keys = [
float(v) for v in m.groups() if v is not None
]
else:
raise RuntimeError(f"can't parse pgbench initialize results from `{last_line}`")
return cls(
total=timings["total"],
drop_tables=timings.get("drop_tables", 0.0),
create_tables=timings.get("create_tables", 0.0),
client_side_generate=timings.get("client_side_generate", 0.0),
server_side_generate=timings.get("server_side_generate", 0.0),
vacuum=timings.get("vacuum", 0.0),
primary_keys=timings.get("primary_keys", 0.0),
foreign_keys=timings.get("foreign_keys", 0.0),
total=total,
drop_tables=drop_tables,
create_tables=create_tables,
client_side_generate=client_side_generate,
vacuum=vacuum,
primary_keys=primary_keys,
duration=duration,
start_timestamp=start_timestamp,
end_timestamp=end_timestamp,
@@ -339,10 +326,8 @@ class NeonBenchmarker:
"drop_tables",
"create_tables",
"client_side_generate",
"server_side_generate",
"vacuum",
"primary_keys",
"foreign_keys",
]
for metric in metrics:
if (value := getattr(result, metric)) is not None:

View File

@@ -115,7 +115,6 @@ class NeonCompare(PgCompare):
return self._pg_bin
def flush(self):
self.pageserver_http_client.timeline_checkpoint(self.env.initial_tenant, self.timeline)
self.pageserver_http_client.timeline_gc(self.env.initial_tenant, self.timeline, 0)
def compact(self):

View File

@@ -3084,34 +3084,3 @@ def fork_at_current_lsn(
"""
current_lsn = pg.safe_psql("SELECT pg_current_wal_lsn()")[0][0]
return env.neon_cli.create_branch(new_branch_name, ancestor_branch_name, tenant_id, current_lsn)
def wait_for_sk_commit_lsn_to_arrive_at_pageserver_last_record_lsn(
tenant_id: TenantId,
timeline_id: TimelineId,
safekeepers: List[Safekeeper],
pageserver: NeonPageserver,
):
sk_commit_lsns = [
sk.http_client().timeline_status(tenant_id, timeline_id).commit_lsn for sk in safekeepers
]
lsn = max(sk_commit_lsns)
ps_http = pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, lsn)
return lsn
def wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id: TenantId,
timeline_id: TimelineId,
safekeepers: List[Safekeeper],
pageserver: NeonPageserver,
):
lsn = wait_for_sk_commit_lsn_to_arrive_at_pageserver_last_record_lsn(
tenant_id, timeline_id, safekeepers, pageserver
)
ps_http = pageserver.http_client()
# force a checkpoint to trigger upload
ps_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(ps_http, tenant_id, timeline_id, lsn)
return lsn

View File

@@ -15,7 +15,7 @@ from fixtures.utils import get_scale_for_db
@enum.unique
class PgBenchLoadType(enum.Enum):
INIT = "init"
SIMPLE_UPDATE = "simple-update"
SIMPLE_UPDATE = "simple_update"
SELECT_ONLY = "select-only"
@@ -94,9 +94,7 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
if workload_type == PgBenchLoadType.INIT:
# Run initialize
init_pgbench(
env, ["pgbench", f"-s{scale}", "-i", "-I", "dtGvp", connstr], password=password
)
init_pgbench(env, ["pgbench", f"-s{scale}", "-i", connstr], password=password)
if workload_type == PgBenchLoadType.SIMPLE_UPDATE:
# Run simple-update workload

View File

@@ -84,7 +84,6 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
# Set the GC horizon so that lsn1 is inside the horizon, which means
# we can create a new branch starting from lsn1.
pageserver_http_client.timeline_checkpoint(tenant, timeline_main)
pageserver_http_client.timeline_gc(tenant, timeline_main, lsn2 - lsn1 + 1024)
env.neon_cli.create_branch(
@@ -157,7 +156,6 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
# branch creation task but the individual timeline GC iteration happens *after*
# the branch creation task.
pageserver_http_client.configure_failpoints(("before-timeline-gc", "sleep(2000)"))
pageserver_http_client.timeline_checkpoint(tenant, b0)
def do_gc():
pageserver_http_client.timeline_gc(tenant, b0, 0)

View File

@@ -109,7 +109,6 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
# check that we cannot create branch based on garbage collected data
with env.pageserver.http_client() as pageserver_http:
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
gc_result = pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
print_gc_result(gc_result)

View File

@@ -2,17 +2,9 @@ import asyncio
import concurrent.futures
import random
import pytest
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
Postgres,
RemoteStorageKind,
wait_for_last_flush_lsn,
)
from fixtures.types import TenantId, TimelineId
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres
from fixtures.types import TimelineId
from fixtures.utils import query_scalar
# Test configuration
@@ -43,13 +35,11 @@ async def gc(env: NeonEnv, timeline: TimelineId):
loop = asyncio.get_running_loop()
def do_gc():
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
with concurrent.futures.ThreadPoolExecutor() as pool:
while updates_performed < updates_to_perform:
await loop.run_in_executor(pool, do_gc)
await loop.run_in_executor(
pool, lambda: pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
)
# At the same time, run UPDATEs and GC
@@ -97,76 +87,3 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
r = cur.fetchone()
assert r is not None
assert r == (num_rows, updates_to_perform)
#
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
# Disable time-based pitr, we will use LSN-based thresholds in the manual GC calls
neon_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_gc_index_upload",
)
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_gc_index_upload", "main")
pg = env.postgres.create_start("test_gc_index_upload")
pageserver_http = env.pageserver.http_client()
pg_conn = pg.connect()
cur = pg_conn.cursor()
tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id"))
timeline_id = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
cur.execute(
"""
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, 100000) g
"""
)
# Helper function that gets the number of given kind of remote ops from the metrics
def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
ps_metrics = parse_metrics(env.pageserver.http_client().get_metrics(), "pageserver")
total = 0.0
for sample in ps_metrics.query_all(
name="pageserver_remote_operation_seconds_count",
filter={
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"file_kind": str(file_kind),
"op_kind": str(op_kind),
},
):
total += sample[2]
return int(total)
# Sanity check that the metric works
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
before = get_num_remote_ops("index", "upload")
assert before > 0
# Run many cycles of GC. Then check that the number of index files
# uploads didn't grow much. In particular we don't want to re-upload the
# index file on every GC iteration, when it has no work to do.
#
# On each iteration, we use a slightly smaller GC horizon, so that the GC
# at least needs to check if it has work to do.
for i in range(100):
cur.execute("INSERT INTO foo VALUES (0, 0, 'foo')")
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000 - i * 32)
num_index_uploads = get_num_remote_ops("index", "upload")
log.info(f"{num_index_uploads} index uploads after GC iteration {i}")
after = num_index_uploads
log.info(f"{after-before} new index uploads during test")
assert after - before < 5

View File

@@ -306,7 +306,6 @@ def _import(
# Check that gc works
pageserver_http = env.pageserver.http_client()
pageserver_http.timeline_checkpoint(tenant, timeline)
pageserver_http.timeline_gc(tenant, timeline, 0)
return tar_output_file

View File

@@ -59,7 +59,6 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder):
# Make a lot of updates on a single row, generating a lot of WAL. Trigger
# garbage collections so that the page server will remove old page versions.
for i in range(10):
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
gc_result = pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
print_gc_result(gc_result)

View File

@@ -52,7 +52,6 @@ def test_pitr_gc(neon_env_builder: NeonEnvBuilder):
# run GC
with env.pageserver.http_client() as pageserver_http:
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
pageserver_http.timeline_compact(env.initial_tenant, timeline)
# perform aggressive GC. Data still should be kept because of the PITR setting.
gc_result = pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)

View File

@@ -122,33 +122,3 @@ def test_auth_errors(static_proxy: NeonProxy):
# Finally, check that the user can connect
with static_proxy.connect(user="pinocchio", password="magic", options="project=irrelevant"):
pass
def test_forward_params_to_client(static_proxy: NeonProxy):
# A subset of parameters (GUCs) which postgres
# sends to the client during connection setup.
# Unfortunately, `GUC_REPORT` can't be queried.
# Proxy *should* forward them, otherwise client library
# might misbehave (e.g. parse timestamps incorrectly).
reported_params_subset = [
"client_encoding",
"integer_datetimes",
"is_superuser",
"server_encoding",
"server_version",
"session_authorization",
"standard_conforming_strings",
]
query = """
select name, setting
from pg_catalog.pg_settings
where name = any(%s)
"""
with static_proxy.connect(options="project=irrelevant") as conn:
with conn.cursor() as cur:
cur.execute(query, (reported_params_subset,))
for name, value in cur.fetchall():
# Check that proxy has forwarded this parameter.
assert conn.get_parameter_status(name) == value

View File

@@ -24,7 +24,6 @@ def do_gc_target(
"""Hack to unblock main, see https://github.com/neondatabase/neon/issues/2211"""
try:
log.info("sending gc http request")
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 0)
except Exception as e:
log.error("do_gc failed: %s", e)

View File

@@ -24,7 +24,6 @@ from fixtures.neon_fixtures import (
assert_no_in_progress_downloads_for_tenant,
available_remote_storages,
wait_for_last_record_lsn,
wait_for_sk_commit_lsn_to_reach_remote_storage,
wait_for_upload,
)
from fixtures.types import Lsn, TenantId, TimelineId
@@ -162,9 +161,16 @@ def test_tenants_attached_after_download(
##### Stop the pageserver, erase its layer file to force it being downloaded from S3
env.postgres.stop_all()
wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id, timeline_id, env.safekeepers, env.pageserver
)
sk_commit_lsns = [
sk.http_client().timeline_status(tenant_id, timeline_id).commit_lsn
for sk in env.safekeepers
]
log.info("wait for pageserver to process all the WAL")
wait_for_last_record_lsn(client, tenant_id, timeline_id, max(sk_commit_lsns))
log.info("wait for it to reach remote storage")
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(client, tenant_id, timeline_id, max(sk_commit_lsns))
log.info("latest safekeeper_commit_lsn reached remote storage")
detail_before = client.timeline_detail(
tenant_id, timeline_id, include_non_incremental_physical_size=True

View File

@@ -326,6 +326,7 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
pageserver_http.timeline_gc(env.initial_tenant, new_timeline_id, gc_horizon=None)
assert_physical_size(env, env.initial_tenant, new_timeline_id)