mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-15 16:40:37 +00:00
Compare commits
30 Commits
problame/g
...
sk/proxy_t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
488bb0cd46 | ||
|
|
bba82fa73f | ||
|
|
be0238db3d | ||
|
|
8945fbdb31 | ||
|
|
05ac0e2493 | ||
|
|
bfd45dd671 | ||
|
|
7f80230fd2 | ||
|
|
78bbbccadb | ||
|
|
dbbe032c39 | ||
|
|
cb9473928d | ||
|
|
fa20e37574 | ||
|
|
4911d7ce6f | ||
|
|
e83684b868 | ||
|
|
afbbc61036 | ||
|
|
7ba5c286b7 | ||
|
|
02b28ae0b1 | ||
|
|
0bfbae2d73 | ||
|
|
f1b7dc4064 | ||
|
|
e2a5177e89 | ||
|
|
0c083564ce | ||
|
|
d8dd60dc81 | ||
|
|
73f34eaa5e | ||
|
|
c2496c7ef2 | ||
|
|
ebea298415 | ||
|
|
5ffa20dd82 | ||
|
|
75ea8106ec | ||
|
|
017d3a390d | ||
|
|
589cf1ed21 | ||
|
|
0c82ff3d98 | ||
|
|
8895f28dae |
@@ -4,7 +4,7 @@
|
||||
hakari-package = "workspace_hack"
|
||||
|
||||
# Format for `workspace-hack = ...` lines in other Cargo.tomls. Requires cargo-hakari 0.9.8 or above.
|
||||
dep-format-version = "3"
|
||||
dep-format-version = "4"
|
||||
|
||||
# Setting workspace.resolver = "2" in the root Cargo.toml is HIGHLY recommended.
|
||||
# Hakari works much better with the new feature resolver.
|
||||
|
||||
@@ -17,7 +17,7 @@ storage:
|
||||
kind: "LayerAccessThreshold"
|
||||
period: "10m"
|
||||
threshold: &default_eviction_threshold "24h"
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
bucket_region: "{{ bucket_region }}"
|
||||
|
||||
2
.github/ansible/prod.eu-central-1.hosts.yaml
vendored
2
.github/ansible/prod.eu-central-1.hosts.yaml
vendored
@@ -17,7 +17,7 @@ storage:
|
||||
kind: "LayerAccessThreshold"
|
||||
period: "10m"
|
||||
threshold: &default_eviction_threshold "24h"
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
bucket_region: "{{ bucket_region }}"
|
||||
|
||||
50
.github/ansible/prod.us-east-1.hosts.yaml
vendored
Normal file
50
.github/ansible/prod.us-east-1.hosts.yaml
vendored
Normal file
@@ -0,0 +1,50 @@
|
||||
storage:
|
||||
vars:
|
||||
bucket_name: neon-prod-storage-us-east-1
|
||||
bucket_region: us-east-1
|
||||
console_mgmt_base_url: http://neon-internal-api.aws.neon.tech
|
||||
broker_endpoint: http://storage-broker-lb.theta.us-east-1.internal.aws.neon.tech:50051
|
||||
pageserver_config_stub:
|
||||
pg_distrib_dir: /usr/local
|
||||
metric_collection_endpoint: http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events
|
||||
metric_collection_interval: 10min
|
||||
disk_usage_based_eviction:
|
||||
max_usage_pct: 85 # TODO: decrease to 80 after all pageservers are below 80
|
||||
min_avail_bytes: 0
|
||||
period: "10s"
|
||||
tenant_config:
|
||||
eviction_policy:
|
||||
kind: "LayerAccessThreshold"
|
||||
period: "10m"
|
||||
threshold: &default_eviction_threshold "24h"
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
bucket_region: "{{ bucket_region }}"
|
||||
prefix_in_bucket: "pageserver/v1"
|
||||
safekeeper_s3_prefix: safekeeper/v1/wal
|
||||
hostname_suffix: ""
|
||||
remote_user: ssm-user
|
||||
ansible_aws_ssm_region: us-east-1
|
||||
ansible_aws_ssm_bucket_name: neon-prod-storage-us-east-1
|
||||
console_region_id: aws-us-east-1
|
||||
sentry_environment: production
|
||||
|
||||
children:
|
||||
pageservers:
|
||||
hosts:
|
||||
pageserver-0.us-east-1.aws.neon.tech:
|
||||
ansible_host: i-085222088b0d2e0c7
|
||||
pageserver-1.us-east-1.aws.neon.tech:
|
||||
ansible_host: i-0969d4f684d23a21e
|
||||
pageserver-2.us-east-1.aws.neon.tech:
|
||||
ansible_host: i-05dee87895da58dad
|
||||
|
||||
safekeepers:
|
||||
hosts:
|
||||
safekeeper-0.us-east-1.aws.neon.tech:
|
||||
ansible_host: i-04ce739e88793d864
|
||||
safekeeper-1.us-east-1.aws.neon.tech:
|
||||
ansible_host: i-0e9e6c9227fb81410
|
||||
safekeeper-2.us-east-1.aws.neon.tech:
|
||||
ansible_host: i-072f4dd86a327d52f
|
||||
2
.github/ansible/prod.us-east-2.hosts.yaml
vendored
2
.github/ansible/prod.us-east-2.hosts.yaml
vendored
@@ -17,7 +17,7 @@ storage:
|
||||
kind: "LayerAccessThreshold"
|
||||
period: "10m"
|
||||
threshold: &default_eviction_threshold "24h"
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
bucket_region: "{{ bucket_region }}"
|
||||
|
||||
8
.github/ansible/prod.us-west-2.hosts.yaml
vendored
8
.github/ansible/prod.us-west-2.hosts.yaml
vendored
@@ -17,7 +17,7 @@ storage:
|
||||
kind: "LayerAccessThreshold"
|
||||
period: "10m"
|
||||
threshold: &default_eviction_threshold "24h"
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
bucket_region: "{{ bucket_region }}"
|
||||
@@ -34,7 +34,7 @@ storage:
|
||||
pageservers:
|
||||
hosts:
|
||||
pageserver-0.us-west-2.aws.neon.tech:
|
||||
ansible_host: i-0d9f6dfae0e1c780d
|
||||
ansible_host: i-0d9f6dfae0e1c780d
|
||||
pageserver-1.us-west-2.aws.neon.tech:
|
||||
ansible_host: i-0c834be1dddba8b3f
|
||||
pageserver-2.us-west-2.aws.neon.tech:
|
||||
@@ -49,5 +49,5 @@ storage:
|
||||
safekeeper-1.us-west-2.aws.neon.tech:
|
||||
ansible_host: i-074682f9d3c712e7c
|
||||
safekeeper-2.us-west-2.aws.neon.tech:
|
||||
ansible_host: i-042b7efb1729d7966
|
||||
|
||||
ansible_host: i-042b7efb1729d7966
|
||||
|
||||
|
||||
47
.github/ansible/staging.eu-central-1.hosts.yaml
vendored
Normal file
47
.github/ansible/staging.eu-central-1.hosts.yaml
vendored
Normal file
@@ -0,0 +1,47 @@
|
||||
storage:
|
||||
vars:
|
||||
bucket_name: neon-dev-storage-eu-central-1
|
||||
bucket_region: eu-central-1
|
||||
# We only register/update storage in one preview console and manually copy to other instances
|
||||
console_mgmt_base_url: http://neon-internal-api.helium.aws.neon.build
|
||||
broker_endpoint: http://storage-broker-lb.alpha.eu-central-1.internal.aws.neon.build:50051
|
||||
pageserver_config_stub:
|
||||
pg_distrib_dir: /usr/local
|
||||
metric_collection_endpoint: http://neon-internal-api.helium.aws.neon.build/billing/api/v1/usage_events
|
||||
metric_collection_interval: 10min
|
||||
disk_usage_based_eviction:
|
||||
max_usage_pct: 80
|
||||
min_avail_bytes: 0
|
||||
period: "10s"
|
||||
tenant_config:
|
||||
eviction_policy:
|
||||
kind: "LayerAccessThreshold"
|
||||
period: "20m"
|
||||
threshold: &default_eviction_threshold "20m"
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
bucket_region: "{{ bucket_region }}"
|
||||
prefix_in_bucket: "pageserver/v1"
|
||||
safekeeper_s3_prefix: safekeeper/v1/wal
|
||||
hostname_suffix: ""
|
||||
remote_user: ssm-user
|
||||
ansible_aws_ssm_region: eu-central-1
|
||||
ansible_aws_ssm_bucket_name: neon-dev-storage-eu-central-1
|
||||
console_region_id: aws-eu-central-1
|
||||
sentry_environment: staging
|
||||
|
||||
children:
|
||||
pageservers:
|
||||
hosts:
|
||||
pageserver-0.eu-central-1.aws.neon.build:
|
||||
ansible_host: i-011f93ec26cfba2d4
|
||||
|
||||
safekeepers:
|
||||
hosts:
|
||||
safekeeper-0.eu-central-1.aws.neon.build:
|
||||
ansible_host: i-0ff026d27babf8ddd
|
||||
safekeeper-1.eu-central-1.aws.neon.build:
|
||||
ansible_host: i-03983a49ee54725d9
|
||||
safekeeper-2.eu-central-1.aws.neon.build:
|
||||
ansible_host: i-0bd025ecdb61b0db3
|
||||
2
.github/ansible/staging.eu-west-1.hosts.yaml
vendored
2
.github/ansible/staging.eu-west-1.hosts.yaml
vendored
@@ -17,7 +17,7 @@ storage:
|
||||
kind: "LayerAccessThreshold"
|
||||
period: "20m"
|
||||
threshold: &default_eviction_threshold "20m"
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
bucket_region: "{{ bucket_region }}"
|
||||
|
||||
2
.github/ansible/staging.us-east-2.hosts.yaml
vendored
2
.github/ansible/staging.us-east-2.hosts.yaml
vendored
@@ -17,7 +17,7 @@ storage:
|
||||
kind: "LayerAccessThreshold"
|
||||
period: "20m"
|
||||
threshold: &default_eviction_threshold "20m"
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
bucket_region: "{{ bucket_region }}"
|
||||
|
||||
52
.github/helm-values/dev-eu-central-1-alpha.neon-storage-broker.yaml
vendored
Normal file
52
.github/helm-values/dev-eu-central-1-alpha.neon-storage-broker.yaml
vendored
Normal file
@@ -0,0 +1,52 @@
|
||||
# Helm chart values for neon-storage-broker
|
||||
podLabels:
|
||||
neon_env: staging
|
||||
neon_service: storage-broker
|
||||
|
||||
# Use L4 LB
|
||||
service:
|
||||
# service.annotations -- Annotations to add to the service
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
|
||||
# assign service to this name at external-dns
|
||||
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.alpha.eu-central-1.internal.aws.neon.build
|
||||
# service.type -- Service type
|
||||
type: LoadBalancer
|
||||
# service.port -- broker listen port
|
||||
port: 50051
|
||||
|
||||
ingress:
|
||||
enabled: false
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
|
||||
extraManifests:
|
||||
- apiVersion: operator.victoriametrics.com/v1beta1
|
||||
kind: VMServiceScrape
|
||||
metadata:
|
||||
name: "{{ include \"neon-storage-broker.fullname\" . }}"
|
||||
labels:
|
||||
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
|
||||
app.kubernetes.io/name: neon-storage-broker
|
||||
app.kubernetes.io/instance: neon-storage-broker
|
||||
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
|
||||
app.kubernetes.io/managed-by: Helm
|
||||
namespace: "{{ .Release.Namespace }}"
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: "neon-storage-broker"
|
||||
endpoints:
|
||||
- port: broker
|
||||
path: /metrics
|
||||
interval: 10s
|
||||
scrapeTimeout: 10s
|
||||
namespaceSelector:
|
||||
matchNames:
|
||||
- "{{ .Release.Namespace }}"
|
||||
|
||||
settings:
|
||||
sentryEnvironment: "staging"
|
||||
@@ -7,13 +7,13 @@ deploymentStrategy:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
image:
|
||||
@@ -23,6 +23,7 @@ settings:
|
||||
authBackend: "console"
|
||||
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
|
||||
domain: "*.eu-west-1.aws.neon.build"
|
||||
otelExporterOtlpEndpoint: "https://otel-collector.zeta.eu-west-1.internal.aws.neon.build"
|
||||
sentryEnvironment: "staging"
|
||||
wssPort: 8443
|
||||
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"
|
||||
|
||||
@@ -9,6 +9,7 @@ settings:
|
||||
authEndpoint: "https://console.stage.neon.tech/authenticate_proxy_request/"
|
||||
uri: "https://console.stage.neon.tech/psql_session/"
|
||||
domain: "pg.neon.build"
|
||||
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
|
||||
sentryEnvironment: "staging"
|
||||
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"
|
||||
metricCollectionInterval: "1min"
|
||||
|
||||
@@ -1,6 +1,22 @@
|
||||
# Helm chart values for neon-proxy-scram.
|
||||
# This is a YAML-formatted file.
|
||||
|
||||
deploymentStrategy:
|
||||
type: RollingUpdate
|
||||
rollingUpdate:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
image:
|
||||
repository: neondatabase/neon
|
||||
|
||||
@@ -8,6 +24,7 @@ settings:
|
||||
authBackend: "console"
|
||||
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
|
||||
domain: "*.cloud.stage.neon.tech"
|
||||
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
|
||||
sentryEnvironment: "staging"
|
||||
wssPort: 8443
|
||||
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"
|
||||
|
||||
@@ -7,15 +7,16 @@ deploymentStrategy:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
image:
|
||||
repository: neondatabase/neon
|
||||
|
||||
@@ -24,6 +25,7 @@ settings:
|
||||
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
|
||||
domain: "*.us-east-2.aws.neon.build"
|
||||
extraDomains: ["*.us-east-2.postgres.zenith.tech", "*.us-east-2.retooldb-staging.com"]
|
||||
otelExporterOtlpEndpoint: "https://otel-collector.beta.us-east-2.internal.aws.neon.build"
|
||||
sentryEnvironment: "staging"
|
||||
wssPort: 8443
|
||||
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"
|
||||
|
||||
67
.github/helm-values/preview-template.neon-proxy-scram.yaml
vendored
Normal file
67
.github/helm-values/preview-template.neon-proxy-scram.yaml
vendored
Normal file
@@ -0,0 +1,67 @@
|
||||
# Helm chart values for neon-proxy-scram.
|
||||
# This is a YAML-formatted file.
|
||||
|
||||
deploymentStrategy:
|
||||
type: RollingUpdate
|
||||
rollingUpdate:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
image:
|
||||
repository: neondatabase/neon
|
||||
|
||||
settings:
|
||||
authBackend: "console"
|
||||
authEndpoint: "http://neon-internal-api.${PREVIEW_NAME}.aws.neon.build/management/api/v2"
|
||||
domain: "*.cloud.${PREVIEW_NAME}.aws.neon.build"
|
||||
sentryEnvironment: "staging"
|
||||
wssPort: 8443
|
||||
metricCollectionEndpoint: "http://neon-internal-api.${PREVIEW_NAME}.aws.neon.build/billing/api/v1/usage_events"
|
||||
metricCollectionInterval: "1min"
|
||||
|
||||
# -- Additional labels for neon-proxy pods
|
||||
podLabels:
|
||||
neon_service: proxy-scram
|
||||
neon_env: test
|
||||
neon_region: ${PREVIEW_NAME}.eu-central-1
|
||||
|
||||
|
||||
exposedService:
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
|
||||
external-dns.alpha.kubernetes.io/hostname: cloud.${PREVIEW_NAME}.aws.neon.build
|
||||
httpsPort: 443
|
||||
|
||||
#metrics:
|
||||
# enabled: true
|
||||
# serviceMonitor:
|
||||
# enabled: true
|
||||
# selector:
|
||||
# release: kube-prometheus-stack
|
||||
|
||||
extraManifests:
|
||||
- apiVersion: operator.victoriametrics.com/v1beta1
|
||||
kind: VMServiceScrape
|
||||
metadata:
|
||||
name: "{{ include \"neon-proxy.fullname\" . }}"
|
||||
labels:
|
||||
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
|
||||
app.kubernetes.io/name: neon-proxy
|
||||
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
|
||||
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
|
||||
app.kubernetes.io/managed-by: Helm
|
||||
namespace: "{{ .Release.Namespace }}"
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: "neon-proxy"
|
||||
endpoints:
|
||||
- port: http
|
||||
path: /metrics
|
||||
interval: 10s
|
||||
scrapeTimeout: 10s
|
||||
namespaceSelector:
|
||||
matchNames:
|
||||
- "{{ .Release.Namespace }}"
|
||||
@@ -7,13 +7,13 @@ deploymentStrategy:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
|
||||
@@ -7,13 +7,13 @@ deploymentStrategy:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
|
||||
69
.github/helm-values/prod-us-east-1-theta.neon-proxy-scram.yaml
vendored
Normal file
69
.github/helm-values/prod-us-east-1-theta.neon-proxy-scram.yaml
vendored
Normal file
@@ -0,0 +1,69 @@
|
||||
# Helm chart values for neon-proxy-scram.
|
||||
# This is a YAML-formatted file.
|
||||
|
||||
deploymentStrategy:
|
||||
type: RollingUpdate
|
||||
rollingUpdate:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
image:
|
||||
repository: neondatabase/neon
|
||||
|
||||
settings:
|
||||
authBackend: "console"
|
||||
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
|
||||
domain: "*.us-east-1.aws.neon.tech"
|
||||
# These domains haven't been delegated yet.
|
||||
# extraDomains: ["*.us-east-1.retooldb.com", "*.us-east-1.postgres.vercel-storage.com"]
|
||||
sentryEnvironment: "production"
|
||||
wssPort: 8443
|
||||
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"
|
||||
metricCollectionInterval: "10min"
|
||||
|
||||
podLabels:
|
||||
neon_service: proxy-scram
|
||||
neon_env: prod
|
||||
neon_region: us-east-1
|
||||
|
||||
exposedService:
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
|
||||
external-dns.alpha.kubernetes.io/hostname: us-east-1.aws.neon.tech
|
||||
httpsPort: 443
|
||||
|
||||
extraManifests:
|
||||
- apiVersion: operator.victoriametrics.com/v1beta1
|
||||
kind: VMServiceScrape
|
||||
metadata:
|
||||
name: "{{ include \"neon-proxy.fullname\" . }}"
|
||||
labels:
|
||||
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
|
||||
app.kubernetes.io/name: neon-proxy
|
||||
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
|
||||
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
|
||||
app.kubernetes.io/managed-by: Helm
|
||||
namespace: "{{ .Release.Namespace }}"
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: "neon-proxy"
|
||||
endpoints:
|
||||
- port: http
|
||||
path: /metrics
|
||||
interval: 10s
|
||||
scrapeTimeout: 10s
|
||||
namespaceSelector:
|
||||
matchNames:
|
||||
- "{{ .Release.Namespace }}"
|
||||
52
.github/helm-values/prod-us-east-1-theta.neon-storage-broker.yaml
vendored
Normal file
52
.github/helm-values/prod-us-east-1-theta.neon-storage-broker.yaml
vendored
Normal file
@@ -0,0 +1,52 @@
|
||||
# Helm chart values for neon-storage-broker
|
||||
podLabels:
|
||||
neon_env: production
|
||||
neon_service: storage-broker
|
||||
|
||||
# Use L4 LB
|
||||
service:
|
||||
# service.annotations -- Annotations to add to the service
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
|
||||
# assign service to this name at external-dns
|
||||
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.theta.us-east-1.internal.aws.neon.tech
|
||||
# service.type -- Service type
|
||||
type: LoadBalancer
|
||||
# service.port -- broker listen port
|
||||
port: 50051
|
||||
|
||||
ingress:
|
||||
enabled: false
|
||||
|
||||
metrics:
|
||||
enabled: false
|
||||
|
||||
extraManifests:
|
||||
- apiVersion: operator.victoriametrics.com/v1beta1
|
||||
kind: VMServiceScrape
|
||||
metadata:
|
||||
name: "{{ include \"neon-storage-broker.fullname\" . }}"
|
||||
labels:
|
||||
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
|
||||
app.kubernetes.io/name: neon-storage-broker
|
||||
app.kubernetes.io/instance: neon-storage-broker
|
||||
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
|
||||
app.kubernetes.io/managed-by: Helm
|
||||
namespace: "{{ .Release.Namespace }}"
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: "neon-storage-broker"
|
||||
endpoints:
|
||||
- port: broker
|
||||
path: /metrics
|
||||
interval: 10s
|
||||
scrapeTimeout: 10s
|
||||
namespaceSelector:
|
||||
matchNames:
|
||||
- "{{ .Release.Namespace }}"
|
||||
|
||||
settings:
|
||||
sentryEnvironment: "production"
|
||||
@@ -7,13 +7,13 @@ deploymentStrategy:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
|
||||
@@ -7,13 +7,13 @@ deploymentStrategy:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
|
||||
@@ -7,13 +7,13 @@ deploymentStrategy:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
|
||||
216
.github/workflows/build_and_test.yml
vendored
216
.github/workflows/build_and_test.yml
vendored
@@ -541,7 +541,7 @@ jobs:
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
|
||||
options: --init
|
||||
needs: [ push-docker-hub, tag ]
|
||||
needs: [ promote-images, tag ]
|
||||
steps:
|
||||
- name: Set PR's status to pending and request a remote CI test
|
||||
run: |
|
||||
@@ -584,8 +584,7 @@ jobs:
|
||||
neon-image:
|
||||
runs-on: [ self-hosted, gen3, large ]
|
||||
needs: [ tag ]
|
||||
# https://github.com/GoogleContainerTools/kaniko/issues/2005
|
||||
container: gcr.io/kaniko-project/executor:v1.7.0-debug
|
||||
container: gcr.io/kaniko-project/executor:v1.9.2-debug
|
||||
defaults:
|
||||
run:
|
||||
shell: sh -eu {0}
|
||||
@@ -597,11 +596,32 @@ jobs:
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Configure ECR login
|
||||
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
|
||||
- name: Configure ECR and Docker Hub login
|
||||
run: |
|
||||
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
|
||||
echo "::add-mask::${DOCKERHUB_AUTH}"
|
||||
|
||||
cat <<-EOF > /kaniko/.docker/config.json
|
||||
{
|
||||
"auths": {
|
||||
"https://index.docker.io/v1/": {
|
||||
"auth": "${DOCKERHUB_AUTH}"
|
||||
}
|
||||
},
|
||||
"credHelpers": {
|
||||
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
|
||||
}
|
||||
}
|
||||
EOF
|
||||
|
||||
- name: Kaniko build neon
|
||||
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
|
||||
run:
|
||||
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
|
||||
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
|
||||
--context .
|
||||
--build-arg GIT_VERSION=${{ github.sha }}
|
||||
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
|
||||
--destination neondatabase/neon:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
|
||||
- name: Cleanup ECR folder
|
||||
@@ -652,7 +672,7 @@ jobs:
|
||||
compute-tools-image:
|
||||
runs-on: [ self-hosted, gen3, large ]
|
||||
needs: [ tag ]
|
||||
container: gcr.io/kaniko-project/executor:v1.7.0-debug
|
||||
container: gcr.io/kaniko-project/executor:v1.9.2-debug
|
||||
defaults:
|
||||
run:
|
||||
shell: sh -eu {0}
|
||||
@@ -661,18 +681,41 @@ jobs:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v1 # v3 won't work with kaniko
|
||||
|
||||
- name: Configure ECR login
|
||||
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
|
||||
- name: Configure ECR and Docker Hub login
|
||||
run: |
|
||||
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
|
||||
echo "::add-mask::${DOCKERHUB_AUTH}"
|
||||
|
||||
cat <<-EOF > /kaniko/.docker/config.json
|
||||
{
|
||||
"auths": {
|
||||
"https://index.docker.io/v1/": {
|
||||
"auth": "${DOCKERHUB_AUTH}"
|
||||
}
|
||||
},
|
||||
"credHelpers": {
|
||||
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
|
||||
}
|
||||
}
|
||||
EOF
|
||||
|
||||
- name: Kaniko build compute tools
|
||||
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
|
||||
run:
|
||||
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
|
||||
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
|
||||
--context .
|
||||
--build-arg GIT_VERSION=${{ github.sha }}
|
||||
--dockerfile Dockerfile.compute-tools
|
||||
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
|
||||
--destination neondatabase/compute-tools:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
|
||||
- name: Cleanup ECR folder
|
||||
run: rm -rf ~/.ecr
|
||||
|
||||
compute-node-image:
|
||||
runs-on: [ self-hosted, gen3, large ]
|
||||
container: gcr.io/kaniko-project/executor:v1.7.0-debug
|
||||
container: gcr.io/kaniko-project/executor:v1.9.2-debug
|
||||
needs: [ tag ]
|
||||
strategy:
|
||||
fail-fast: false
|
||||
@@ -689,12 +732,36 @@ jobs:
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Configure ECR login
|
||||
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
|
||||
- name: Configure ECR and Docker Hub login
|
||||
run: |
|
||||
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
|
||||
echo "::add-mask::${DOCKERHUB_AUTH}"
|
||||
|
||||
cat <<-EOF > /kaniko/.docker/config.json
|
||||
{
|
||||
"auths": {
|
||||
"https://index.docker.io/v1/": {
|
||||
"auth": "${DOCKERHUB_AUTH}"
|
||||
}
|
||||
},
|
||||
"credHelpers": {
|
||||
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
|
||||
}
|
||||
}
|
||||
EOF
|
||||
|
||||
- name: Kaniko build compute node with extensions
|
||||
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --build-arg PG_VERSION=${{ matrix.version }} --dockerfile Dockerfile.compute-node --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
run:
|
||||
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true
|
||||
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
|
||||
--context .
|
||||
--build-arg GIT_VERSION=${{ github.sha }}
|
||||
--build-arg PG_VERSION=${{ matrix.version }}
|
||||
--dockerfile Dockerfile.compute-node
|
||||
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
--destination neondatabase/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
|
||||
- name: Cleanup ECR folder
|
||||
run: rm -rf ~/.ecr
|
||||
|
||||
@@ -786,26 +853,45 @@ jobs:
|
||||
runs-on: [ self-hosted, gen3, small ]
|
||||
needs: [ tag, test-images, vm-compute-node-image ]
|
||||
container: golang:1.19-bullseye
|
||||
if: github.event_name != 'workflow_dispatch'
|
||||
# Don't add if-condition here.
|
||||
# The job should always be run because we have dependant other jobs that shouldn't be skipped
|
||||
|
||||
steps:
|
||||
- name: Install Crane & ECR helper
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
github.event_name != 'workflow_dispatch'
|
||||
run: |
|
||||
go install github.com/google/go-containerregistry/cmd/crane@31786c6cbb82d6ec4fb8eb79cd9387905130534e # v0.11.0
|
||||
go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0
|
||||
|
||||
- name: Configure ECR login
|
||||
- name: Configure ECR and Docker Hub login
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
run: |
|
||||
DOCKERHUB_AUTH=$(echo -n "${{ secrets.NEON_DOCKERHUB_USERNAME }}:${{ secrets.NEON_DOCKERHUB_PASSWORD }}" | base64)
|
||||
echo "::add-mask::${DOCKERHUB_AUTH}"
|
||||
|
||||
mkdir /github/home/.docker/
|
||||
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
|
||||
cat <<-EOF > /github/home/.docker/config.json
|
||||
{
|
||||
"auths": {
|
||||
"https://index.docker.io/v1/": {
|
||||
"auth": "${DOCKERHUB_AUTH}"
|
||||
}
|
||||
},
|
||||
"credHelpers": {
|
||||
"369495373322.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login",
|
||||
"093970136003.dkr.ecr.eu-central-1.amazonaws.com": "ecr-login"
|
||||
}
|
||||
}
|
||||
EOF
|
||||
|
||||
- name: Add latest tag to images
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
github.event_name != 'workflow_dispatch'
|
||||
run: |
|
||||
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} latest
|
||||
@@ -814,50 +900,17 @@ jobs:
|
||||
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} latest
|
||||
|
||||
- name: Cleanup ECR folder
|
||||
run: rm -rf ~/.ecr
|
||||
|
||||
push-docker-hub:
|
||||
runs-on: [ self-hosted, dev, x64 ]
|
||||
needs: [ promote-images, tag ]
|
||||
container: golang:1.19-bullseye
|
||||
|
||||
steps:
|
||||
- name: Install Crane & ECR helper
|
||||
run: |
|
||||
go install github.com/google/go-containerregistry/cmd/crane@31786c6cbb82d6ec4fb8eb79cd9387905130534e # v0.11.0
|
||||
go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0
|
||||
|
||||
- name: Configure ECR login
|
||||
run: |
|
||||
mkdir /github/home/.docker/
|
||||
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
|
||||
|
||||
- name: Pull neon image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} neon
|
||||
|
||||
- name: Pull compute tools image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} compute-tools
|
||||
|
||||
- name: Pull compute node v14 image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} compute-node-v14
|
||||
|
||||
- name: Pull vm compute node v14 image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} vm-compute-node-v14
|
||||
|
||||
- name: Pull compute node v15 image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} compute-node-v15
|
||||
|
||||
- name: Pull vm compute node v15 image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} vm-compute-node-v15
|
||||
|
||||
- name: Pull rust image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
|
||||
crane tag neondatabase/neon:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/compute-tools:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} latest
|
||||
|
||||
- name: Push images to production ECR
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
github.event_name != 'workflow_dispatch'
|
||||
run: |
|
||||
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/neon:latest
|
||||
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:latest
|
||||
@@ -866,45 +919,6 @@ jobs:
|
||||
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:latest
|
||||
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:latest
|
||||
|
||||
- name: Configure Docker Hub login
|
||||
run: |
|
||||
# ECR Credential Helper & Docker Hub don't work together in config, hence reset
|
||||
echo "" > /github/home/.docker/config.json
|
||||
crane auth login -u ${{ secrets.NEON_DOCKERHUB_USERNAME }} -p ${{ secrets.NEON_DOCKERHUB_PASSWORD }} index.docker.io
|
||||
|
||||
- name: Push neon image to Docker Hub
|
||||
run: crane push neon neondatabase/neon:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push compute tools image to Docker Hub
|
||||
run: crane push compute-tools neondatabase/compute-tools:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push compute node v14 image to Docker Hub
|
||||
run: crane push compute-node-v14 neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push vm compute node v14 image to Docker Hub
|
||||
run: crane push vm-compute-node-v14 neondatabase/vm-compute-node-v14:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push compute node v15 image to Docker Hub
|
||||
run: crane push compute-node-v15 neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push vm compute node v15 image to Docker Hub
|
||||
run: crane push vm-compute-node-v15 neondatabase/vm-compute-node-v15:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push rust image to Docker Hub
|
||||
run: crane push rust neondatabase/rust:pinned
|
||||
|
||||
- name: Add latest tag to images in Docker Hub
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
run: |
|
||||
crane tag neondatabase/neon:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/compute-tools:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} latest
|
||||
|
||||
- name: Cleanup ECR folder
|
||||
run: rm -rf ~/.ecr
|
||||
|
||||
@@ -913,7 +927,7 @@ jobs:
|
||||
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
|
||||
# We need both storage **and** compute images for deploy, because control plane picks the compute version based on the storage version.
|
||||
# If it notices a fresh storage it may bump the compute version. And if compute image failed to build it may break things badly
|
||||
needs: [ push-docker-hub, tag, regress-tests ]
|
||||
needs: [ promote-images, tag, regress-tests ]
|
||||
if: |
|
||||
contains(github.event.pull_request.labels.*.name, 'deploy-test-storage') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
@@ -947,7 +961,7 @@ jobs:
|
||||
deploy:
|
||||
runs-on: [ self-hosted, gen3, small ]
|
||||
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
|
||||
needs: [ push-docker-hub, tag, regress-tests ]
|
||||
needs: [ promote-images, tag, regress-tests ]
|
||||
if: ( github.ref_name == 'main' || github.ref_name == 'release' ) && github.event_name != 'workflow_dispatch'
|
||||
steps:
|
||||
- name: Fix git ownership
|
||||
@@ -984,7 +998,7 @@ jobs:
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
|
||||
options: --init
|
||||
needs: [ push-docker-hub, tag, regress-tests ]
|
||||
needs: [ promote-images, tag, regress-tests ]
|
||||
if: github.ref_name == 'release' && github.event_name != 'workflow_dispatch'
|
||||
steps:
|
||||
- name: Promote compatibility snapshot for the release
|
||||
|
||||
52
.github/workflows/deploy-dev.yml
vendored
52
.github/workflows/deploy-dev.yml
vendored
@@ -48,7 +48,8 @@ jobs:
|
||||
shell: bash
|
||||
strategy:
|
||||
matrix:
|
||||
target_region: [ eu-west-1, us-east-2 ]
|
||||
# TODO(sergey): Fix storage deploy in eu-central-1
|
||||
target_region: [ eu-west-1, us-east-2]
|
||||
environment:
|
||||
name: dev-${{ matrix.target_region }}
|
||||
steps:
|
||||
@@ -133,6 +134,53 @@ jobs:
|
||||
|
||||
- name: Cleanup helm folder
|
||||
run: rm -rf ~/.cache
|
||||
|
||||
deploy-preview-proxy-new:
|
||||
runs-on: [ self-hosted, gen3, small ]
|
||||
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
|
||||
if: inputs.deployProxy
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
strategy:
|
||||
matrix:
|
||||
include:
|
||||
- target_region: eu-central-1
|
||||
target_cluster: dev-eu-central-1-alpha
|
||||
environment:
|
||||
name: dev-${{ matrix.target_region }}
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
ref: ${{ inputs.branch }}
|
||||
|
||||
- name: Configure AWS Credentials
|
||||
uses: aws-actions/configure-aws-credentials@v1-node16
|
||||
with:
|
||||
role-to-assume: arn:aws:iam::369495373322:role/github-runner
|
||||
aws-region: eu-central-1
|
||||
role-skip-session-tagging: true
|
||||
role-duration-seconds: 1800
|
||||
|
||||
- name: Configure environment
|
||||
run: |
|
||||
helm repo add neondatabase https://neondatabase.github.io/helm-charts
|
||||
aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }}
|
||||
|
||||
- name: Re-deploy preview proxies
|
||||
run: |
|
||||
DOCKER_TAG=${{ inputs.dockerTag }}
|
||||
for PREVIEW_NAME in helium argon krypton xenon radon oganesson hydrogen nitrogen oxygen fluorine chlorine; do
|
||||
export PREVIEW_NAME
|
||||
envsubst <.github/helm-values/preview-template.neon-proxy-scram.yaml >preview-${PREVIEW_NAME}.neon-proxy-scram.yaml
|
||||
helm upgrade neon-proxy-scram-${PREVIEW_NAME} neondatabase/neon-proxy --namespace neon-proxy-${PREVIEW_NAME} --create-namespace --install --atomic -f preview-${PREVIEW_NAME}.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
|
||||
done
|
||||
|
||||
- name: Cleanup helm folder
|
||||
run: rm -rf ~/.cache
|
||||
|
||||
deploy-storage-broker-new:
|
||||
runs-on: [ self-hosted, gen3, small ]
|
||||
@@ -148,6 +196,8 @@ jobs:
|
||||
target_cluster: dev-us-east-2-beta
|
||||
- target_region: eu-west-1
|
||||
target_cluster: dev-eu-west-1-zeta
|
||||
- target_region: eu-central-1
|
||||
target_cluster: dev-central-1-alpha
|
||||
environment:
|
||||
name: dev-${{ matrix.target_region }}
|
||||
steps:
|
||||
|
||||
8
.github/workflows/deploy-prod.yml
vendored
8
.github/workflows/deploy-prod.yml
vendored
@@ -49,7 +49,7 @@ jobs:
|
||||
shell: bash
|
||||
strategy:
|
||||
matrix:
|
||||
target_region: [ us-east-2, us-west-2, eu-central-1, ap-southeast-1 ]
|
||||
target_region: [ us-east-2, us-west-2, eu-central-1, ap-southeast-1, us-east-1 ]
|
||||
environment:
|
||||
name: prod-${{ matrix.target_region }}
|
||||
steps:
|
||||
@@ -97,6 +97,10 @@ jobs:
|
||||
target_cluster: prod-ap-southeast-1-epsilon
|
||||
deploy_link_proxy: false
|
||||
deploy_legacy_scram_proxy: false
|
||||
- target_region: us-east-1
|
||||
target_cluster: prod-us-east-1-theta
|
||||
deploy_link_proxy: false
|
||||
deploy_legacy_scram_proxy: false
|
||||
environment:
|
||||
name: prod-${{ matrix.target_region }}
|
||||
steps:
|
||||
@@ -147,6 +151,8 @@ jobs:
|
||||
target_cluster: prod-eu-central-1-gamma
|
||||
- target_region: ap-southeast-1
|
||||
target_cluster: prod-ap-southeast-1-epsilon
|
||||
- target_region: us-east-1
|
||||
target_cluster: prod-us-east-1-theta
|
||||
environment:
|
||||
name: prod-${{ matrix.target_region }}
|
||||
steps:
|
||||
|
||||
1529
Cargo.lock
generated
1529
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
28
Cargo.toml
28
Cargo.toml
@@ -24,10 +24,10 @@ atty = "0.2.14"
|
||||
aws-config = { version = "0.51.0", default-features = false, features=["rustls"] }
|
||||
aws-sdk-s3 = "0.21.0"
|
||||
aws-smithy-http = "0.51.0"
|
||||
aws-types = "0.51.0"
|
||||
aws-types = "0.55"
|
||||
base64 = "0.13.0"
|
||||
bincode = "1.3"
|
||||
bindgen = "0.61"
|
||||
bindgen = "0.65"
|
||||
bstr = "1.0"
|
||||
byteorder = "1.4"
|
||||
bytes = "1.0"
|
||||
@@ -50,7 +50,7 @@ git-version = "0.3"
|
||||
hashbrown = "0.13"
|
||||
hashlink = "0.8.1"
|
||||
hex = "0.4"
|
||||
hex-literal = "0.3"
|
||||
hex-literal = "0.4"
|
||||
hmac = "0.12.1"
|
||||
hostname = "0.3.1"
|
||||
humantime = "2.1"
|
||||
@@ -62,6 +62,7 @@ jsonwebtoken = "8"
|
||||
libc = "0.2"
|
||||
md5 = "0.7.0"
|
||||
memoffset = "0.8"
|
||||
native-tls = "0.2"
|
||||
nix = "0.26"
|
||||
notify = "5.0.0"
|
||||
num_cpus = "1.15"
|
||||
@@ -74,24 +75,25 @@ parking_lot = "0.12"
|
||||
pin-project-lite = "0.2"
|
||||
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
|
||||
prost = "0.11"
|
||||
postgres-native-tls = "0.5"
|
||||
rand = "0.8"
|
||||
regex = "1.4"
|
||||
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
|
||||
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_18"] }
|
||||
reqwest-middleware = "0.2.0"
|
||||
routerify = "3"
|
||||
rpds = "0.12.0"
|
||||
rpds = "0.13"
|
||||
rustls = "0.20"
|
||||
rustls-pemfile = "1"
|
||||
rustls-split = "0.3"
|
||||
scopeguard = "1.1"
|
||||
sentry = { version = "0.29", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
|
||||
sentry = { version = "0.30", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
serde_with = "2.0"
|
||||
sha2 = "0.10.2"
|
||||
signal-hook = "0.3"
|
||||
socket2 = "0.4.4"
|
||||
socket2 = "0.5"
|
||||
strum = "0.24"
|
||||
strum_macros = "0.24"
|
||||
svg_fmt = "0.4.1"
|
||||
@@ -106,17 +108,17 @@ tokio-postgres-rustls = "0.9.0"
|
||||
tokio-rustls = "0.23"
|
||||
tokio-stream = "0.1"
|
||||
tokio-util = { version = "0.7", features = ["io"] }
|
||||
toml = "0.5"
|
||||
toml_edit = { version = "0.17", features = ["easy"] }
|
||||
tonic = {version = "0.8", features = ["tls", "tls-roots"]}
|
||||
toml = "0.7"
|
||||
toml_edit = "0.19"
|
||||
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
|
||||
tracing = "0.1"
|
||||
tracing-opentelemetry = "0.18.0"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
url = "2.2"
|
||||
uuid = { version = "1.2", features = ["v4", "serde"] }
|
||||
walkdir = "2.3.2"
|
||||
webpki-roots = "0.22.5"
|
||||
x509-parser = "0.14"
|
||||
webpki-roots = "0.23"
|
||||
x509-parser = "0.15"
|
||||
|
||||
## TODO replace this with tracing
|
||||
env_logger = "0.10"
|
||||
@@ -154,9 +156,9 @@ workspace_hack = { version = "0.1", path = "./workspace_hack/" }
|
||||
## Build dependencies
|
||||
criterion = "0.4"
|
||||
rcgen = "0.10"
|
||||
rstest = "0.16"
|
||||
rstest = "0.17"
|
||||
tempfile = "3.4"
|
||||
tonic-build = "0.8"
|
||||
tonic-build = "0.9"
|
||||
|
||||
# This is only needed for proxy's tests.
|
||||
# TODO: we should probably fork `tokio-postgres-rustls` instead.
|
||||
|
||||
@@ -12,7 +12,7 @@ FROM debian:bullseye-slim AS build-deps
|
||||
RUN apt update && \
|
||||
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \
|
||||
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libssl-dev \
|
||||
libicu-dev libxslt1-dev
|
||||
libicu-dev libxslt1-dev liblz4-dev libzstd-dev
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -24,8 +24,13 @@ FROM build-deps AS pg-build
|
||||
ARG PG_VERSION
|
||||
COPY vendor/postgres-${PG_VERSION} postgres
|
||||
RUN cd postgres && \
|
||||
./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp --with-icu \
|
||||
--with-libxml --with-libxslt && \
|
||||
export CONFIGURE_CMD="./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp \
|
||||
--with-icu --with-libxml --with-libxslt --with-lz4" && \
|
||||
if [ "${PG_VERSION}" != "v14" ]; then \
|
||||
# zstd is available only from PG15
|
||||
export CONFIGURE_CMD="${CONFIGURE_CMD} --with-zstd"; \
|
||||
fi && \
|
||||
eval $CONFIGURE_CMD && \
|
||||
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s install && \
|
||||
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C contrib/ install && \
|
||||
# Install headers
|
||||
@@ -565,13 +570,17 @@ COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-deb
|
||||
# Install:
|
||||
# libreadline8 for psql
|
||||
# libicu67, locales for collations (including ICU and plpgsql_check)
|
||||
# liblz4-1 for lz4
|
||||
# libossp-uuid16 for extension ossp-uuid
|
||||
# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS
|
||||
# libxml2, libxslt1.1 for xml2
|
||||
# libzstd1 for zstd
|
||||
RUN apt update && \
|
||||
apt install --no-install-recommends -y \
|
||||
gdb \
|
||||
locales \
|
||||
libicu67 \
|
||||
liblz4-1 \
|
||||
libreadline8 \
|
||||
libossp-uuid16 \
|
||||
libgeos-c1v5 \
|
||||
@@ -581,7 +590,8 @@ RUN apt update && \
|
||||
libsfcgal1 \
|
||||
libxml2 \
|
||||
libxslt1.1 \
|
||||
gdb && \
|
||||
libzstd1 \
|
||||
procps && \
|
||||
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
|
||||
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
|
||||
|
||||
|
||||
@@ -54,7 +54,7 @@ RUN set -e \
|
||||
|
||||
RUN set -e \
|
||||
&& echo "::sysinit:cgconfigparser -l /etc/cgconfig.conf -s 1664" >> /etc/inittab \
|
||||
&& CONNSTR="dbname=neondb user=cloud_admin sslmode=disable" \
|
||||
&& CONNSTR="dbname=postgres user=cloud_admin sslmode=disable" \
|
||||
&& ARGS="--auto-restart --cgroup=neon-postgres --pgconnstr=\"$CONNSTR\"" \
|
||||
&& echo "::respawn:su vm-informant -c '/usr/local/bin/vm-informant $ARGS'" >> /etc/inittab
|
||||
|
||||
|
||||
@@ -73,7 +73,7 @@ fn main() -> Result<()> {
|
||||
// Try to use just 'postgres' if no path is provided
|
||||
let pgbin = matches.get_one::<String>("pgbin").unwrap();
|
||||
|
||||
let mut spec = None;
|
||||
let spec;
|
||||
let mut live_config_allowed = false;
|
||||
match spec_json {
|
||||
// First, try to get cluster spec from the cli argument
|
||||
@@ -89,9 +89,13 @@ fn main() -> Result<()> {
|
||||
} else if let Some(id) = compute_id {
|
||||
if let Some(cp_base) = control_plane_uri {
|
||||
live_config_allowed = true;
|
||||
if let Ok(s) = get_spec_from_control_plane(cp_base, id) {
|
||||
spec = Some(s);
|
||||
}
|
||||
spec = match get_spec_from_control_plane(cp_base, id) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
error!("cannot get response from control plane: {}", e);
|
||||
panic!("neither spec nor confirmation that compute is in the Empty state was received");
|
||||
}
|
||||
};
|
||||
} else {
|
||||
panic!("must specify both --control-plane-uri and --compute-id or none");
|
||||
}
|
||||
@@ -114,7 +118,6 @@ fn main() -> Result<()> {
|
||||
spec_set = false;
|
||||
}
|
||||
let compute_node = ComputeNode {
|
||||
start_time: Utc::now(),
|
||||
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
|
||||
pgdata: pgdata.to_string(),
|
||||
pgbin: pgbin.to_string(),
|
||||
@@ -147,6 +150,17 @@ fn main() -> Result<()> {
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
let pspec = state.pspec.as_ref().expect("spec must be set");
|
||||
let startup_tracing_context = pspec.spec.startup_tracing_context.clone();
|
||||
|
||||
// Record for how long we slept waiting for the spec.
|
||||
state.metrics.wait_for_spec_ms = Utc::now()
|
||||
.signed_duration_since(state.start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
// Reset start time to the actual start of the configuration, so that
|
||||
// total startup time was properly measured at the end.
|
||||
state.start_time = Utc::now();
|
||||
|
||||
state.status = ComputeStatus::Init;
|
||||
compute.state_changed.notify_all();
|
||||
drop(state);
|
||||
|
||||
@@ -1,12 +1,28 @@
|
||||
use anyhow::{anyhow, Result};
|
||||
use postgres::Client;
|
||||
use tokio_postgres::NoTls;
|
||||
use tracing::{error, instrument};
|
||||
|
||||
use crate::compute::ComputeNode;
|
||||
|
||||
/// Update timestamp in a row in a special service table to check
|
||||
/// that we can actually write some data in this particular timeline.
|
||||
/// Create table if it's missing.
|
||||
#[instrument(skip_all)]
|
||||
pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
|
||||
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
|
||||
// Connect to the database.
|
||||
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
|
||||
if client.is_closed() {
|
||||
return Err(anyhow!("connection to postgres closed"));
|
||||
}
|
||||
|
||||
// The connection object performs the actual communication with the database,
|
||||
// so spawn it off to run on its own.
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
error!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
let query = "
|
||||
CREATE TABLE IF NOT EXISTS health_check (
|
||||
id serial primary key,
|
||||
@@ -15,31 +31,15 @@ pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
|
||||
INSERT INTO health_check VALUES (1, now())
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET updated_at = now();";
|
||||
let result = client.simple_query(query)?;
|
||||
if result.len() < 2 {
|
||||
return Err(anyhow::format_err!("executed {} queries", result.len()));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
|
||||
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
|
||||
if client.is_closed() {
|
||||
return Err(anyhow!("connection to postgres closed"));
|
||||
}
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
error!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
let result = client
|
||||
.simple_query("UPDATE health_check SET updated_at = now() WHERE id = 1;")
|
||||
.await?;
|
||||
|
||||
if result.len() != 1 {
|
||||
return Err(anyhow!("statement can't be executed"));
|
||||
|
||||
let result = client.simple_query(query).await?;
|
||||
|
||||
if result.len() != 2 {
|
||||
return Err(anyhow::format_err!(
|
||||
"expected 2 query results, but got {}",
|
||||
result.len()
|
||||
));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -32,14 +32,12 @@ use utils::lsn::Lsn;
|
||||
use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::ComputeSpec;
|
||||
|
||||
use crate::checker::create_writability_check_data;
|
||||
use crate::config;
|
||||
use crate::pg_helpers::*;
|
||||
use crate::spec::*;
|
||||
|
||||
/// Compute node info shared across several `compute_ctl` threads.
|
||||
pub struct ComputeNode {
|
||||
pub start_time: DateTime<Utc>,
|
||||
// Url type maintains proper escaping
|
||||
pub connstr: url::Url,
|
||||
pub pgdata: String,
|
||||
@@ -67,6 +65,7 @@ pub struct ComputeNode {
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ComputeState {
|
||||
pub start_time: DateTime<Utc>,
|
||||
pub status: ComputeStatus,
|
||||
/// Timestamp of the last Postgres activity
|
||||
pub last_active: DateTime<Utc>,
|
||||
@@ -78,6 +77,7 @@ pub struct ComputeState {
|
||||
impl ComputeState {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
start_time: Utc::now(),
|
||||
status: ComputeStatus::Empty,
|
||||
last_active: Utc::now(),
|
||||
error: None,
|
||||
@@ -342,7 +342,6 @@ impl ComputeNode {
|
||||
handle_databases(spec, &mut client)?;
|
||||
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_grants(spec, self.connstr.as_str(), &mut client)?;
|
||||
create_writability_check_data(&mut client)?;
|
||||
handle_extensions(spec, &mut client)?;
|
||||
|
||||
// 'Close' connection
|
||||
@@ -427,7 +426,7 @@ impl ComputeNode {
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
state.metrics.total_startup_ms = startup_end_time
|
||||
.signed_duration_since(self.start_time)
|
||||
.signed_duration_since(compute_state.start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
|
||||
@@ -18,6 +18,7 @@ use tracing_utils::http::OtelName;
|
||||
|
||||
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
|
||||
ComputeStatusResponse {
|
||||
start_time: state.start_time,
|
||||
tenant: state
|
||||
.pspec
|
||||
.as_ref()
|
||||
@@ -85,7 +86,10 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
let res = crate::checker::check_writability(compute).await;
|
||||
match res {
|
||||
Ok(_) => Response::new(Body::from("true")),
|
||||
Err(e) => Response::new(Body::from(e.to_string())),
|
||||
Err(e) => {
|
||||
error!("check_writability failed: {}", e);
|
||||
Response::new(Body::from(e.to_string()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -152,11 +152,14 @@ components:
|
||||
type: object
|
||||
description: Compute startup metrics.
|
||||
required:
|
||||
- wait_for_spec_ms
|
||||
- sync_safekeepers_ms
|
||||
- basebackup_ms
|
||||
- config_ms
|
||||
- total_startup_ms
|
||||
properties:
|
||||
wait_for_spec_ms:
|
||||
type: integer
|
||||
sync_safekeepers_ms:
|
||||
type: integer
|
||||
basebackup_ms:
|
||||
@@ -181,6 +184,13 @@ components:
|
||||
- status
|
||||
- last_active
|
||||
properties:
|
||||
start_time:
|
||||
type: string
|
||||
description: |
|
||||
Time when compute was started. If initially compute was started in the `empty`
|
||||
state and then provided with valid spec, `start_time` will be reset to the
|
||||
moment, when spec was received.
|
||||
example: "2022-10-12T07:20:50.52Z"
|
||||
status:
|
||||
$ref: '#/components/schemas/ComputeStatus'
|
||||
last_active:
|
||||
|
||||
@@ -4,42 +4,117 @@ use std::str::FromStr;
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use postgres::config::Config;
|
||||
use postgres::{Client, NoTls};
|
||||
use tracing::{info, info_span, instrument, span_enabled, warn, Level};
|
||||
use reqwest::StatusCode;
|
||||
use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
|
||||
|
||||
use crate::config;
|
||||
use crate::params::PG_HBA_ALL_MD5;
|
||||
use crate::pg_helpers::*;
|
||||
|
||||
use compute_api::responses::ControlPlaneSpecResponse;
|
||||
use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
|
||||
use compute_api::spec::{ComputeSpec, Database, PgIdent, Role};
|
||||
|
||||
// Do control plane request and return response if any. In case of error it
|
||||
// returns a bool flag indicating whether it makes sense to retry the request
|
||||
// and a string with error message.
|
||||
fn do_control_plane_request(
|
||||
uri: &str,
|
||||
jwt: &str,
|
||||
) -> Result<ControlPlaneSpecResponse, (bool, String)> {
|
||||
let resp = reqwest::blocking::Client::new()
|
||||
.get(uri)
|
||||
.header("Authorization", jwt)
|
||||
.send()
|
||||
.map_err(|e| {
|
||||
(
|
||||
true,
|
||||
format!("could not perform spec request to control plane: {}", e),
|
||||
)
|
||||
})?;
|
||||
|
||||
match resp.status() {
|
||||
StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
|
||||
Ok(spec_resp) => Ok(spec_resp),
|
||||
Err(e) => Err((
|
||||
true,
|
||||
format!("could not deserialize control plane response: {}", e),
|
||||
)),
|
||||
},
|
||||
StatusCode::SERVICE_UNAVAILABLE => {
|
||||
Err((true, "control plane is temporarily unavailable".to_string()))
|
||||
}
|
||||
StatusCode::BAD_GATEWAY => {
|
||||
// We have a problem with intermittent 502 errors now
|
||||
// https://github.com/neondatabase/cloud/issues/2353
|
||||
// It's fine to retry GET request in this case.
|
||||
Err((true, "control plane request failed with 502".to_string()))
|
||||
}
|
||||
// Another code, likely 500 or 404, means that compute is unknown to the control plane
|
||||
// or some internal failure happened. Doesn't make much sense to retry in this case.
|
||||
_ => Err((
|
||||
false,
|
||||
format!(
|
||||
"unexpected control plane response status code: {}",
|
||||
resp.status()
|
||||
),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
|
||||
/// env variable is set, it will be used for authorization.
|
||||
pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeSpec> {
|
||||
pub fn get_spec_from_control_plane(
|
||||
base_uri: &str,
|
||||
compute_id: &str,
|
||||
) -> Result<Option<ComputeSpec>> {
|
||||
let cp_uri = format!("{base_uri}/management/api/v2/computes/{compute_id}/spec");
|
||||
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
|
||||
let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
|
||||
Ok(v) => v,
|
||||
Err(_) => "".to_string(),
|
||||
};
|
||||
let mut attempt = 1;
|
||||
let mut spec: Result<Option<ComputeSpec>> = Ok(None);
|
||||
|
||||
info!("getting spec from control plane: {}", cp_uri);
|
||||
|
||||
// TODO: check the response. We should distinguish cases when it's
|
||||
// - network error, then retry
|
||||
// - no spec for compute yet, then wait
|
||||
// - compute id is unknown or any other error, then bail out
|
||||
let resp: ControlPlaneSpecResponse = reqwest::blocking::Client::new()
|
||||
.get(cp_uri)
|
||||
.header("Authorization", jwt)
|
||||
.send()
|
||||
.map_err(|e| anyhow!("could not send spec request to control plane: {}", e))?
|
||||
.json()
|
||||
.map_err(|e| anyhow!("could not get compute spec from control plane: {}", e))?;
|
||||
// Do 3 attempts to get spec from the control plane using the following logic:
|
||||
// - network error -> then retry
|
||||
// - compute id is unknown or any other error -> bail out
|
||||
// - no spec for compute yet (Empty state) -> return Ok(None)
|
||||
// - got spec -> return Ok(Some(spec))
|
||||
while attempt < 4 {
|
||||
spec = match do_control_plane_request(&cp_uri, &jwt) {
|
||||
Ok(spec_resp) => match spec_resp.status {
|
||||
ControlPlaneComputeStatus::Empty => Ok(None),
|
||||
ControlPlaneComputeStatus::Attached => {
|
||||
if let Some(spec) = spec_resp.spec {
|
||||
Ok(Some(spec))
|
||||
} else {
|
||||
bail!("compute is attached, but spec is empty")
|
||||
}
|
||||
}
|
||||
},
|
||||
Err((retry, msg)) => {
|
||||
if retry {
|
||||
Err(anyhow!(msg))
|
||||
} else {
|
||||
bail!(msg);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(spec) = resp.spec {
|
||||
Ok(spec)
|
||||
} else {
|
||||
bail!("could not get compute spec from control plane")
|
||||
if let Err(e) = &spec {
|
||||
error!("attempt {} to get spec failed with: {}", attempt, e);
|
||||
} else {
|
||||
return spec;
|
||||
}
|
||||
|
||||
attempt += 1;
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
}
|
||||
|
||||
// All attempts failed, return error.
|
||||
spec
|
||||
}
|
||||
|
||||
/// It takes cluster specification and does the following:
|
||||
|
||||
@@ -359,8 +359,8 @@ impl PageServerNode {
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
eviction_policy: settings
|
||||
.get("eviction_policy")
|
||||
.map(|x| serde_json::from_str(x))
|
||||
.remove("eviction_policy")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("Failed to parse 'eviction_policy' json")?,
|
||||
min_resident_size_override: settings
|
||||
@@ -368,6 +368,9 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'min_resident_size_override' as integer")?,
|
||||
evictions_low_residence_duration_metric_threshold: settings
|
||||
.remove("evictions_low_residence_duration_metric_threshold")
|
||||
.map(|x| x.to_string()),
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
@@ -445,6 +448,9 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'min_resident_size_override' as an integer")?,
|
||||
evictions_low_residence_duration_metric_threshold: settings
|
||||
.get("evictions_low_residence_duration_metric_threshold")
|
||||
.map(|x| x.to_string()),
|
||||
})
|
||||
.send()?
|
||||
.error_from_body()?;
|
||||
|
||||
@@ -14,6 +14,7 @@ pub struct GenericAPIError {
|
||||
#[derive(Serialize, Debug)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub struct ComputeStatusResponse {
|
||||
pub start_time: DateTime<Utc>,
|
||||
pub tenant: Option<String>,
|
||||
pub timeline: Option<String>,
|
||||
pub status: ComputeStatus,
|
||||
@@ -63,6 +64,7 @@ where
|
||||
/// Response of the /metrics.json API
|
||||
#[derive(Clone, Debug, Default, Serialize)]
|
||||
pub struct ComputeMetrics {
|
||||
pub wait_for_spec_ms: u64,
|
||||
pub sync_safekeepers_ms: u64,
|
||||
pub basebackup_ms: u64,
|
||||
pub config_ms: u64,
|
||||
@@ -75,4 +77,16 @@ pub struct ComputeMetrics {
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct ControlPlaneSpecResponse {
|
||||
pub spec: Option<ComputeSpec>,
|
||||
pub status: ControlPlaneComputeStatus,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ControlPlaneComputeStatus {
|
||||
// Compute is known to control-plane, but it's not
|
||||
// yet attached to any timeline / endpoint.
|
||||
Empty,
|
||||
// Compute is attached to some timeline / endpoint and
|
||||
// should be able to start with provided spec.
|
||||
Attached,
|
||||
}
|
||||
|
||||
@@ -4,13 +4,12 @@ version = "0.1.0"
|
||||
edition = "2021"
|
||||
license = "Apache-2.0"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.68"
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }
|
||||
rand = "0.8.3"
|
||||
serde = "1.0.152"
|
||||
serde_with = "2.1.0"
|
||||
utils = { version = "0.1.0", path = "../utils" }
|
||||
workspace_hack = { version = "0.1.0", path = "../../workspace_hack" }
|
||||
anyhow.workspace = true
|
||||
chrono.workspace = true
|
||||
rand.workspace = true
|
||||
serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
utils.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -135,6 +135,7 @@ pub struct TenantCreateRequest {
|
||||
// For now, this field is not even documented in the openapi_spec.yml.
|
||||
pub eviction_policy: Option<serde_json::Value>,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<String>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
@@ -181,6 +182,7 @@ pub struct TenantConfigRequest {
|
||||
// For now, this field is not even documented in the openapi_spec.yml.
|
||||
pub eviction_policy: Option<serde_json::Value>,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<String>,
|
||||
}
|
||||
|
||||
impl TenantConfigRequest {
|
||||
@@ -202,6 +204,7 @@ impl TenantConfigRequest {
|
||||
trace_read_requests: None,
|
||||
eviction_policy: None,
|
||||
min_resident_size_override: None,
|
||||
evictions_low_residence_duration_metric_threshold: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::path::PathBuf;
|
||||
use std::process::Command;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use bindgen::callbacks::ParseCallbacks;
|
||||
use bindgen::callbacks::{DeriveInfo, ParseCallbacks};
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PostgresFfiCallbacks;
|
||||
@@ -20,7 +20,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
|
||||
|
||||
// Add any custom #[derive] attributes to the data structures that bindgen
|
||||
// creates.
|
||||
fn add_derives(&self, name: &str) -> Vec<String> {
|
||||
fn add_derives(&self, derive_info: &DeriveInfo) -> Vec<String> {
|
||||
// This is the list of data structures that we want to serialize/deserialize.
|
||||
let serde_list = [
|
||||
"XLogRecord",
|
||||
@@ -31,7 +31,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
|
||||
"ControlFileData",
|
||||
];
|
||||
|
||||
if serde_list.contains(&name) {
|
||||
if serde_list.contains(&derive_info.name) {
|
||||
vec![
|
||||
"Default".into(), // Default allows us to easily fill the padding fields with 0.
|
||||
"Serialize".into(),
|
||||
|
||||
@@ -204,12 +204,7 @@ async fn upload_s3_data(
|
||||
let data = format!("remote blob data {i}").into_bytes();
|
||||
let data_len = data.len();
|
||||
task_client
|
||||
.upload(
|
||||
Box::new(std::io::Cursor::new(data)),
|
||||
data_len,
|
||||
&blob_path,
|
||||
None,
|
||||
)
|
||||
.upload(std::io::Cursor::new(data), data_len, &blob_path, None)
|
||||
.await?;
|
||||
|
||||
Ok::<_, anyhow::Error>((blob_prefix, blob_path))
|
||||
|
||||
@@ -1,13 +0,0 @@
|
||||
[package]
|
||||
name = "timeline_data_path"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
utils.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
tokio.workspace = true
|
||||
thiserror.workspace = true
|
||||
@@ -1,396 +0,0 @@
|
||||
//! The Timeline's core data path.
|
||||
//!
|
||||
//! # Overview
|
||||
//!
|
||||
//! This crate implements the core data path of a Timeline inside Pageserver:
|
||||
//!
|
||||
//! 1. WAL records from `walreceiver`, via in-memory layers, into persistent L0 layers.
|
||||
//! 1. `GetPage@LSN`: retrieval of WAL records and page images for feeding into WAL redo.
|
||||
//! 1. Data re-shuffeling through compaction (TODO).
|
||||
//! 1. Page image creation & garbage collection through GC (TODO).
|
||||
//!
|
||||
//! This crate assumes the following concepts, but is fully generic over their implementation:
|
||||
//!
|
||||
//! - **Delta Records**: data is written into the system in the form of self-descriptive deltas.
|
||||
//! For the Pageserver use case, these deltas are derived from Postgres WAL records.
|
||||
//! - **Page Numbers**: Delta Records always affect a single key.
|
||||
//! That key is called page number, because, in the Pageserver use case, the Postgres table page numbers are the keys.
|
||||
//! - **LSN**: When writing Delta Records into the system, they are associated with a monotonically increasing LSN.
|
||||
//! Subsequently written Delta Records must have increasing LSNs.
|
||||
//! - **Page Images**: Delta Records for a given page can be used to reconstruct the page. Think of it like squashing diffs.
|
||||
//! - When sorting the Delta Records for a given key by their LSN, any prefix of that sorting can be squashed into a page image.
|
||||
//! - Delta Records following such a squash can be squashed into that page image.
|
||||
//! - In Pageserver, WAL redo implements the (pure) function of squashing.
|
||||
//! - **In-Memory Layer**: an object that represents an "unfinished" L0 layer file, holding Delta Records in insertion order.
|
||||
//! "Unfinished" means that we're still writing Delta Records to that file.
|
||||
//! - **Historic Layer**: an object that represents a "finished" layer file, at any compaction level.
|
||||
//! Such objects reside on disk and/or in remote storage.
|
||||
//! They may contain Delta Records, Page Images, or a mixture thereof. It doesn't matter.
|
||||
//! - **HistoricStuff**: an efficient lookup data structure to find the list of Historic Layer objects
|
||||
//! that hold the Delta Records / PageImages required to reconstruct a Page Image at a given LSN.
|
||||
//!
|
||||
//! # API
|
||||
//!
|
||||
//! The core idea is that of a specialized single-producer multi-consumer structure,
|
||||
//! embodied by a Read-end and a Write-end.
|
||||
//!
|
||||
//! The Write-end is used to push new `DeltaRecord @ LSN`s into the system.
|
||||
//! In Pageserver, this is used by the `WalReceiver`.
|
||||
//!
|
||||
//! The Read-end provides the `GetPage@LSN` API.
|
||||
//! In the current iteration, we actually return something called `ReconstructWork`.
|
||||
//! I.e., we leave the work of reading the values from the layers, and the WAL redo invocation to the caller.
|
||||
//! Find rationale for this design in the *Scope* section.
|
||||
//!
|
||||
//! ## Immutability
|
||||
//!
|
||||
//! The traits defined by this crate assume immutable data structures that are multi-versioned.
|
||||
//!
|
||||
//! As an example for what "immutable" means, take the case where we add a new Historic Layer to HistoricStuff.
|
||||
//! Traditionally, one would use shared mutable state, i.e. `Arc<RwLock<...>>`.
|
||||
//! To insert the new Historic Layer, we would acquire the RwLock in write mode and modify a lookup data structure to accomodate the new layer.
|
||||
//! The Read-ends would use RwLock in read mode to read from the data structure.
|
||||
//!
|
||||
//! Conversely, with *immutable data structures*, writers create new version (aka *snapshots*) of the lookup data structure.
|
||||
//! New reads on the Read-ends will use the new snapshot, but old ongoing reads would use the old version(s).
|
||||
//! An efficient implementation would likely share the Historic Layer objects, e.g., using `Arc`.
|
||||
//! And maybe there's internally mutable state inside the layer objects, e.g., to track residence (i.e., *on-demand downloaded* vs *evicted*).
|
||||
//! But the important point is that there's no synchronization / lock-holding at any higher level, except when grabbing a reference to the snapshot (Read-end), or when publishing a new snapshot (Write-end).
|
||||
//!
|
||||
//! ## Scope
|
||||
//!
|
||||
//! The following concerns are considered implementation details from the perspective of this crate:
|
||||
//!
|
||||
//! - **Layer File Persistence**: `HistoricStuff::make_historic` is responsible for this.
|
||||
//! - **Reading Layer Files**: the `ReconstructWork` that the Read-end returns from `GetPage@LSN` requests contains the list of layers to consult.
|
||||
//! The crate consumer is responsible for reading the layers & doing WAL redo.
|
||||
//! Likely the implementation of `HistoricStuff` plays a role here, because it is responsible for persisting the layer files.
|
||||
//! - **Layer Eviction & On-Demand Download**: this is just an aspect of the above.
|
||||
//! The crate consumer can choose to implement eviction & on-demand download however they wish.
|
||||
//! The only requirement is that the Historic Layers don't change their contents, i.e., they always returnt he same reconstruct values for the same lookup.
|
||||
//! - For example, a `LayerCache` modoule or service could take care of layer uploads, eviction, and on-demand downloads.
|
||||
//! Initially, the `layer cache` can be local-only.
|
||||
//! But in the future, it can be multi-machine / clustered pagesevers / aka "sharding".
|
||||
//!
|
||||
//! # Example
|
||||
//!
|
||||
//! The [`new`] function is the entrypoint to this crate.
|
||||
//!
|
||||
//! See the test cases for how it is used.
|
||||
|
||||
use std::{marker::PhantomData, time::Duration};
|
||||
|
||||
use utils::seqwait::{self, Advance, SeqWait, Wait};
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
/// Collection of types / type bounds used by Read-end and Write-end.
|
||||
///
|
||||
/// See the [`crate`]-level docs's *Concepts* section to learn about
|
||||
/// the meaning of each associated `type`.
|
||||
///
|
||||
/// # Usage
|
||||
///
|
||||
/// Define a zero-sized-type and impl this Trait for it.
|
||||
/// Then use that zero-sized-type as the single generic argument to [`new`]
|
||||
/// and almost all types declared in this crate.
|
||||
///
|
||||
/// It might feel a bit weird, but, the alternative is to have umpteen generic
|
||||
/// types per `impl` with repetitive trait bounds.
|
||||
///
|
||||
/// Search the test cases for an example of how this can be used to improve testability.
|
||||
pub trait Types {
|
||||
type Key: Copy;
|
||||
type Lsn: Ord + Copy;
|
||||
type LsnCounter: seqwait::MonotonicCounter<Self::Lsn> + Copy;
|
||||
type DeltaRecord;
|
||||
type HistoricLayer;
|
||||
type InMemoryLayer: InMemoryLayer<Types = Self> + Clone;
|
||||
type HistoricStuff: HistoricStuff<Types = Self> + Clone;
|
||||
type GetReconstructPathError: std::error::Error;
|
||||
}
|
||||
|
||||
/// Error returned by [`InMemoryLayer::put`].
|
||||
#[derive(thiserror::Error)]
|
||||
pub struct InMemoryLayerPutError<DeltaRecord> {
|
||||
delta: DeltaRecord,
|
||||
kind: InMemoryLayerPutErrorKind,
|
||||
}
|
||||
|
||||
/// Part of [`InMemoryLayerPutError`].
|
||||
#[derive(Debug)]
|
||||
pub enum InMemoryLayerPutErrorKind {
|
||||
LayerFull,
|
||||
AlreadyHaveRecordForKeyAndLsn,
|
||||
}
|
||||
|
||||
impl<DeltaRecord> std::fmt::Debug for InMemoryLayerPutError<DeltaRecord> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("InMemoryLayerPutError")
|
||||
// would require DeltaRecord to impl Debug
|
||||
// .field("delta", &self.delta)
|
||||
.field("kind", &self.kind)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// An in-memory layer. See [`crate`] docs for details on this concept.
|
||||
pub trait InMemoryLayer: std::fmt::Debug + Default + Clone {
|
||||
type Types: Types;
|
||||
fn put(
|
||||
&mut self,
|
||||
key: <Self::Types as Types>::Key,
|
||||
lsn: <Self::Types as Types>::Lsn,
|
||||
delta: <Self::Types as Types>::DeltaRecord,
|
||||
) -> Result<Self, InMemoryLayerPutError<<Self::Types as Types>::DeltaRecord>>;
|
||||
fn get(
|
||||
&self,
|
||||
key: <Self::Types as Types>::Key,
|
||||
lsn: <Self::Types as Types>::Lsn,
|
||||
) -> Vec<<Self::Types as Types>::DeltaRecord>;
|
||||
}
|
||||
|
||||
/// The manager of [`Types::HistoricLayer`]s.
|
||||
pub trait HistoricStuff {
|
||||
type Types: Types;
|
||||
fn get_reconstruct_path(
|
||||
&self,
|
||||
key: <Self::Types as Types>::Key,
|
||||
lsn: <Self::Types as Types>::Lsn,
|
||||
) -> Result<
|
||||
Vec<<Self::Types as Types>::HistoricLayer>,
|
||||
<Self::Types as Types>::GetReconstructPathError,
|
||||
>;
|
||||
/// Produce a new version of `self` that includes the given inmem layer.
|
||||
fn make_historic(&self, inmem: <Self::Types as Types>::InMemoryLayer) -> Self;
|
||||
}
|
||||
|
||||
/// A snapshot of the data. See [`crate`]-level docs section on *immutability* for details.
|
||||
struct Snapshot<T: Types> {
|
||||
_types: PhantomData<T>,
|
||||
inmem: Option<T::InMemoryLayer>,
|
||||
historic: T::HistoricStuff,
|
||||
}
|
||||
|
||||
impl<T: Types> Clone for Snapshot<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
_types: self._types.clone(),
|
||||
inmem: self.inmem.clone(),
|
||||
historic: self.historic.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The Read-end. See [`crate`]-level docs for details.
|
||||
pub struct Reader<T: Types> {
|
||||
wait: Wait<T::LsnCounter, T::Lsn, Snapshot<T>>,
|
||||
}
|
||||
|
||||
/// The Write-end. See [`crate`]-level docs for details.
|
||||
pub struct Writer<T: Types> {
|
||||
advance: Advance<T::LsnCounter, T::Lsn, Snapshot<T>>,
|
||||
}
|
||||
|
||||
/// Setup a pair of Read-end and Write-End. This is the entrypoint to this crate.
|
||||
///
|
||||
/// The idea is that the caller loads the arguments from persistent state that `HistoricStuff` wrote at an earlier point in time.
|
||||
pub fn new<T: Types>(lsn: T::LsnCounter, historic: T::HistoricStuff) -> (Reader<T>, Writer<T>) {
|
||||
let state = Snapshot {
|
||||
_types: PhantomData::<T>::default(),
|
||||
inmem: None,
|
||||
historic: historic,
|
||||
};
|
||||
let (wait, advance) = SeqWait::new(lsn, state).split_spmc();
|
||||
let reader = Reader { wait };
|
||||
let read_writer = Writer { advance };
|
||||
(reader, read_writer)
|
||||
}
|
||||
|
||||
/// Error returned by the get-page operations.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum GetError<T: Types> {
|
||||
#[error(transparent)]
|
||||
SeqWait(seqwait::SeqWaitError),
|
||||
#[error(transparent)]
|
||||
GetReconstructPath(T::GetReconstructPathError),
|
||||
}
|
||||
|
||||
/// Self-contained set of objects required to reconstruct a page image for the given `key` @ `lsn`.
|
||||
///
|
||||
/// This is returned by the `get` methods of [`Reader`] and [`Writer`].
|
||||
///
|
||||
/// To reconstruct the page image, stack up (top to bottom) `inmem_records` plus all records found for `key` and `lsn` along the `historic_path` until an initial page image is found.
|
||||
/// Then feed that stack to WAL-redo to get the page image.
|
||||
///
|
||||
/// See [`crate`]-level docs on *scope* for why we don't return page images from these functions.
|
||||
pub struct ReconstructWork<T: Types> {
|
||||
pub key: T::Key,
|
||||
pub lsn: T::Lsn,
|
||||
pub inmem_records: Vec<T::DeltaRecord>,
|
||||
pub historic_path: Vec<T::HistoricLayer>,
|
||||
}
|
||||
|
||||
impl<T: Types> Reader<T> {
|
||||
/// This is the `GetPage@LSN` operation.
|
||||
///
|
||||
/// See the [`crate`]-level docs for why we return [`ReconstructWork`] instead of a Page Image here.
|
||||
pub async fn get(&self, key: T::Key, lsn: T::Lsn) -> Result<ReconstructWork<T>, GetError<T>> {
|
||||
// XXX dedup with Writer::get_nowait
|
||||
let state = self.wait.wait_for(lsn).await.map_err(GetError::SeqWait)?;
|
||||
let inmem_records = state
|
||||
.inmem
|
||||
.as_ref()
|
||||
.map(|iml| iml.get(key, lsn))
|
||||
.unwrap_or_default();
|
||||
let historic_path = state
|
||||
.historic
|
||||
.get_reconstruct_path(key, lsn)
|
||||
.map_err(GetError::GetReconstructPath)?;
|
||||
Ok(ReconstructWork {
|
||||
key,
|
||||
lsn,
|
||||
inmem_records,
|
||||
historic_path,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Error returned by the `put` operation.
|
||||
#[derive(thiserror::Error)]
|
||||
pub struct PutError<T: Types> {
|
||||
/// The `delta` record which we failed to `put`.
|
||||
pub delta: T::DeltaRecord,
|
||||
/// Description of what went wrong.
|
||||
pub kind: PutErrorKind,
|
||||
}
|
||||
|
||||
/// Part of [`PutError`].
|
||||
#[derive(Debug)]
|
||||
pub enum PutErrorKind {
|
||||
AlreadyHaveInMemoryRecordForKeyAndLsn,
|
||||
}
|
||||
|
||||
impl<T: Types> std::fmt::Debug for PutError<T> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("PutError")
|
||||
// would need to require Debug for DeltaRecord
|
||||
// .field("delta", &self.delta)
|
||||
.field("kind", &self.kind)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Types> Writer<T> {
|
||||
/// Insert data into the system.
|
||||
pub async fn put(
|
||||
&mut self,
|
||||
key: T::Key,
|
||||
lsn: T::Lsn,
|
||||
delta: T::DeltaRecord,
|
||||
) -> Result<(), PutError<T>> {
|
||||
let (_snapshot_lsn, snapshot) = self.advance.get_current_data();
|
||||
// TODO ensure snapshot_lsn <= lsn?
|
||||
let mut inmem = snapshot
|
||||
.inmem
|
||||
.unwrap_or_else(|| T::InMemoryLayer::default());
|
||||
// XXX: use the Advance as witness and only allow witness to access inmem in write mode
|
||||
match inmem.put(key, lsn, delta) {
|
||||
Ok(new_inmem) => {
|
||||
let new_snapshot = Snapshot {
|
||||
_types: PhantomData,
|
||||
inmem: Some(new_inmem),
|
||||
historic: snapshot.historic,
|
||||
};
|
||||
self.advance.advance(lsn, Some(new_snapshot));
|
||||
}
|
||||
Err(InMemoryLayerPutError {
|
||||
delta,
|
||||
kind: InMemoryLayerPutErrorKind::AlreadyHaveRecordForKeyAndLsn,
|
||||
}) => {
|
||||
return Err(PutError {
|
||||
delta,
|
||||
kind: PutErrorKind::AlreadyHaveInMemoryRecordForKeyAndLsn,
|
||||
});
|
||||
}
|
||||
Err(InMemoryLayerPutError {
|
||||
delta,
|
||||
kind: InMemoryLayerPutErrorKind::LayerFull,
|
||||
}) => {
|
||||
let new_historic = snapshot.historic.make_historic(inmem);
|
||||
let mut new_inmem = T::InMemoryLayer::default();
|
||||
let new_inmem = new_inmem
|
||||
.put(key, lsn, delta)
|
||||
.expect("put into default inmem layer must not fail");
|
||||
let new_state = Snapshot {
|
||||
_types: PhantomData::<T>::default(),
|
||||
inmem: Some(new_inmem),
|
||||
historic: new_historic,
|
||||
};
|
||||
self.advance.advance(lsn, Some(new_state));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Force flushing of the current in-memory layer.
|
||||
///
|
||||
/// Usually, flushing happens only if the in-memory layer is full.
|
||||
/// Use this API to make it happen in other circumstances (shutdown, periodic ticker, etc.).
|
||||
pub async fn force_flush(&mut self) -> tokio::io::Result<()> {
|
||||
let (snapshot_lsn, snapshot) = self.advance.get_current_data();
|
||||
let Snapshot {
|
||||
_types,
|
||||
inmem,
|
||||
historic,
|
||||
} = snapshot;
|
||||
// XXX: use the Advance as witness and only allow witness to access inmem in "write" mode
|
||||
let Some(inmem) = inmem else {
|
||||
// nothing to do
|
||||
return Ok(());
|
||||
};
|
||||
let new_historic = historic.make_historic(inmem);
|
||||
let new_snapshot = Snapshot {
|
||||
_types: PhantomData::<T>::default(),
|
||||
inmem: None,
|
||||
historic: new_historic,
|
||||
};
|
||||
self.advance.advance(snapshot_lsn, Some(new_snapshot)); // TODO: should fail if we're past snapshot_lsn
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// `get` at the given LSN, without blocking.
|
||||
///
|
||||
/// Fails with a timeout error if the `lsn` isn't there yet.
|
||||
/// That makes sense because the only way we'd stop waiting is by a `self.put()`.
|
||||
/// But concurrent `put()` is forbidden.
|
||||
pub async fn get_nowait(
|
||||
&self,
|
||||
key: T::Key,
|
||||
lsn: T::Lsn,
|
||||
) -> Result<ReconstructWork<T>, GetError<T>> {
|
||||
// XXX dedup with Reader::get
|
||||
let state = self
|
||||
.advance
|
||||
.wait_for_timeout(lsn, Duration::from_secs(0))
|
||||
// The await is never going to block because we pass from_secs(0).
|
||||
.await
|
||||
.map_err(GetError::SeqWait)?;
|
||||
let inmem_records = state
|
||||
.inmem
|
||||
.as_ref()
|
||||
.map(|iml| iml.get(key, lsn))
|
||||
.unwrap_or_default();
|
||||
let historic_path = state
|
||||
.historic
|
||||
.get_reconstruct_path(key, lsn)
|
||||
.map_err(GetError::GetReconstructPath)?;
|
||||
Ok(ReconstructWork {
|
||||
key,
|
||||
lsn,
|
||||
inmem_records,
|
||||
historic_path,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,170 +0,0 @@
|
||||
use std::collections::{btree_map::Entry, BTreeMap};
|
||||
use std::sync::Arc;
|
||||
use utils::seqwait;
|
||||
|
||||
/// The ZST for which we impl the `super::Types` type collection trait.
|
||||
struct TestTypes;
|
||||
|
||||
impl super::Types for TestTypes {
|
||||
type Key = usize;
|
||||
|
||||
type Lsn = usize;
|
||||
|
||||
type LsnCounter = UsizeCounter;
|
||||
|
||||
type DeltaRecord = &'static str;
|
||||
|
||||
type HistoricLayer = Arc<TestHistoricLayer>;
|
||||
|
||||
type InMemoryLayer = TestInMemoryLayer;
|
||||
|
||||
type HistoricStuff = TestHistoricStuff;
|
||||
}
|
||||
|
||||
/// For testing, our in-memory layer is a simple hashmap.
|
||||
#[derive(Clone, Default, Debug)]
|
||||
struct TestInMemoryLayer {
|
||||
by_key: BTreeMap<usize, BTreeMap<usize, &'static str>>,
|
||||
}
|
||||
|
||||
/// For testing, our historic layers are just in-memory layer objects with `frozen==true`.
|
||||
struct TestHistoricLayer(TestInMemoryLayer);
|
||||
|
||||
/// This is the data structure that impls the `HistoricStuff` trait.
|
||||
#[derive(Default, Clone)]
|
||||
struct TestHistoricStuff {
|
||||
by_key: BTreeMap<usize, BTreeMap<usize, Arc<TestHistoricLayer>>>,
|
||||
}
|
||||
|
||||
/// `seqwait::MonotonicCounter` impl
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct UsizeCounter(usize);
|
||||
|
||||
// Our testing impl of HistoricStuff references the frozen InMemoryLayer objects
|
||||
// from all the (key,lsn) entries that it covers.
|
||||
// This mimics the (much more efficient) search tree in the real impl.
|
||||
impl super::HistoricStuff for TestHistoricStuff {
|
||||
type Types = TestTypes;
|
||||
fn get_reconstruct_path(
|
||||
&self,
|
||||
key: usize,
|
||||
lsn: usize,
|
||||
) -> Result<Vec<Arc<TestHistoricLayer>>, super::GetReconstructPathError> {
|
||||
let Some(bk) = self.by_key.get(&key) else {
|
||||
return Ok(vec![]);
|
||||
};
|
||||
Ok(bk.range(..=lsn).rev().map(|(_, l)| Arc::clone(l)).collect())
|
||||
}
|
||||
|
||||
fn make_historic(&self, inmem: TestInMemoryLayer) -> Self {
|
||||
// For the purposes of testing, just turn the inmemory layer historic through the type system
|
||||
let historic = Arc::new(TestHistoricLayer(inmem));
|
||||
// Deep-copy
|
||||
let mut copy = self.by_key.clone();
|
||||
// Add the references to `inmem` to the deep-copied struct
|
||||
for (k, v) in historic.0.by_key.iter() {
|
||||
for (lsn, _deltas) in v.into_iter() {
|
||||
let by_key = copy.entry(*k).or_default();
|
||||
let overwritten = by_key.insert(*lsn, historic.clone());
|
||||
assert!(matches!(overwritten, None), "layers must not overlap");
|
||||
}
|
||||
}
|
||||
Self { by_key: copy }
|
||||
}
|
||||
}
|
||||
|
||||
impl super::InMemoryLayer for TestInMemoryLayer {
|
||||
type Types = TestTypes;
|
||||
|
||||
fn put(
|
||||
&mut self,
|
||||
key: usize,
|
||||
lsn: usize,
|
||||
delta: &'static str,
|
||||
) -> Result<Self, super::InMemoryLayerPutError<&'static str>> {
|
||||
let mut clone = self.clone();
|
||||
drop(self);
|
||||
let by_key = clone.by_key.entry(key).or_default();
|
||||
match by_key.entry(lsn) {
|
||||
Entry::Occupied(_record) => {
|
||||
return Err(super::InMemoryLayerPutError {
|
||||
delta,
|
||||
kind: super::InMemoryLayerPutErrorKind::AlreadyHaveRecordForKeyAndLsn,
|
||||
});
|
||||
}
|
||||
Entry::Vacant(vacant) => vacant.insert(delta),
|
||||
};
|
||||
Ok(clone)
|
||||
}
|
||||
|
||||
fn get(&self, key: usize, lsn: usize) -> Vec<&'static str> {
|
||||
let by_key = match self.by_key.get(&key) {
|
||||
Some(by_key) => by_key,
|
||||
None => return vec![],
|
||||
};
|
||||
by_key
|
||||
.range(..=lsn)
|
||||
.map(|(_, v)| v)
|
||||
.rev()
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl UsizeCounter {
|
||||
pub fn new(inital: usize) -> Self {
|
||||
UsizeCounter(inital)
|
||||
}
|
||||
}
|
||||
|
||||
impl seqwait::MonotonicCounter<usize> for UsizeCounter {
|
||||
fn cnt_advance(&mut self, new_val: usize) {
|
||||
assert!(self.0 < new_val);
|
||||
self.0 = new_val;
|
||||
}
|
||||
|
||||
fn cnt_value(&self) -> usize {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn basic() {
|
||||
let lm = TestHistoricStuff::default();
|
||||
|
||||
let (r, mut rw) = super::new::<TestTypes>(UsizeCounter::new(0), lm);
|
||||
|
||||
let r = Arc::new(r);
|
||||
let r2 = Arc::clone(&r);
|
||||
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let read_jh = rt.spawn(async move { r.get(0, 10).await });
|
||||
|
||||
let mut rw = rt.block_on(async move {
|
||||
rw.put(0, 1, "foo").await.unwrap();
|
||||
rw.put(1, 1, "bar").await.unwrap();
|
||||
rw.put(0, 10, "baz").await.unwrap();
|
||||
rw
|
||||
});
|
||||
|
||||
let read_res = rt.block_on(read_jh).unwrap().unwrap();
|
||||
assert!(
|
||||
read_res.historic_path.is_empty(),
|
||||
"we have pushed less than needed for flush"
|
||||
);
|
||||
assert_eq!(read_res.inmem_records, vec!["baz", "foo"]);
|
||||
|
||||
let rw = rt.block_on(async move {
|
||||
rw.put(0, 11, "blup").await.unwrap();
|
||||
rw
|
||||
});
|
||||
let read_res = rt.block_on(async move { r2.get(0, 11).await.unwrap() });
|
||||
assert_eq!(read_res.historic_path.len(), 0);
|
||||
assert_eq!(read_res.inmem_records, vec!["blup", "baz", "foo"]);
|
||||
|
||||
drop(rw);
|
||||
}
|
||||
@@ -14,4 +14,5 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||
tracing.workspace = true
|
||||
tracing-opentelemetry.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -33,11 +33,10 @@ serde_with.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
url.workspace = true
|
||||
uuid = { version = "1.2", features = ["v4", "serde"] }
|
||||
uuid.workspace = true
|
||||
|
||||
metrics.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
either.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
byteorder.workspace = true
|
||||
|
||||
@@ -76,6 +76,7 @@ where
|
||||
|
||||
let log_quietly = method == Method::GET;
|
||||
async move {
|
||||
let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding();
|
||||
if log_quietly {
|
||||
debug!("Handling request");
|
||||
} else {
|
||||
@@ -87,7 +88,11 @@ where
|
||||
// Usage of the error handler also means that we expect only the `ApiError` errors to be raised in this call.
|
||||
//
|
||||
// Panics are not handled separately, there's a `tracing_panic_hook` from another module to do that globally.
|
||||
match (self.0)(request).await {
|
||||
let res = (self.0)(request).await;
|
||||
|
||||
cancellation_guard.disarm();
|
||||
|
||||
match res {
|
||||
Ok(response) => {
|
||||
let response_status = response.status();
|
||||
if log_quietly && response_status.is_success() {
|
||||
@@ -105,6 +110,38 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Drop guard to WARN in case the request was dropped before completion.
|
||||
struct RequestCancelled {
|
||||
warn: Option<tracing::Span>,
|
||||
}
|
||||
|
||||
impl RequestCancelled {
|
||||
/// Create the drop guard using the [`tracing::Span::current`] as the span.
|
||||
fn warn_when_dropped_without_responding() -> Self {
|
||||
RequestCancelled {
|
||||
warn: Some(tracing::Span::current()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Consume the drop guard without logging anything.
|
||||
fn disarm(mut self) {
|
||||
self.warn = None;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RequestCancelled {
|
||||
fn drop(&mut self) {
|
||||
if let Some(span) = self.warn.take() {
|
||||
// the span has all of the info already, but the outer `.instrument(span)` has already
|
||||
// been dropped, so we need to manually re-enter it for this message.
|
||||
//
|
||||
// this is what the instrument would do before polling so it is fine.
|
||||
let _g = span.entered();
|
||||
warn!("request was dropped before completing");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
SERVE_METRICS_COUNT.inc();
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::Lazy;
|
||||
use strum_macros::{EnumString, EnumVariantNames};
|
||||
|
||||
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
|
||||
@@ -23,25 +24,64 @@ impl LogFormat {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
|
||||
let default_filter_str = "info";
|
||||
static TRACING_EVENT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
|
||||
metrics::register_int_counter_vec!(
|
||||
"libmetrics_tracing_event_count",
|
||||
"Number of tracing events, by level",
|
||||
&["level"]
|
||||
)
|
||||
.expect("failed to define metric")
|
||||
});
|
||||
|
||||
struct TracingEventCountLayer(&'static metrics::IntCounterVec);
|
||||
|
||||
impl<S> tracing_subscriber::layer::Layer<S> for TracingEventCountLayer
|
||||
where
|
||||
S: tracing::Subscriber,
|
||||
{
|
||||
fn on_event(
|
||||
&self,
|
||||
event: &tracing::Event<'_>,
|
||||
_ctx: tracing_subscriber::layer::Context<'_, S>,
|
||||
) {
|
||||
let level = event.metadata().level();
|
||||
let level = match *level {
|
||||
tracing::Level::ERROR => "error",
|
||||
tracing::Level::WARN => "warn",
|
||||
tracing::Level::INFO => "info",
|
||||
tracing::Level::DEBUG => "debug",
|
||||
tracing::Level::TRACE => "trace",
|
||||
};
|
||||
self.0.with_label_values(&[level]).inc();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
|
||||
// We fall back to printing all spans at info-level or above if
|
||||
// the RUST_LOG environment variable is not set.
|
||||
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_filter_str));
|
||||
let rust_log_env_filter = || {
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
|
||||
};
|
||||
|
||||
let base_logger = tracing_subscriber::fmt()
|
||||
.with_env_filter(env_filter)
|
||||
.with_target(false)
|
||||
.with_ansi(atty::is(atty::Stream::Stdout))
|
||||
.with_writer(std::io::stdout);
|
||||
|
||||
match log_format {
|
||||
LogFormat::Json => base_logger.json().init(),
|
||||
LogFormat::Plain => base_logger.init(),
|
||||
LogFormat::Test => base_logger.with_test_writer().init(),
|
||||
}
|
||||
// NB: the order of the with() calls does not matter.
|
||||
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
|
||||
use tracing_subscriber::prelude::*;
|
||||
tracing_subscriber::registry()
|
||||
.with({
|
||||
let log_layer = tracing_subscriber::fmt::layer()
|
||||
.with_target(false)
|
||||
.with_ansi(atty::is(atty::Stream::Stdout))
|
||||
.with_writer(std::io::stdout);
|
||||
let log_layer = match log_format {
|
||||
LogFormat::Json => log_layer.json().boxed(),
|
||||
LogFormat::Plain => log_layer.boxed(),
|
||||
LogFormat::Test => log_layer.with_test_writer().boxed(),
|
||||
};
|
||||
log_layer.with_filter(rust_log_env_filter())
|
||||
})
|
||||
.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()))
|
||||
.init();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -157,3 +197,33 @@ impl std::fmt::Debug for PrettyLocation<'_, '_> {
|
||||
<Self as std::fmt::Display>::fmt(self, f)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use metrics::{core::Opts, IntCounterVec};
|
||||
|
||||
use super::TracingEventCountLayer;
|
||||
|
||||
#[test]
|
||||
fn tracing_event_count_metric() {
|
||||
let counter_vec =
|
||||
IntCounterVec::new(Opts::new("testmetric", "testhelp"), &["level"]).unwrap();
|
||||
let counter_vec = Box::leak(Box::new(counter_vec)); // make it 'static
|
||||
let layer = TracingEventCountLayer(counter_vec);
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
tracing::subscriber::with_default(tracing_subscriber::registry().with(layer), || {
|
||||
tracing::trace!("foo");
|
||||
tracing::debug!("foo");
|
||||
tracing::info!("foo");
|
||||
tracing::warn!("foo");
|
||||
tracing::error!("foo");
|
||||
});
|
||||
|
||||
assert_eq!(counter_vec.with_label_values(&["trace"]).get(), 1);
|
||||
assert_eq!(counter_vec.with_label_values(&["debug"]).get(), 1);
|
||||
assert_eq!(counter_vec.with_label_values(&["info"]).get(), 1);
|
||||
assert_eq!(counter_vec.with_label_values(&["warn"]).get(), 1);
|
||||
assert_eq!(counter_vec.with_label_values(&["error"]).get(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
#![warn(missing_docs)]
|
||||
|
||||
use either::Either;
|
||||
use std::cmp::{Eq, Ordering, PartialOrd};
|
||||
use std::collections::BinaryHeap;
|
||||
use std::fmt::Debug;
|
||||
use std::mem;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::oneshot::{channel, Receiver, Sender};
|
||||
use tokio::sync::watch::{channel, Receiver, Sender};
|
||||
use tokio::time::timeout;
|
||||
|
||||
/// An error happened while waiting for a number
|
||||
@@ -37,48 +36,45 @@ pub trait MonotonicCounter<V> {
|
||||
}
|
||||
|
||||
/// Internal components of a `SeqWait`
|
||||
struct SeqWaitInt<S, V, T>
|
||||
struct SeqWaitInt<S, V>
|
||||
where
|
||||
S: MonotonicCounter<V>,
|
||||
V: Ord,
|
||||
T: Clone,
|
||||
{
|
||||
waiters: BinaryHeap<Waiter<V, T>>,
|
||||
waiters: BinaryHeap<Waiter<V>>,
|
||||
current: S,
|
||||
shutdown: bool,
|
||||
data: T,
|
||||
}
|
||||
|
||||
struct Waiter<V, T>
|
||||
struct Waiter<T>
|
||||
where
|
||||
V: Ord,
|
||||
T: Clone,
|
||||
T: Ord,
|
||||
{
|
||||
wake_num: V, // wake me when this number arrives ...
|
||||
wake_channel: Sender<T>, // ... by sending a message to this channel
|
||||
wake_num: T, // wake me when this number arrives ...
|
||||
wake_channel: Sender<()>, // ... by sending a message to this channel
|
||||
}
|
||||
|
||||
// BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
|
||||
// to get that.
|
||||
impl<V: Ord, T: Clone> PartialOrd for Waiter<V, T> {
|
||||
impl<T: Ord> PartialOrd for Waiter<T> {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
other.wake_num.partial_cmp(&self.wake_num)
|
||||
}
|
||||
}
|
||||
|
||||
impl<V: Ord, T: Clone> Ord for Waiter<V, T> {
|
||||
impl<T: Ord> Ord for Waiter<T> {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
other.wake_num.cmp(&self.wake_num)
|
||||
}
|
||||
}
|
||||
|
||||
impl<V: Ord, T: Clone> PartialEq for Waiter<V, T> {
|
||||
impl<T: Ord> PartialEq for Waiter<T> {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
other.wake_num == self.wake_num
|
||||
}
|
||||
}
|
||||
|
||||
impl<V: Ord, T: Clone> Eq for Waiter<V, T> {}
|
||||
impl<T: Ord> Eq for Waiter<T> {}
|
||||
|
||||
/// A tool for waiting on a sequence number
|
||||
///
|
||||
@@ -96,28 +92,25 @@ impl<V: Ord, T: Clone> Eq for Waiter<V, T> {}
|
||||
///
|
||||
/// <S> means Storage, <V> is type of counter that this storage exposes.
|
||||
///
|
||||
pub struct SeqWait<S, V, T>
|
||||
pub struct SeqWait<S, V>
|
||||
where
|
||||
S: MonotonicCounter<V>,
|
||||
V: Ord,
|
||||
T: Clone,
|
||||
{
|
||||
internal: Mutex<SeqWaitInt<S, V, T>>,
|
||||
internal: Mutex<SeqWaitInt<S, V>>,
|
||||
}
|
||||
|
||||
impl<S, V, T> SeqWait<S, V, T>
|
||||
impl<S, V> SeqWait<S, V>
|
||||
where
|
||||
S: MonotonicCounter<V> + Copy,
|
||||
V: Ord + Copy,
|
||||
T: Clone,
|
||||
{
|
||||
/// Create a new `SeqWait`, initialized to a particular number
|
||||
pub fn new(starting_num: S, data: T) -> Self {
|
||||
pub fn new(starting_num: S) -> Self {
|
||||
let internal = SeqWaitInt {
|
||||
waiters: BinaryHeap::new(),
|
||||
current: starting_num,
|
||||
shutdown: false,
|
||||
data,
|
||||
};
|
||||
SeqWait {
|
||||
internal: Mutex::new(internal),
|
||||
@@ -151,13 +144,10 @@ where
|
||||
///
|
||||
/// This call won't complete until someone has called `advance`
|
||||
/// with a number greater than or equal to the one we're waiting for.
|
||||
pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
|
||||
match self.queue_for_wait(num, false) {
|
||||
Ok(Either::Left(data)) => Ok(data),
|
||||
Ok(Either::Right(rx)) => match rx.await {
|
||||
Err(_) => Err(SeqWaitError::Shutdown),
|
||||
Ok(data) => Ok(data),
|
||||
},
|
||||
pub async fn wait_for(&self, num: V) -> Result<(), SeqWaitError> {
|
||||
match self.queue_for_wait(num) {
|
||||
Ok(None) => Ok(()),
|
||||
Ok(Some(mut rx)) => rx.changed().await.map_err(|_| SeqWaitError::Shutdown),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
@@ -169,18 +159,15 @@ where
|
||||
///
|
||||
/// If that hasn't happened after the specified timeout duration,
|
||||
/// [`SeqWaitError::Timeout`] will be returned.
|
||||
///
|
||||
/// Pass `timeout_duration.is_zero() == true` to guarantee that the
|
||||
/// future that is this function will never await.
|
||||
pub async fn wait_for_timeout(
|
||||
&self,
|
||||
num: V,
|
||||
timeout_duration: Duration,
|
||||
) -> Result<T, SeqWaitError> {
|
||||
match self.queue_for_wait(num, timeout_duration.is_zero()) {
|
||||
Ok(Either::Left(data)) => Ok(data),
|
||||
Ok(Either::Right(rx)) => match timeout(timeout_duration, rx).await {
|
||||
Ok(Ok(data)) => Ok(data),
|
||||
) -> Result<(), SeqWaitError> {
|
||||
match self.queue_for_wait(num) {
|
||||
Ok(None) => Ok(()),
|
||||
Ok(Some(mut rx)) => match timeout(timeout_duration, rx.changed()).await {
|
||||
Ok(Ok(())) => Ok(()),
|
||||
Ok(Err(_)) => Err(SeqWaitError::Shutdown),
|
||||
Err(_) => Err(SeqWaitError::Timeout),
|
||||
},
|
||||
@@ -190,50 +177,41 @@ where
|
||||
|
||||
/// Register and return a channel that will be notified when a number arrives,
|
||||
/// or None, if it has already arrived.
|
||||
fn queue_for_wait(&self, num: V, nowait: bool) -> Result<Either<T, Receiver<T>>, SeqWaitError> {
|
||||
fn queue_for_wait(&self, num: V) -> Result<Option<Receiver<()>>, SeqWaitError> {
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
if internal.current.cnt_value() >= num {
|
||||
return Ok(Either::Left(internal.data.clone()));
|
||||
return Ok(None);
|
||||
}
|
||||
if internal.shutdown {
|
||||
return Err(SeqWaitError::Shutdown);
|
||||
}
|
||||
if nowait {
|
||||
return Err(SeqWaitError::Timeout);
|
||||
}
|
||||
|
||||
// Create a new channel.
|
||||
let (tx, rx) = channel();
|
||||
let (tx, rx) = channel(());
|
||||
internal.waiters.push(Waiter {
|
||||
wake_num: num,
|
||||
wake_channel: tx,
|
||||
});
|
||||
// Drop the lock as we exit this scope.
|
||||
Ok(Either::Right(rx))
|
||||
Ok(Some(rx))
|
||||
}
|
||||
|
||||
/// Announce a new number has arrived
|
||||
///
|
||||
/// All waiters at this value or below will be woken.
|
||||
///
|
||||
/// If `new_data` is Some(), it will update the internal data,
|
||||
/// even if `num` is smaller than the internal counter.
|
||||
/// It will not cause a wake-up though, in this case.
|
||||
///
|
||||
/// Returns the old number.
|
||||
pub fn advance(&self, num: V, new_data: Option<T>) -> V {
|
||||
pub fn advance(&self, num: V) -> V {
|
||||
let old_value;
|
||||
let (wake_these, with_data) = {
|
||||
let wake_these = {
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
if let Some(new_data) = new_data {
|
||||
internal.data = new_data;
|
||||
}
|
||||
|
||||
old_value = internal.current.cnt_value();
|
||||
if old_value >= num {
|
||||
return old_value;
|
||||
}
|
||||
internal.current.cnt_advance(num);
|
||||
|
||||
// Pop all waiters <= num from the heap. Collect them in a vector, and
|
||||
// wake them up after releasing the lock.
|
||||
let mut wake_these = Vec::new();
|
||||
@@ -243,13 +221,13 @@ where
|
||||
}
|
||||
wake_these.push(internal.waiters.pop().unwrap().wake_channel);
|
||||
}
|
||||
(wake_these, internal.data.clone())
|
||||
wake_these
|
||||
};
|
||||
|
||||
for tx in wake_these {
|
||||
// This can fail if there are no receivers.
|
||||
// We don't care; discard the error.
|
||||
let _ = tx.send(with_data.clone());
|
||||
let _ = tx.send(());
|
||||
}
|
||||
old_value
|
||||
}
|
||||
@@ -258,106 +236,6 @@ where
|
||||
pub fn load(&self) -> S {
|
||||
self.internal.lock().unwrap().current
|
||||
}
|
||||
|
||||
/// Split the seqwait into a part than can only do wait,
|
||||
/// and another part that can do advance + wait.
|
||||
///
|
||||
/// The wait-only part can be cloned, the advance part cannot be cloned.
|
||||
/// This provides a single-producer multi-consumer scheme.
|
||||
pub fn split_spmc(self) -> (Wait<S, V, T>, Advance<S, V, T>) {
|
||||
let inner = Arc::new(self);
|
||||
let w = Wait {
|
||||
inner: inner.clone(),
|
||||
};
|
||||
let a = Advance { inner };
|
||||
(w, a)
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`SeqWait::split_spmc`].
|
||||
pub struct Wait<S, V, T>
|
||||
where
|
||||
S: MonotonicCounter<V> + Copy,
|
||||
V: Ord + Copy,
|
||||
T: Clone,
|
||||
{
|
||||
inner: Arc<SeqWait<S, V, T>>,
|
||||
}
|
||||
|
||||
/// See [`SeqWait::split_spmc`].
|
||||
pub struct Advance<S, V, T>
|
||||
where
|
||||
S: MonotonicCounter<V> + Copy,
|
||||
V: Ord + Copy,
|
||||
T: Clone,
|
||||
{
|
||||
inner: Arc<SeqWait<S, V, T>>,
|
||||
}
|
||||
|
||||
impl<S, V, T> Wait<S, V, T>
|
||||
where
|
||||
S: MonotonicCounter<V> + Copy,
|
||||
V: Ord + Copy,
|
||||
T: Clone,
|
||||
{
|
||||
/// See [`SeqWait::wait_for`].
|
||||
pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
|
||||
self.inner.wait_for(num).await
|
||||
}
|
||||
|
||||
/// See [`SeqWait::wait_for_timeout`].
|
||||
pub async fn wait_for_timeout(
|
||||
&self,
|
||||
num: V,
|
||||
timeout_duration: Duration,
|
||||
) -> Result<T, SeqWaitError> {
|
||||
self.inner.wait_for_timeout(num, timeout_duration).await
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, V, T> Advance<S, V, T>
|
||||
where
|
||||
S: MonotonicCounter<V> + Copy,
|
||||
V: Ord + Copy,
|
||||
T: Clone,
|
||||
{
|
||||
/// See [`SeqWait::advance`].
|
||||
pub fn advance(&self, num: V, new_data: Option<T>) -> V {
|
||||
self.inner.advance(num, new_data)
|
||||
}
|
||||
|
||||
/// See [`SeqWait::wait_for`].
|
||||
pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
|
||||
self.inner.wait_for(num).await
|
||||
}
|
||||
|
||||
/// See [`SeqWait::wait_for_timeout`].
|
||||
pub async fn wait_for_timeout(
|
||||
&self,
|
||||
num: V,
|
||||
timeout_duration: Duration,
|
||||
) -> Result<T, SeqWaitError> {
|
||||
self.inner.wait_for_timeout(num, timeout_duration).await
|
||||
}
|
||||
|
||||
/// Get a `Clone::clone` of the current data inside the seqwait.
|
||||
pub fn get_current_data(&self) -> (V, T) {
|
||||
let inner = self.inner.internal.lock().unwrap();
|
||||
(inner.current.cnt_value(), inner.data.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, V, T> Clone for Wait<S, V, T>
|
||||
where
|
||||
S: MonotonicCounter<V> + Copy,
|
||||
V: Ord + Copy,
|
||||
T: Clone,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
inner: self.inner.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -378,12 +256,12 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn seqwait() {
|
||||
let seq = Arc::new(SeqWait::new(0, ()));
|
||||
let seq = Arc::new(SeqWait::new(0));
|
||||
let seq2 = Arc::clone(&seq);
|
||||
let seq3 = Arc::clone(&seq);
|
||||
let jh1 = tokio::task::spawn(async move {
|
||||
seq2.wait_for(42).await.expect("wait_for 42");
|
||||
let old = seq2.advance(100, None);
|
||||
let old = seq2.advance(100);
|
||||
assert_eq!(old, 99);
|
||||
seq2.wait_for_timeout(999, Duration::from_millis(100))
|
||||
.await
|
||||
@@ -394,12 +272,12 @@ mod tests {
|
||||
seq3.wait_for(0).await.expect("wait_for 0");
|
||||
});
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
let old = seq.advance(99, None);
|
||||
let old = seq.advance(99);
|
||||
assert_eq!(old, 0);
|
||||
seq.wait_for(100).await.expect("wait_for 100");
|
||||
|
||||
// Calling advance with a smaller value is a no-op
|
||||
assert_eq!(seq.advance(98, None), 100);
|
||||
assert_eq!(seq.advance(98), 100);
|
||||
assert_eq!(seq.load(), 100);
|
||||
|
||||
jh1.await.unwrap();
|
||||
@@ -410,7 +288,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn seqwait_timeout() {
|
||||
let seq = Arc::new(SeqWait::new(0, ()));
|
||||
let seq = Arc::new(SeqWait::new(0));
|
||||
let seq2 = Arc::clone(&seq);
|
||||
let jh = tokio::task::spawn(async move {
|
||||
let timeout = Duration::from_millis(1);
|
||||
@@ -420,104 +298,10 @@ mod tests {
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
// This will attempt to wake, but nothing will happen
|
||||
// because the waiter already dropped its Receiver.
|
||||
let old = seq.advance(99, None);
|
||||
let old = seq.advance(99);
|
||||
assert_eq!(old, 0);
|
||||
jh.await.unwrap();
|
||||
|
||||
seq.shutdown();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn data_basic() {
|
||||
let seq = Arc::new(SeqWait::new(0, "a"));
|
||||
let seq2 = Arc::clone(&seq);
|
||||
let jh = tokio::task::spawn(async move {
|
||||
let data = seq.wait_for(2).await.unwrap();
|
||||
assert_eq!(data, "b");
|
||||
});
|
||||
seq2.advance(1, Some("x"));
|
||||
seq2.advance(2, Some("b"));
|
||||
jh.await.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn data_always_most_recent() {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let seq = Arc::new(SeqWait::new(0, "a"));
|
||||
let seq2 = Arc::clone(&seq);
|
||||
|
||||
let jh = rt.spawn(async move {
|
||||
let data = seq.wait_for(2).await.unwrap();
|
||||
assert_eq!(data, "d");
|
||||
});
|
||||
|
||||
// jh is not running until we poll it, thanks to current thread runtime
|
||||
|
||||
rt.block_on(async move {
|
||||
seq2.advance(2, Some("b"));
|
||||
seq2.advance(3, Some("c"));
|
||||
seq2.advance(4, Some("d"));
|
||||
});
|
||||
|
||||
rt.block_on(jh).unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn split_spmc_api_surface() {
|
||||
let seq = SeqWait::new(0, 1);
|
||||
let (w, a) = seq.split_spmc();
|
||||
|
||||
let _ = w.wait_for(1);
|
||||
let _ = w.wait_for_timeout(0, Duration::from_secs(10));
|
||||
let _ = w.clone();
|
||||
|
||||
let _ = a.advance(1, None);
|
||||
let _ = a.wait_for(1);
|
||||
let _ = a.wait_for_timeout(0, Duration::from_secs(10));
|
||||
|
||||
// TODO would be nice to have must-not-compile tests for Advance not being clonable.
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn new_data_same_lsn() {
|
||||
let seq = Arc::new(SeqWait::new(0, "a"));
|
||||
|
||||
seq.advance(1, Some("b"));
|
||||
let data = seq.wait_for(1).await.unwrap();
|
||||
assert_eq!(data, "b", "the regular case where lsn and data advance");
|
||||
|
||||
seq.advance(1, Some("c"));
|
||||
let data = seq.wait_for(1).await.unwrap();
|
||||
assert_eq!(
|
||||
data, "c",
|
||||
"no lsn advance still gives new data for old lsn wait_for's"
|
||||
);
|
||||
|
||||
let (start_wait_for_sender, start_wait_for_receiver) = tokio::sync::oneshot::channel();
|
||||
// ensure we don't wake waiters for data-only change
|
||||
let jh = tokio::spawn({
|
||||
let seq = seq.clone();
|
||||
async move {
|
||||
start_wait_for_receiver.await.unwrap();
|
||||
match tokio::time::timeout(Duration::from_secs(2), seq.wait_for(2)).await {
|
||||
Ok(_) => {
|
||||
assert!(
|
||||
false,
|
||||
"advance should not wake waiters if data changes but LSN doesn't"
|
||||
);
|
||||
}
|
||||
Err(_) => {
|
||||
// Good, we weren't woken up.
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
seq.advance(1, Some("d"));
|
||||
start_wait_for_sender.send(()).unwrap();
|
||||
jh.await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ use std::time::Instant;
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
||||
|
||||
fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
|
||||
let mut layer_map = LayerMap::<LayerDescriptor>::default();
|
||||
@@ -114,7 +114,7 @@ fn bench_from_captest_env(c: &mut Criterion) {
|
||||
c.bench_function("captest_uniform_queries", |b| {
|
||||
b.iter(|| {
|
||||
for q in queries.clone().into_iter() {
|
||||
layer_map.search(q.0, q.1);
|
||||
black_box(layer_map.search(q.0, q.1));
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -122,11 +122,11 @@ fn bench_from_captest_env(c: &mut Criterion) {
|
||||
// test with a key that corresponds to the RelDir entry. See pgdatadir_mapping.rs.
|
||||
c.bench_function("captest_rel_dir_query", |b| {
|
||||
b.iter(|| {
|
||||
let result = layer_map.search(
|
||||
let result = black_box(layer_map.search(
|
||||
Key::from_hex("000000067F00008000000000000000000001").unwrap(),
|
||||
// This LSN is higher than any of the LSNs in the tree
|
||||
Lsn::from_str("D0/80208AE1").unwrap(),
|
||||
);
|
||||
));
|
||||
result.unwrap();
|
||||
});
|
||||
});
|
||||
@@ -183,7 +183,7 @@ fn bench_from_real_project(c: &mut Criterion) {
|
||||
group.bench_function("uniform_queries", |b| {
|
||||
b.iter(|| {
|
||||
for q in queries.clone().into_iter() {
|
||||
layer_map.search(q.0, q.1);
|
||||
black_box(layer_map.search(q.0, q.1));
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -232,7 +232,7 @@ fn bench_sequential(c: &mut Criterion) {
|
||||
group.bench_function("uniform_queries", |b| {
|
||||
b.iter(|| {
|
||||
for q in queries.clone().into_iter() {
|
||||
layer_map.search(q.0, q.1);
|
||||
black_box(layer_map.search(q.0, q.1));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use serde::de::IntoDeserializer;
|
||||
use std::env;
|
||||
use storage_broker::Uri;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
@@ -62,7 +63,6 @@ pub mod defaults {
|
||||
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour";
|
||||
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
|
||||
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
|
||||
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
|
||||
|
||||
///
|
||||
/// Default built-in configuration file.
|
||||
@@ -91,7 +91,6 @@ pub mod defaults {
|
||||
#cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
|
||||
#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
|
||||
|
||||
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
|
||||
|
||||
#disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}}
|
||||
|
||||
@@ -108,6 +107,7 @@ pub mod defaults {
|
||||
#pitr_interval = '{DEFAULT_PITR_INTERVAL}'
|
||||
|
||||
#min_resident_size_override = .. # in bytes
|
||||
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
|
||||
|
||||
# [remote_storage]
|
||||
|
||||
@@ -182,9 +182,6 @@ pub struct PageServerConf {
|
||||
pub metric_collection_endpoint: Option<Url>,
|
||||
pub synthetic_size_calculation_interval: Duration,
|
||||
|
||||
// See the corresponding metric's help string.
|
||||
pub evictions_low_residence_duration_metric_threshold: Duration,
|
||||
|
||||
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
|
||||
|
||||
pub test_remote_failures: u64,
|
||||
@@ -257,8 +254,6 @@ struct PageServerConfigBuilder {
|
||||
metric_collection_endpoint: BuilderValue<Option<Url>>,
|
||||
synthetic_size_calculation_interval: BuilderValue<Duration>,
|
||||
|
||||
evictions_low_residence_duration_metric_threshold: BuilderValue<Duration>,
|
||||
|
||||
disk_usage_based_eviction: BuilderValue<Option<DiskUsageEvictionTaskConfig>>,
|
||||
|
||||
test_remote_failures: BuilderValue<u64>,
|
||||
@@ -316,11 +311,6 @@ impl Default for PageServerConfigBuilder {
|
||||
.expect("cannot parse default synthetic size calculation interval")),
|
||||
metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT),
|
||||
|
||||
evictions_low_residence_duration_metric_threshold: Set(humantime::parse_duration(
|
||||
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
|
||||
)
|
||||
.expect("cannot parse DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD")),
|
||||
|
||||
disk_usage_based_eviction: Set(None),
|
||||
|
||||
test_remote_failures: Set(0),
|
||||
@@ -438,10 +428,6 @@ impl PageServerConfigBuilder {
|
||||
self.test_remote_failures = BuilderValue::Set(fail_first);
|
||||
}
|
||||
|
||||
pub fn evictions_low_residence_duration_metric_threshold(&mut self, value: Duration) {
|
||||
self.evictions_low_residence_duration_metric_threshold = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn disk_usage_based_eviction(&mut self, value: Option<DiskUsageEvictionTaskConfig>) {
|
||||
self.disk_usage_based_eviction = BuilderValue::Set(value);
|
||||
}
|
||||
@@ -525,11 +511,6 @@ impl PageServerConfigBuilder {
|
||||
synthetic_size_calculation_interval: self
|
||||
.synthetic_size_calculation_interval
|
||||
.ok_or(anyhow!("missing synthetic_size_calculation_interval"))?,
|
||||
evictions_low_residence_duration_metric_threshold: self
|
||||
.evictions_low_residence_duration_metric_threshold
|
||||
.ok_or(anyhow!(
|
||||
"missing evictions_low_residence_duration_metric_threshold"
|
||||
))?,
|
||||
disk_usage_based_eviction: self
|
||||
.disk_usage_based_eviction
|
||||
.ok_or(anyhow!("missing disk_usage_based_eviction"))?,
|
||||
@@ -721,12 +702,12 @@ impl PageServerConf {
|
||||
"synthetic_size_calculation_interval" =>
|
||||
builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
|
||||
"test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
|
||||
"evictions_low_residence_duration_metric_threshold" => builder.evictions_low_residence_duration_metric_threshold(parse_toml_duration(key, item)?),
|
||||
"disk_usage_based_eviction" => {
|
||||
tracing::info!("disk_usage_based_eviction: {:#?}", &item);
|
||||
builder.disk_usage_based_eviction(
|
||||
toml_edit::de::from_item(item.clone())
|
||||
.context("parse disk_usage_based_eviction")?)
|
||||
deserialize_from_item("disk_usage_based_eviction", item)
|
||||
.context("parse disk_usage_based_eviction")?
|
||||
)
|
||||
},
|
||||
"ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
|
||||
_ => bail!("unrecognized pageserver option '{key}'"),
|
||||
@@ -827,18 +808,25 @@ impl PageServerConf {
|
||||
|
||||
if let Some(eviction_policy) = item.get("eviction_policy") {
|
||||
t_conf.eviction_policy = Some(
|
||||
toml_edit::de::from_item(eviction_policy.clone())
|
||||
deserialize_from_item("eviction_policy", eviction_policy)
|
||||
.context("parse eviction_policy")?,
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(item) = item.get("min_resident_size_override") {
|
||||
t_conf.min_resident_size_override = Some(
|
||||
toml_edit::de::from_item(item.clone())
|
||||
deserialize_from_item("min_resident_size_override", item)
|
||||
.context("parse min_resident_size_override")?,
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(item) = item.get("evictions_low_residence_duration_metric_threshold") {
|
||||
t_conf.evictions_low_residence_duration_metric_threshold = Some(parse_toml_duration(
|
||||
"evictions_low_residence_duration_metric_threshold",
|
||||
item,
|
||||
)?);
|
||||
}
|
||||
|
||||
Ok(t_conf)
|
||||
}
|
||||
|
||||
@@ -877,10 +865,6 @@ impl PageServerConf {
|
||||
cached_metric_collection_interval: Duration::from_secs(60 * 60),
|
||||
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
|
||||
synthetic_size_calculation_interval: Duration::from_secs(60),
|
||||
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
|
||||
defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
|
||||
)
|
||||
.unwrap(),
|
||||
disk_usage_based_eviction: None,
|
||||
test_remote_failures: 0,
|
||||
ondemand_download_behavior_treat_error_as_warn: false,
|
||||
@@ -938,6 +922,18 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
fn deserialize_from_item<T>(name: &str, item: &Item) -> anyhow::Result<T>
|
||||
where
|
||||
T: serde::de::DeserializeOwned,
|
||||
{
|
||||
// ValueDeserializer::new is not public, so use the ValueDeserializer's documented way
|
||||
let deserializer = match item.clone().into_value() {
|
||||
Ok(value) => value.into_deserializer(),
|
||||
Err(item) => anyhow::bail!("toml_edit::Item '{item}' is not a toml_edit::Value"),
|
||||
};
|
||||
T::deserialize(deserializer).with_context(|| format!("deserializing item for node {name}"))
|
||||
}
|
||||
|
||||
/// Configurable semaphore permits setting.
|
||||
///
|
||||
/// Does not allow semaphore permits to be zero, because at runtime initially zero permits and empty
|
||||
@@ -1004,9 +1000,10 @@ mod tests {
|
||||
|
||||
use remote_storage::{RemoteStorageKind, S3Config};
|
||||
use tempfile::{tempdir, TempDir};
|
||||
use utils::serde_percent::Percent;
|
||||
|
||||
use super::*;
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
use crate::{tenant::config::EvictionPolicy, DEFAULT_PG_VERSION};
|
||||
|
||||
const ALL_BASE_VALUES_TOML: &str = r#"
|
||||
# Initial configuration file created by 'pageserver --init'
|
||||
@@ -1029,8 +1026,6 @@ cached_metric_collection_interval = '22200 s'
|
||||
metric_collection_endpoint = 'http://localhost:80/metrics'
|
||||
synthetic_size_calculation_interval = '333 s'
|
||||
|
||||
evictions_low_residence_duration_metric_threshold = '444 s'
|
||||
|
||||
log_format = 'json'
|
||||
|
||||
"#;
|
||||
@@ -1087,9 +1082,6 @@ log_format = 'json'
|
||||
synthetic_size_calculation_interval: humantime::parse_duration(
|
||||
defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL
|
||||
)?,
|
||||
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
|
||||
defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD
|
||||
)?,
|
||||
disk_usage_based_eviction: None,
|
||||
test_remote_failures: 0,
|
||||
ondemand_download_behavior_treat_error_as_warn: false,
|
||||
@@ -1144,7 +1136,6 @@ log_format = 'json'
|
||||
cached_metric_collection_interval: Duration::from_secs(22200),
|
||||
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
|
||||
synthetic_size_calculation_interval: Duration::from_secs(333),
|
||||
evictions_low_residence_duration_metric_threshold: Duration::from_secs(444),
|
||||
disk_usage_based_eviction: None,
|
||||
test_remote_failures: 0,
|
||||
ondemand_download_behavior_treat_error_as_warn: false,
|
||||
@@ -1310,6 +1301,71 @@ trace_read_requests = {trace_read_requests}"#,
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn eviction_pageserver_config_parse() -> anyhow::Result<()> {
|
||||
let tempdir = tempdir()?;
|
||||
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
|
||||
|
||||
let pageserver_conf_toml = format!(
|
||||
r#"pg_distrib_dir = "{}"
|
||||
metric_collection_endpoint = "http://sample.url"
|
||||
metric_collection_interval = "10min"
|
||||
id = 222
|
||||
|
||||
[disk_usage_based_eviction]
|
||||
max_usage_pct = 80
|
||||
min_avail_bytes = 0
|
||||
period = "10s"
|
||||
|
||||
[tenant_config]
|
||||
evictions_low_residence_duration_metric_threshold = "20m"
|
||||
|
||||
[tenant_config.eviction_policy]
|
||||
kind = "LayerAccessThreshold"
|
||||
period = "20m"
|
||||
threshold = "20m"
|
||||
"#,
|
||||
pg_distrib_dir.display(),
|
||||
);
|
||||
let toml: Document = pageserver_conf_toml.parse()?;
|
||||
let conf = PageServerConf::parse_and_validate(&toml, &workdir)?;
|
||||
|
||||
assert_eq!(conf.pg_distrib_dir, pg_distrib_dir);
|
||||
assert_eq!(
|
||||
conf.metric_collection_endpoint,
|
||||
Some("http://sample.url".parse().unwrap())
|
||||
);
|
||||
assert_eq!(
|
||||
conf.metric_collection_interval,
|
||||
Duration::from_secs(10 * 60)
|
||||
);
|
||||
assert_eq!(
|
||||
conf.default_tenant_conf
|
||||
.evictions_low_residence_duration_metric_threshold,
|
||||
Duration::from_secs(20 * 60)
|
||||
);
|
||||
assert_eq!(conf.id, NodeId(222));
|
||||
assert_eq!(
|
||||
conf.disk_usage_based_eviction,
|
||||
Some(DiskUsageEvictionTaskConfig {
|
||||
max_usage_pct: Percent::new(80).unwrap(),
|
||||
min_avail_bytes: 0,
|
||||
period: Duration::from_secs(10),
|
||||
#[cfg(feature = "testing")]
|
||||
mock_statvfs: None,
|
||||
})
|
||||
);
|
||||
match &conf.default_tenant_conf.eviction_policy {
|
||||
EvictionPolicy::NoEviction => panic!("Unexpected eviction opolicy tenant settings"),
|
||||
EvictionPolicy::LayerAccessThreshold(eviction_thresold) => {
|
||||
assert_eq!(eviction_thresold.period, Duration::from_secs(20 * 60));
|
||||
assert_eq!(eviction_thresold.threshold, Duration::from_secs(20 * 60));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prepare_fs(tempdir: &TempDir) -> anyhow::Result<(PathBuf, PathBuf)> {
|
||||
let tempdir_path = tempdir.path();
|
||||
|
||||
|
||||
@@ -520,6 +520,43 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/{tenant_id}/synthetic_size:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
get:
|
||||
description: |
|
||||
Calculate tenant's synthetic size
|
||||
responses:
|
||||
"200":
|
||||
description: Tenant's synthetic size
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/SyntheticSizeResponse"
|
||||
"401":
|
||||
description: Unauthorized Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/UnauthorizedError"
|
||||
"403":
|
||||
description: Forbidden Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ForbiddenError"
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/{tenant_id}/size:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
@@ -948,6 +985,84 @@ components:
|
||||
latest_gc_cutoff_lsn:
|
||||
type: string
|
||||
format: hex
|
||||
|
||||
SyntheticSizeResponse:
|
||||
type: object
|
||||
required:
|
||||
- id
|
||||
- size
|
||||
- segment_sizes
|
||||
- inputs
|
||||
properties:
|
||||
id:
|
||||
type: string
|
||||
format: hex
|
||||
size:
|
||||
type: integer
|
||||
segment_sizes:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/SegmentSize"
|
||||
inputs:
|
||||
type: object
|
||||
properties:
|
||||
segments:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/SegmentData"
|
||||
timeline_inputs:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/TimelineInput"
|
||||
|
||||
SegmentSize:
|
||||
type: object
|
||||
required:
|
||||
- method
|
||||
- accum_size
|
||||
properties:
|
||||
method:
|
||||
type: string
|
||||
accum_size:
|
||||
type: integer
|
||||
|
||||
SegmentData:
|
||||
type: object
|
||||
required:
|
||||
- segment
|
||||
properties:
|
||||
segment:
|
||||
type: object
|
||||
required:
|
||||
- lsn
|
||||
properties:
|
||||
parent:
|
||||
type: integer
|
||||
lsn:
|
||||
type: integer
|
||||
size:
|
||||
type: integer
|
||||
needed:
|
||||
type: boolean
|
||||
timeline_id:
|
||||
type: string
|
||||
format: hex
|
||||
kind:
|
||||
type: string
|
||||
|
||||
TimelineInput:
|
||||
type: object
|
||||
required:
|
||||
- timeline_id
|
||||
properties:
|
||||
ancestor_id:
|
||||
type: string
|
||||
ancestor_lsn:
|
||||
type: string
|
||||
timeline_id:
|
||||
type: string
|
||||
format: hex
|
||||
|
||||
Error:
|
||||
type: object
|
||||
required:
|
||||
|
||||
@@ -781,6 +781,19 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
|
||||
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
|
||||
|
||||
if let Some(evictions_low_residence_duration_metric_threshold) =
|
||||
request_data.evictions_low_residence_duration_metric_threshold
|
||||
{
|
||||
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
|
||||
humantime::parse_duration(&evictions_low_residence_duration_metric_threshold)
|
||||
.with_context(bad_duration(
|
||||
"evictions_low_residence_duration_metric_threshold",
|
||||
&evictions_low_residence_duration_metric_threshold,
|
||||
))
|
||||
.map_err(ApiError::BadRequest)?,
|
||||
);
|
||||
}
|
||||
|
||||
let target_tenant_id = request_data
|
||||
.new_tenant_id
|
||||
.map(TenantId::from)
|
||||
@@ -914,6 +927,19 @@ async fn update_tenant_config_handler(
|
||||
|
||||
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
|
||||
|
||||
if let Some(evictions_low_residence_duration_metric_threshold) =
|
||||
request_data.evictions_low_residence_duration_metric_threshold
|
||||
{
|
||||
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
|
||||
humantime::parse_duration(&evictions_low_residence_duration_metric_threshold)
|
||||
.with_context(bad_duration(
|
||||
"evictions_low_residence_duration_metric_threshold",
|
||||
&evictions_low_residence_duration_metric_threshold,
|
||||
))
|
||||
.map_err(ApiError::BadRequest)?,
|
||||
);
|
||||
}
|
||||
|
||||
let state = get_state(&request);
|
||||
mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
|
||||
.instrument(info_span!("tenant_config", tenant = ?tenant_id))
|
||||
@@ -1175,6 +1201,37 @@ async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
async fn post_tracing_event_handler(mut r: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
enum Level {
|
||||
Error,
|
||||
Warn,
|
||||
Info,
|
||||
Debug,
|
||||
Trace,
|
||||
}
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
struct Request {
|
||||
level: Level,
|
||||
message: String,
|
||||
}
|
||||
let body: Request = json_request(&mut r)
|
||||
.await
|
||||
.map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
|
||||
|
||||
match body.level {
|
||||
Level::Error => tracing::error!(?body.message),
|
||||
Level::Warn => tracing::warn!(?body.message),
|
||||
Level::Info => tracing::info!(?body.message),
|
||||
Level::Debug => tracing::debug!(?body.message),
|
||||
Level::Trace => tracing::trace!(?body.message),
|
||||
}
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
pub fn make_router(
|
||||
conf: &'static PageServerConf,
|
||||
launch_ts: &'static LaunchTimestamp,
|
||||
@@ -1315,5 +1372,9 @@ pub fn make_router(
|
||||
testing_api!("set tenant state to broken", handle_tenant_break),
|
||||
)
|
||||
.get("/v1/panic", |r| RequestSpan(always_panic_handler).handle(r))
|
||||
.post(
|
||||
"/v1/tracing/event",
|
||||
testing_api!("emit a tracing event", post_tracing_event_handler),
|
||||
)
|
||||
.any(handler_404))
|
||||
}
|
||||
|
||||
@@ -257,6 +257,22 @@ impl EvictionsWithLowResidenceDuration {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn change_threshold(
|
||||
&mut self,
|
||||
tenant_id: &str,
|
||||
timeline_id: &str,
|
||||
new_threshold: Duration,
|
||||
) {
|
||||
if new_threshold == self.threshold {
|
||||
return;
|
||||
}
|
||||
let mut with_new =
|
||||
EvictionsWithLowResidenceDurationBuilder::new(self.data_source, new_threshold)
|
||||
.build(tenant_id, timeline_id);
|
||||
std::mem::swap(self, &mut with_new);
|
||||
with_new.remove(tenant_id, timeline_id);
|
||||
}
|
||||
|
||||
// This could be a `Drop` impl, but, we need the `tenant_id` and `timeline_id`.
|
||||
fn remove(&mut self, tenant_id: &str, timeline_id: &str) {
|
||||
let Some(_counter) = self.counter.take() else {
|
||||
@@ -369,6 +385,26 @@ static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy<HistogramVec> = Lazy::new
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_remote_timeline_client_bytes_started",
|
||||
"Incremented by the number of bytes associated with a remote timeline client operation. \
|
||||
The increment happens when the operation is scheduled.",
|
||||
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_remote_timeline_client_bytes_finished",
|
||||
"Incremented by the number of bytes associated with a remote timeline client operation. \
|
||||
The increment happens when the operation finishes (regardless of success/failure/shutdown).",
|
||||
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub enum RemoteOpKind {
|
||||
Upload,
|
||||
@@ -589,7 +625,7 @@ pub struct TimelineMetrics {
|
||||
pub num_persistent_files_created: IntCounter,
|
||||
pub persistent_bytes_written: IntCounter,
|
||||
pub evictions: IntCounter,
|
||||
pub evictions_with_low_residence_duration: EvictionsWithLowResidenceDuration,
|
||||
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
|
||||
}
|
||||
|
||||
impl TimelineMetrics {
|
||||
@@ -656,7 +692,9 @@ impl TimelineMetrics {
|
||||
num_persistent_files_created,
|
||||
persistent_bytes_written,
|
||||
evictions,
|
||||
evictions_with_low_residence_duration,
|
||||
evictions_with_low_residence_duration: std::sync::RwLock::new(
|
||||
evictions_with_low_residence_duration,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -675,6 +713,8 @@ impl Drop for TimelineMetrics {
|
||||
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = EVICTIONS.remove_label_values(&[tenant_id, timeline_id]);
|
||||
self.evictions_with_low_residence_duration
|
||||
.write()
|
||||
.unwrap()
|
||||
.remove(tenant_id, timeline_id);
|
||||
for op in STORAGE_TIME_OPERATIONS {
|
||||
let _ =
|
||||
@@ -719,6 +759,8 @@ pub struct RemoteTimelineClientMetrics {
|
||||
remote_operation_time: Mutex<HashMap<(&'static str, &'static str, &'static str), Histogram>>,
|
||||
calls_unfinished_gauge: Mutex<HashMap<(&'static str, &'static str), IntGauge>>,
|
||||
calls_started_hist: Mutex<HashMap<(&'static str, &'static str), Histogram>>,
|
||||
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
|
||||
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
|
||||
}
|
||||
|
||||
impl RemoteTimelineClientMetrics {
|
||||
@@ -729,6 +771,8 @@ impl RemoteTimelineClientMetrics {
|
||||
remote_operation_time: Mutex::new(HashMap::default()),
|
||||
calls_unfinished_gauge: Mutex::new(HashMap::default()),
|
||||
calls_started_hist: Mutex::new(HashMap::default()),
|
||||
bytes_started_counter: Mutex::new(HashMap::default()),
|
||||
bytes_finished_counter: Mutex::new(HashMap::default()),
|
||||
remote_physical_size_gauge: Mutex::new(None),
|
||||
}
|
||||
}
|
||||
@@ -767,6 +811,7 @@ impl RemoteTimelineClientMetrics {
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
|
||||
fn calls_unfinished_gauge(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
@@ -808,32 +853,125 @@ impl RemoteTimelineClientMetrics {
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
|
||||
fn bytes_started_counter(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> IntCounter {
|
||||
// XXX would be nice to have an upgradable RwLock
|
||||
let mut guard = self.bytes_started_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER
|
||||
.get_metric_with_label_values(&[
|
||||
&self.tenant_id.to_string(),
|
||||
&self.timeline_id.to_string(),
|
||||
key.0,
|
||||
key.1,
|
||||
])
|
||||
.unwrap()
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
|
||||
fn bytes_finished_counter(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> IntCounter {
|
||||
// XXX would be nice to have an upgradable RwLock
|
||||
let mut guard = self.bytes_finished_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER
|
||||
.get_metric_with_label_values(&[
|
||||
&self.tenant_id.to_string(),
|
||||
&self.timeline_id.to_string(),
|
||||
key.0,
|
||||
key.1,
|
||||
])
|
||||
.unwrap()
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl RemoteTimelineClientMetrics {
|
||||
pub fn get_bytes_started_counter_value(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> Option<u64> {
|
||||
let guard = self.bytes_started_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
guard.get(&key).map(|counter| counter.get())
|
||||
}
|
||||
|
||||
pub fn get_bytes_finished_counter_value(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> Option<u64> {
|
||||
let guard = self.bytes_finished_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
guard.get(&key).map(|counter| counter.get())
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`RemoteTimelineClientMetrics::call_begin`].
|
||||
#[must_use]
|
||||
pub(crate) struct RemoteTimelineClientCallMetricGuard(Option<IntGauge>);
|
||||
pub(crate) struct RemoteTimelineClientCallMetricGuard {
|
||||
/// Decremented on drop.
|
||||
calls_unfinished_metric: Option<IntGauge>,
|
||||
/// If Some(), this references the bytes_finished metric, and we increment it by the given `u64` on drop.
|
||||
bytes_finished: Option<(IntCounter, u64)>,
|
||||
}
|
||||
|
||||
impl RemoteTimelineClientCallMetricGuard {
|
||||
/// Consume this guard object without decrementing the metric.
|
||||
/// The caller vouches to do this manually, so that the prior increment of the gauge will cancel out.
|
||||
/// Consume this guard object without performing the metric updates it would do on `drop()`.
|
||||
/// The caller vouches to do the metric updates manually.
|
||||
pub fn will_decrement_manually(mut self) {
|
||||
self.0 = None; // prevent drop() from decrementing
|
||||
let RemoteTimelineClientCallMetricGuard {
|
||||
calls_unfinished_metric,
|
||||
bytes_finished,
|
||||
} = &mut self;
|
||||
calls_unfinished_metric.take();
|
||||
bytes_finished.take();
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RemoteTimelineClientCallMetricGuard {
|
||||
fn drop(&mut self) {
|
||||
if let RemoteTimelineClientCallMetricGuard(Some(guard)) = self {
|
||||
let RemoteTimelineClientCallMetricGuard {
|
||||
calls_unfinished_metric,
|
||||
bytes_finished,
|
||||
} = self;
|
||||
if let Some(guard) = calls_unfinished_metric.take() {
|
||||
guard.dec();
|
||||
}
|
||||
if let Some((bytes_finished_metric, value)) = bytes_finished {
|
||||
bytes_finished_metric.inc_by(*value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The enum variants communicate to the [`RemoteTimelineClientMetrics`] whether to
|
||||
/// track the byte size of this call in applicable metric(s).
|
||||
pub(crate) enum RemoteTimelineClientMetricsCallTrackSize {
|
||||
/// Do not account for this call's byte size in any metrics.
|
||||
/// The `reason` field is there to make the call sites self-documenting
|
||||
/// about why they don't need the metric.
|
||||
DontTrackSize { reason: &'static str },
|
||||
/// Track the byte size of the call in applicable metric(s).
|
||||
Bytes(u64),
|
||||
}
|
||||
|
||||
impl RemoteTimelineClientMetrics {
|
||||
/// Increment the metrics that track ongoing calls to the remote timeline client instance.
|
||||
/// Update the metrics that change when a call to the remote timeline client instance starts.
|
||||
///
|
||||
/// Drop the returned guard object once the operation is finished to decrement the values.
|
||||
/// Drop the returned guard object once the operation is finished to updates corresponding metrics that track completions.
|
||||
/// Or, use [`RemoteTimelineClientCallMetricGuard::will_decrement_manually`] and [`call_end`] if that
|
||||
/// is more suitable.
|
||||
/// Never do both.
|
||||
@@ -841,24 +979,51 @@ impl RemoteTimelineClientMetrics {
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
size: RemoteTimelineClientMetricsCallTrackSize,
|
||||
) -> RemoteTimelineClientCallMetricGuard {
|
||||
let unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
|
||||
let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
|
||||
self.calls_started_hist(file_kind, op_kind)
|
||||
.observe(unfinished_metric.get() as f64);
|
||||
unfinished_metric.inc();
|
||||
RemoteTimelineClientCallMetricGuard(Some(unfinished_metric))
|
||||
.observe(calls_unfinished_metric.get() as f64);
|
||||
calls_unfinished_metric.inc(); // NB: inc after the histogram, see comment on underlying metric
|
||||
|
||||
let bytes_finished = match size {
|
||||
RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {
|
||||
// nothing to do
|
||||
None
|
||||
}
|
||||
RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => {
|
||||
self.bytes_started_counter(file_kind, op_kind).inc_by(size);
|
||||
let finished_counter = self.bytes_finished_counter(file_kind, op_kind);
|
||||
Some((finished_counter, size))
|
||||
}
|
||||
};
|
||||
RemoteTimelineClientCallMetricGuard {
|
||||
calls_unfinished_metric: Some(calls_unfinished_metric),
|
||||
bytes_finished,
|
||||
}
|
||||
}
|
||||
|
||||
/// Manually decrement the metric instead of using the guard object.
|
||||
/// Manually udpate the metrics that track completions, instead of using the guard object.
|
||||
/// Using the guard object is generally preferable.
|
||||
/// See [`call_begin`] for more context.
|
||||
pub(crate) fn call_end(&self, file_kind: &RemoteOpFileKind, op_kind: &RemoteOpKind) {
|
||||
let unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
|
||||
pub(crate) fn call_end(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
size: RemoteTimelineClientMetricsCallTrackSize,
|
||||
) {
|
||||
let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
|
||||
debug_assert!(
|
||||
unfinished_metric.get() > 0,
|
||||
calls_unfinished_metric.get() > 0,
|
||||
"begin and end should cancel out"
|
||||
);
|
||||
unfinished_metric.dec();
|
||||
calls_unfinished_metric.dec();
|
||||
match size {
|
||||
RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {}
|
||||
RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => {
|
||||
self.bytes_finished_counter(file_kind, op_kind).inc_by(size);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -871,6 +1036,8 @@ impl Drop for RemoteTimelineClientMetrics {
|
||||
remote_operation_time,
|
||||
calls_unfinished_gauge,
|
||||
calls_started_hist,
|
||||
bytes_started_counter,
|
||||
bytes_finished_counter,
|
||||
} = self;
|
||||
for ((a, b, c), _) in remote_operation_time.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_OPERATION_TIME.remove_label_values(&[tenant_id, timeline_id, a, b, c]);
|
||||
@@ -891,6 +1058,22 @@ impl Drop for RemoteTimelineClientMetrics {
|
||||
b,
|
||||
]);
|
||||
}
|
||||
for ((a, b), _) in bytes_started_counter.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER.remove_label_values(&[
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
a,
|
||||
b,
|
||||
]);
|
||||
}
|
||||
for ((a, b), _) in bytes_finished_counter.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER.remove_label_values(&[
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
a,
|
||||
b,
|
||||
]);
|
||||
}
|
||||
{
|
||||
let _ = remote_physical_size_gauge; // use to avoid 'unused' warning in desctructuring above
|
||||
let _ = REMOTE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
|
||||
|
||||
@@ -65,7 +65,7 @@ fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<
|
||||
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
// We were requested to shut down.
|
||||
let msg = format!("pageserver is shutting down");
|
||||
let msg = "pageserver is shutting down".to_string();
|
||||
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None));
|
||||
Err(QueryError::Other(anyhow::anyhow!(msg)))
|
||||
}
|
||||
@@ -700,6 +700,8 @@ impl PageServerHandler {
|
||||
full_backup: bool,
|
||||
ctx: RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let started = std::time::Instant::now();
|
||||
|
||||
// check that the timeline exists
|
||||
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
@@ -712,6 +714,8 @@ impl PageServerHandler {
|
||||
.context("invalid basebackup lsn")?;
|
||||
}
|
||||
|
||||
let lsn_awaited_after = started.elapsed();
|
||||
|
||||
// switch client to COPYOUT
|
||||
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
|
||||
pgb.flush().await?;
|
||||
@@ -732,7 +736,17 @@ impl PageServerHandler {
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CopyDone)?;
|
||||
pgb.flush().await?;
|
||||
info!("basebackup complete");
|
||||
|
||||
let basebackup_after = started
|
||||
.elapsed()
|
||||
.checked_sub(lsn_awaited_after)
|
||||
.unwrap_or(Duration::ZERO);
|
||||
|
||||
info!(
|
||||
lsn_await_millis = lsn_awaited_after.as_millis(),
|
||||
basebackup_millis = basebackup_after.as_millis(),
|
||||
"basebackup complete"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1735,6 +1735,13 @@ impl Tenant {
|
||||
|
||||
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
*self.tenant_conf.write().unwrap() = new_tenant_conf;
|
||||
// Don't hold self.timelines.lock() during the notifies.
|
||||
// There's no risk of deadlock right now, but there could be if we consolidate
|
||||
// mutexes in struct Timeline in the future.
|
||||
let timelines = self.list_timelines();
|
||||
for timeline in timelines {
|
||||
timeline.tenant_conf_updated();
|
||||
}
|
||||
}
|
||||
|
||||
fn create_timeline_data(
|
||||
@@ -1887,7 +1894,7 @@ impl Tenant {
|
||||
.to_string();
|
||||
|
||||
// Convert the config to a toml file.
|
||||
conf_content += &toml_edit::easy::to_string(&tenant_conf)?;
|
||||
conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
|
||||
|
||||
let mut target_config_file = VirtualFile::open_with_options(
|
||||
target_config_path,
|
||||
@@ -2815,6 +2822,9 @@ pub mod harness {
|
||||
trace_read_requests: Some(tenant_conf.trace_read_requests),
|
||||
eviction_policy: Some(tenant_conf.eviction_policy),
|
||||
min_resident_size_override: tenant_conf.min_resident_size_override,
|
||||
evictions_low_residence_duration_metric_threshold: Some(
|
||||
tenant_conf.evictions_low_residence_duration_metric_threshold,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,6 +39,7 @@ pub mod defaults {
|
||||
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
|
||||
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "3 seconds";
|
||||
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
|
||||
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
|
||||
}
|
||||
|
||||
/// Per-tenant configuration options
|
||||
@@ -93,6 +94,9 @@ pub struct TenantConf {
|
||||
pub trace_read_requests: bool,
|
||||
pub eviction_policy: EvictionPolicy,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
// See the corresponding metric's help string.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub evictions_low_residence_duration_metric_threshold: Duration,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -164,6 +168,11 @@ pub struct TenantConfOpt {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(with = "humantime_serde")]
|
||||
#[serde(default)]
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -228,6 +237,9 @@ impl TenantConfOpt {
|
||||
min_resident_size_override: self
|
||||
.min_resident_size_override
|
||||
.or(global_conf.min_resident_size_override),
|
||||
evictions_low_residence_duration_metric_threshold: self
|
||||
.evictions_low_residence_duration_metric_threshold
|
||||
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -260,6 +272,10 @@ impl Default for TenantConf {
|
||||
trace_read_requests: false,
|
||||
eviction_policy: EvictionPolicy::NoEviction,
|
||||
min_resident_size_override: None,
|
||||
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
|
||||
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
|
||||
)
|
||||
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -275,9 +291,9 @@ mod tests {
|
||||
..TenantConfOpt::default()
|
||||
};
|
||||
|
||||
let toml_form = toml_edit::easy::to_string(&small_conf).unwrap();
|
||||
let toml_form = toml_edit::ser::to_string(&small_conf).unwrap();
|
||||
assert_eq!(toml_form, "gc_horizon = 42\n");
|
||||
assert_eq!(small_conf, toml_edit::easy::from_str(&toml_form).unwrap());
|
||||
assert_eq!(small_conf, toml_edit::de::from_str(&toml_form).unwrap());
|
||||
|
||||
let json_form = serde_json::to_string(&small_conf).unwrap();
|
||||
assert_eq!(json_form, "{\"gc_horizon\":42}");
|
||||
|
||||
@@ -219,7 +219,8 @@ use utils::lsn::Lsn;
|
||||
|
||||
use crate::metrics::{
|
||||
MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
|
||||
REMOTE_ONDEMAND_DOWNLOADED_BYTES, REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
|
||||
RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
|
||||
REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
|
||||
};
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::{
|
||||
@@ -367,9 +368,13 @@ impl RemoteTimelineClient {
|
||||
|
||||
/// Download index file
|
||||
pub async fn download_index_file(&self) -> Result<IndexPart, DownloadError> {
|
||||
let _unfinished_gauge_guard = self
|
||||
.metrics
|
||||
.call_begin(&RemoteOpFileKind::Index, &RemoteOpKind::Download);
|
||||
let _unfinished_gauge_guard = self.metrics.call_begin(
|
||||
&RemoteOpFileKind::Index,
|
||||
&RemoteOpKind::Download,
|
||||
crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
|
||||
reason: "no need for a downloads gauge",
|
||||
},
|
||||
);
|
||||
|
||||
download::download_index_part(
|
||||
self.conf,
|
||||
@@ -398,9 +403,13 @@ impl RemoteTimelineClient {
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
) -> anyhow::Result<u64> {
|
||||
let downloaded_size = {
|
||||
let _unfinished_gauge_guard = self
|
||||
.metrics
|
||||
.call_begin(&RemoteOpFileKind::Layer, &RemoteOpKind::Download);
|
||||
let _unfinished_gauge_guard = self.metrics.call_begin(
|
||||
&RemoteOpFileKind::Layer,
|
||||
&RemoteOpKind::Download,
|
||||
crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
|
||||
reason: "no need for a downloads gauge",
|
||||
},
|
||||
);
|
||||
download::download_layer_file(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
@@ -886,11 +895,32 @@ impl RemoteTimelineClient {
|
||||
fn calls_unfinished_metric_impl(
|
||||
&self,
|
||||
op: &UploadOp,
|
||||
) -> Option<(RemoteOpFileKind, RemoteOpKind)> {
|
||||
) -> Option<(
|
||||
RemoteOpFileKind,
|
||||
RemoteOpKind,
|
||||
RemoteTimelineClientMetricsCallTrackSize,
|
||||
)> {
|
||||
use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
|
||||
let res = match op {
|
||||
UploadOp::UploadLayer(_, _) => (RemoteOpFileKind::Layer, RemoteOpKind::Upload),
|
||||
UploadOp::UploadMetadata(_, _) => (RemoteOpFileKind::Index, RemoteOpKind::Upload),
|
||||
UploadOp::Delete(file_kind, _) => (*file_kind, RemoteOpKind::Delete),
|
||||
UploadOp::UploadLayer(_, m) => (
|
||||
RemoteOpFileKind::Layer,
|
||||
RemoteOpKind::Upload,
|
||||
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
|
||||
),
|
||||
UploadOp::UploadMetadata(_, _) => (
|
||||
RemoteOpFileKind::Index,
|
||||
RemoteOpKind::Upload,
|
||||
DontTrackSize {
|
||||
reason: "metadata uploads are tiny",
|
||||
},
|
||||
),
|
||||
UploadOp::Delete(file_kind, _) => (
|
||||
*file_kind,
|
||||
RemoteOpKind::Delete,
|
||||
DontTrackSize {
|
||||
reason: "should we track deletes? positive or negative sign?",
|
||||
},
|
||||
),
|
||||
UploadOp::Barrier(_) => {
|
||||
// we do not account these
|
||||
return None;
|
||||
@@ -900,20 +930,20 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
|
||||
fn calls_unfinished_metric_begin(&self, op: &UploadOp) {
|
||||
let (file_kind, op_kind) = match self.calls_unfinished_metric_impl(op) {
|
||||
let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
|
||||
Some(x) => x,
|
||||
None => return,
|
||||
};
|
||||
let guard = self.metrics.call_begin(&file_kind, &op_kind);
|
||||
let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
|
||||
guard.will_decrement_manually(); // in unfinished_ops_metric_end()
|
||||
}
|
||||
|
||||
fn calls_unfinished_metric_end(&self, op: &UploadOp) {
|
||||
let (file_kind, op_kind) = match self.calls_unfinished_metric_impl(op) {
|
||||
let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
|
||||
Some(x) => x,
|
||||
None => return,
|
||||
};
|
||||
self.metrics.call_end(&file_kind, &op_kind);
|
||||
self.metrics.call_end(&file_kind, &op_kind, track_bytes);
|
||||
}
|
||||
|
||||
fn stop(&self) {
|
||||
@@ -981,11 +1011,19 @@ impl RemoteTimelineClient {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{
|
||||
tenant::harness::{TenantHarness, TIMELINE_ID},
|
||||
context::RequestContext,
|
||||
tenant::{
|
||||
harness::{TenantHarness, TIMELINE_ID},
|
||||
Tenant,
|
||||
},
|
||||
DEFAULT_PG_VERSION,
|
||||
};
|
||||
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
|
||||
use std::{collections::HashSet, path::Path};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use tokio::runtime::EnterGuard;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
|
||||
@@ -1034,39 +1072,80 @@ mod tests {
|
||||
assert_eq!(found, expected);
|
||||
}
|
||||
|
||||
struct TestSetup {
|
||||
runtime: &'static tokio::runtime::Runtime,
|
||||
entered_runtime: EnterGuard<'static>,
|
||||
harness: TenantHarness<'static>,
|
||||
tenant: Arc<Tenant>,
|
||||
tenant_ctx: RequestContext,
|
||||
remote_fs_dir: PathBuf,
|
||||
client: Arc<RemoteTimelineClient>,
|
||||
}
|
||||
|
||||
impl TestSetup {
|
||||
fn new(test_name: &str) -> anyhow::Result<Self> {
|
||||
// Use a current-thread runtime in the test
|
||||
let runtime = Box::leak(Box::new(
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?,
|
||||
));
|
||||
let entered_runtime = runtime.enter();
|
||||
|
||||
let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
|
||||
let harness = TenantHarness::create(test_name)?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
// create an empty timeline directory
|
||||
let timeline =
|
||||
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let _ = timeline.initialize(&ctx).unwrap();
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
|
||||
|
||||
let storage_config = RemoteStorageConfig {
|
||||
max_concurrent_syncs: std::num::NonZeroUsize::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
|
||||
)
|
||||
.unwrap(),
|
||||
max_sync_errors: std::num::NonZeroU32::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
|
||||
)
|
||||
.unwrap(),
|
||||
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
|
||||
};
|
||||
|
||||
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
|
||||
|
||||
let client = Arc::new(RemoteTimelineClient {
|
||||
conf: harness.conf,
|
||||
runtime,
|
||||
tenant_id: harness.tenant_id,
|
||||
timeline_id: TIMELINE_ID,
|
||||
storage_impl: storage,
|
||||
upload_queue: Mutex::new(UploadQueue::Uninitialized),
|
||||
metrics: Arc::new(RemoteTimelineClientMetrics::new(
|
||||
&harness.tenant_id,
|
||||
&TIMELINE_ID,
|
||||
)),
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
runtime,
|
||||
entered_runtime,
|
||||
harness,
|
||||
tenant,
|
||||
tenant_ctx: ctx,
|
||||
remote_fs_dir,
|
||||
client,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Test scheduling
|
||||
#[test]
|
||||
fn upload_scheduling() -> anyhow::Result<()> {
|
||||
// Use a current-thread runtime in the test
|
||||
let runtime = Box::leak(Box::new(
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?,
|
||||
));
|
||||
let _entered = runtime.enter();
|
||||
|
||||
let harness = TenantHarness::create("upload_scheduling")?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
let _timeline =
|
||||
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
|
||||
|
||||
let storage_config = RemoteStorageConfig {
|
||||
max_concurrent_syncs: std::num::NonZeroUsize::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
|
||||
)
|
||||
.unwrap(),
|
||||
max_sync_errors: std::num::NonZeroU32::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
|
||||
)
|
||||
.unwrap(),
|
||||
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
|
||||
};
|
||||
|
||||
// Test outline:
|
||||
//
|
||||
// Schedule upload of a bunch of layers. Check that they are started immediately, not queued
|
||||
@@ -1081,21 +1160,19 @@ mod tests {
|
||||
// Schedule another deletion. Check that it's launched immediately.
|
||||
// Schedule index upload. Check that it's queued
|
||||
|
||||
println!("workdir: {}", harness.conf.workdir.display());
|
||||
|
||||
let storage_impl = GenericRemoteStorage::from_config(&storage_config)?;
|
||||
let client = Arc::new(RemoteTimelineClient {
|
||||
conf: harness.conf,
|
||||
let TestSetup {
|
||||
runtime,
|
||||
tenant_id: harness.tenant_id,
|
||||
timeline_id: TIMELINE_ID,
|
||||
storage_impl,
|
||||
upload_queue: Mutex::new(UploadQueue::Uninitialized),
|
||||
metrics: Arc::new(RemoteTimelineClientMetrics::new(
|
||||
&harness.tenant_id,
|
||||
&TIMELINE_ID,
|
||||
)),
|
||||
});
|
||||
entered_runtime: _entered_runtime,
|
||||
harness,
|
||||
tenant: _tenant,
|
||||
tenant_ctx: _tenant_ctx,
|
||||
remote_fs_dir,
|
||||
client,
|
||||
} = TestSetup::new("upload_scheduling").unwrap();
|
||||
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
println!("workdir: {}", harness.conf.workdir.display());
|
||||
|
||||
let remote_timeline_dir =
|
||||
remote_fs_dir.join(timeline_path.strip_prefix(&harness.conf.workdir)?);
|
||||
@@ -1216,4 +1293,90 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bytes_unfinished_gauge_for_layer_file_uploads() -> anyhow::Result<()> {
|
||||
// Setup
|
||||
|
||||
let TestSetup {
|
||||
runtime,
|
||||
harness,
|
||||
client,
|
||||
..
|
||||
} = TestSetup::new("metrics")?;
|
||||
|
||||
let metadata = dummy_metadata(Lsn(0x10));
|
||||
client.init_upload_queue_for_empty_remote(&metadata)?;
|
||||
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
|
||||
let content_1 = dummy_contents("foo");
|
||||
std::fs::write(
|
||||
timeline_path.join(layer_file_name_1.file_name()),
|
||||
&content_1,
|
||||
)?;
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
struct BytesStartedFinished {
|
||||
started: Option<usize>,
|
||||
finished: Option<usize>,
|
||||
}
|
||||
let get_bytes_started_stopped = || {
|
||||
let started = client
|
||||
.metrics
|
||||
.get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
|
||||
.map(|v| v.try_into().unwrap());
|
||||
let stopped = client
|
||||
.metrics
|
||||
.get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
|
||||
.map(|v| v.try_into().unwrap());
|
||||
BytesStartedFinished {
|
||||
started,
|
||||
finished: stopped,
|
||||
}
|
||||
};
|
||||
|
||||
// Test
|
||||
|
||||
let init = get_bytes_started_stopped();
|
||||
|
||||
client.schedule_layer_file_upload(
|
||||
&layer_file_name_1,
|
||||
&LayerFileMetadata::new(content_1.len() as u64),
|
||||
)?;
|
||||
|
||||
let pre = get_bytes_started_stopped();
|
||||
|
||||
runtime.block_on(client.wait_completion())?;
|
||||
|
||||
let post = get_bytes_started_stopped();
|
||||
|
||||
// Validate
|
||||
|
||||
assert_eq!(
|
||||
init,
|
||||
BytesStartedFinished {
|
||||
started: None,
|
||||
finished: None
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
pre,
|
||||
BytesStartedFinished {
|
||||
started: Some(content_1.len()),
|
||||
// assert that the _finished metric is created eagerly so that subtractions work on first sample
|
||||
finished: Some(0),
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
post,
|
||||
BytesStartedFinished {
|
||||
started: Some(content_1.len()),
|
||||
finished: Some(content_1.len())
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -74,7 +74,7 @@ pub(super) async fn upload_timeline_layer<'a>(
|
||||
})?;
|
||||
|
||||
storage
|
||||
.upload(Box::new(source_file), fs_size, &storage_path, None)
|
||||
.upload(source_file, fs_size, &storage_path, None)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
|
||||
@@ -77,6 +77,7 @@ pub(super) use self::eviction_task::EvictionTaskTenantState;
|
||||
use self::eviction_task::EvictionTaskTimelineState;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::config::TenantConf;
|
||||
use super::layer_map::BatchedUpdates;
|
||||
use super::remote_timeline_client::index::IndexPart;
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
@@ -145,7 +146,7 @@ pub struct Timeline {
|
||||
// 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
|
||||
// first WAL record when the node is started up. But here, we just
|
||||
// keep track of it.
|
||||
last_record_lsn: SeqWait<RecordLsn, Lsn, ()>,
|
||||
last_record_lsn: SeqWait<RecordLsn, Lsn>,
|
||||
|
||||
// All WAL records have been processed and stored durably on files on
|
||||
// local disk, up to this LSN. On crash and restart, we need to re-process
|
||||
@@ -161,7 +162,7 @@ pub struct Timeline {
|
||||
ancestor_timeline: Option<Arc<Timeline>>,
|
||||
ancestor_lsn: Lsn,
|
||||
|
||||
metrics: TimelineMetrics,
|
||||
pub(super) metrics: TimelineMetrics,
|
||||
|
||||
/// Ensures layers aren't frozen by checkpointer between
|
||||
/// [`Timeline::get_layer_for_write`] and layer reads.
|
||||
@@ -1136,6 +1137,8 @@ impl Timeline {
|
||||
if let Some(delta) = local_layer_residence_duration {
|
||||
self.metrics
|
||||
.evictions_with_low_residence_duration
|
||||
.read()
|
||||
.unwrap()
|
||||
.observe(delta);
|
||||
info!(layer=%local_layer.short_id(), residence_millis=delta.as_millis(), "evicted layer after known residence period");
|
||||
} else {
|
||||
@@ -1209,6 +1212,35 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.eviction_policy)
|
||||
}
|
||||
|
||||
fn get_evictions_low_residence_duration_metric_threshold(
|
||||
tenant_conf: &TenantConfOpt,
|
||||
default_tenant_conf: &TenantConf,
|
||||
) -> Duration {
|
||||
tenant_conf
|
||||
.evictions_low_residence_duration_metric_threshold
|
||||
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
|
||||
}
|
||||
|
||||
pub(super) fn tenant_conf_updated(&self) {
|
||||
// NB: Most tenant conf options are read by background loops, so,
|
||||
// changes will automatically be picked up.
|
||||
|
||||
// The threshold is embedded in the metric. So, we need to update it.
|
||||
{
|
||||
let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
|
||||
&self.tenant_conf.read().unwrap(),
|
||||
&self.conf.default_tenant_conf,
|
||||
);
|
||||
let tenant_id_str = self.tenant_id.to_string();
|
||||
let timeline_id_str = self.timeline_id.to_string();
|
||||
self.metrics
|
||||
.evictions_with_low_residence_duration
|
||||
.write()
|
||||
.unwrap()
|
||||
.change_threshold(&tenant_id_str, &timeline_id_str, new_threshold);
|
||||
}
|
||||
}
|
||||
|
||||
/// Open a Timeline handle.
|
||||
///
|
||||
/// Loads the metadata for the timeline into memory, but not the layer map.
|
||||
@@ -1240,6 +1272,11 @@ impl Timeline {
|
||||
let max_lsn_wal_lag = tenant_conf_guard
|
||||
.max_lsn_wal_lag
|
||||
.unwrap_or(conf.default_tenant_conf.max_lsn_wal_lag);
|
||||
let evictions_low_residence_duration_metric_threshold =
|
||||
Self::get_evictions_low_residence_duration_metric_threshold(
|
||||
&tenant_conf_guard,
|
||||
&conf.default_tenant_conf,
|
||||
);
|
||||
drop(tenant_conf_guard);
|
||||
|
||||
Arc::new_cyclic(|myself| {
|
||||
@@ -1270,13 +1307,10 @@ impl Timeline {
|
||||
remote_client: remote_client.map(Arc::new),
|
||||
|
||||
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
|
||||
last_record_lsn: SeqWait::new(
|
||||
RecordLsn {
|
||||
last: disk_consistent_lsn,
|
||||
prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
|
||||
},
|
||||
(),
|
||||
),
|
||||
last_record_lsn: SeqWait::new(RecordLsn {
|
||||
last: disk_consistent_lsn,
|
||||
prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
|
||||
}),
|
||||
disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
|
||||
|
||||
last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
|
||||
@@ -1290,7 +1324,7 @@ impl Timeline {
|
||||
&timeline_id,
|
||||
crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
|
||||
"mtime",
|
||||
conf.evictions_low_residence_duration_metric_threshold,
|
||||
evictions_low_residence_duration_metric_threshold,
|
||||
),
|
||||
),
|
||||
|
||||
@@ -2423,7 +2457,7 @@ impl Timeline {
|
||||
assert!(new_lsn.is_aligned());
|
||||
|
||||
self.metrics.last_record_gauge.set(new_lsn.0 as i64);
|
||||
self.last_record_lsn.advance(new_lsn, None);
|
||||
self.last_record_lsn.advance(new_lsn);
|
||||
}
|
||||
|
||||
fn freeze_inmem_layer(&self, write_lock_held: bool) {
|
||||
|
||||
@@ -27,11 +27,13 @@ hyper.workspace = true
|
||||
itertools.workspace = true
|
||||
md5.workspace = true
|
||||
metrics.workspace = true
|
||||
native-tls.workspace = true
|
||||
once_cell.workspace = true
|
||||
opentelemetry.workspace = true
|
||||
parking_lot.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
postgres_backend.workspace = true
|
||||
postgres-native-tls.workspace = true
|
||||
pq_proto.workspace = true
|
||||
prometheus.workspace = true
|
||||
rand.workspace = true
|
||||
@@ -51,6 +53,7 @@ sync_wrapper.workspace = true
|
||||
thiserror.workspace = true
|
||||
tls-listener.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
tokio-postgres-rustls.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
tokio = { workspace = true, features = ["signal"] }
|
||||
tracing-opentelemetry.workspace = true
|
||||
@@ -69,4 +72,3 @@ tokio-util.workspace = true
|
||||
[dev-dependencies]
|
||||
rcgen.workspace = true
|
||||
rstest.workspace = true
|
||||
tokio-postgres-rustls.workspace = true
|
||||
|
||||
@@ -87,6 +87,20 @@ pub(super) async fn authenticate(
|
||||
.dbname(&db_info.dbname)
|
||||
.user(&db_info.user);
|
||||
|
||||
// That is a hack to support new way of accessing compute without using a
|
||||
// NodePort. Now to access compute in cross-k8s setup (console->compute
|
||||
// and link-proxy->compute) we need to connect to the pg_sni_router service
|
||||
// using a TLS. Destination compute address is encoded in domain/SNI.
|
||||
//
|
||||
// However, for link-proxy it is hard add support for outgoing TLS connections
|
||||
// as our trick with stealing stream from tokio-postgres doesn't work with TLS.
|
||||
// So set sni_host option and use unencrupted connection instead. Once we add
|
||||
// encryption support for outgoing connections to the proxy, we can remove
|
||||
// this hack.
|
||||
if db_info.host.contains("cluster.local") {
|
||||
config.options(format!("sni_host={}", db_info.host).as_str());
|
||||
}
|
||||
|
||||
if let Some(password) = db_info.password {
|
||||
config.password(password.as_ref());
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ use pq_proto::StartupMessageParams;
|
||||
use std::{io, net::SocketAddr};
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_postgres::NoTls;
|
||||
use tokio_postgres::{NoTls, config::SslMode, tls::MakeTlsConnect};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
|
||||
@@ -19,6 +19,9 @@ pub enum ConnectionError {
|
||||
|
||||
#[error("{COULD_NOT_CONNECT}: {0}")]
|
||||
CouldNotConnect(#[from] io::Error),
|
||||
|
||||
#[error("{COULD_NOT_CONNECT}: {0}")]
|
||||
TlsError(#[from] native_tls::Error),
|
||||
}
|
||||
|
||||
impl UserFacingError for ConnectionError {
|
||||
@@ -198,6 +201,8 @@ impl ConnCfg {
|
||||
async fn do_connect(&self) -> Result<PostgresConnection, ConnectionError> {
|
||||
// TODO: establish a secure connection to the DB.
|
||||
let (socket_addr, mut stream) = self.connect_raw().await?;
|
||||
|
||||
|
||||
let (client, connection) = self.0.connect_raw(&mut stream, NoTls).await?;
|
||||
info!("connected to compute node at {socket_addr}");
|
||||
|
||||
|
||||
@@ -27,6 +27,8 @@ use tokio::sync::mpsc::error::TryRecvError;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::task::spawn_blocking;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
use tracing::*;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -206,6 +208,10 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
}
|
||||
}
|
||||
|
||||
// Send keepalive messages to walproposer, to make sure it receives updates
|
||||
// even when it writes a steady stream of messages.
|
||||
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
|
||||
|
||||
/// Takes messages from msg_rx, processes and pushes replies to reply_tx.
|
||||
struct WalAcceptor {
|
||||
tli: Arc<Timeline>,
|
||||
@@ -253,18 +259,25 @@ impl WalAcceptor {
|
||||
timeline: Arc::clone(&self.tli),
|
||||
};
|
||||
|
||||
let mut next_msg: ProposerAcceptorMessage;
|
||||
// After this timestamp we will stop processing AppendRequests and send a response
|
||||
// to the walproposer. walproposer sends at least one AppendRequest per second,
|
||||
// we will send keepalives by replying to these requests once per second.
|
||||
let mut next_keepalive = Instant::now();
|
||||
|
||||
loop {
|
||||
let opt_msg = self.msg_rx.recv().await;
|
||||
if opt_msg.is_none() {
|
||||
return Ok(()); // chan closed, streaming terminated
|
||||
}
|
||||
next_msg = opt_msg.unwrap();
|
||||
let mut next_msg = opt_msg.unwrap();
|
||||
|
||||
if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
|
||||
let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
|
||||
// loop through AppendRequest's while it's readily available to
|
||||
// write as many WAL as possible without fsyncing
|
||||
//
|
||||
// Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
|
||||
// Otherwise, we might end up in a situation where we read a message, but don't
|
||||
// process it.
|
||||
while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
|
||||
let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
|
||||
|
||||
@@ -274,6 +287,11 @@ impl WalAcceptor {
|
||||
}
|
||||
}
|
||||
|
||||
// get out of this loop if keepalive time is reached
|
||||
if Instant::now() >= next_keepalive {
|
||||
break;
|
||||
}
|
||||
|
||||
match self.msg_rx.try_recv() {
|
||||
Ok(msg) => next_msg = msg,
|
||||
Err(TryRecvError::Empty) => break,
|
||||
@@ -282,18 +300,18 @@ impl WalAcceptor {
|
||||
}
|
||||
|
||||
// flush all written WAL to the disk
|
||||
if let Some(reply) = self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)? {
|
||||
if self.reply_tx.send(reply).await.is_err() {
|
||||
return Ok(()); // chan closed, streaming terminated
|
||||
}
|
||||
}
|
||||
self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)?
|
||||
} else {
|
||||
// process message other than AppendRequest
|
||||
if let Some(reply) = self.tli.process_msg(&next_msg)? {
|
||||
if self.reply_tx.send(reply).await.is_err() {
|
||||
return Ok(()); // chan closed, streaming terminated
|
||||
}
|
||||
self.tli.process_msg(&next_msg)?
|
||||
};
|
||||
|
||||
if let Some(reply) = reply_msg {
|
||||
if self.reply_tx.send(reply).await.is_err() {
|
||||
return Ok(()); // chan closed, streaming terminated
|
||||
}
|
||||
// reset keepalive time
|
||||
next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@ use std::convert::Infallible;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Poll;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
@@ -374,7 +373,7 @@ impl BrokerService for Broker {
|
||||
Ok(info) => yield info,
|
||||
Err(RecvError::Lagged(skipped_msg)) => {
|
||||
missed_msgs += skipped_msg;
|
||||
if let Poll::Ready(_) = futures::poll!(Box::pin(warn_interval.tick())) {
|
||||
if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
|
||||
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
|
||||
subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
|
||||
missed_msgs = 0;
|
||||
|
||||
@@ -45,6 +45,8 @@ PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = (
|
||||
*[f"pageserver_remote_timeline_client_calls_started_{x}" for x in ["bucket", "count", "sum"]],
|
||||
*[f"pageserver_remote_operation_seconds_{x}" for x in ["bucket", "count", "sum"]],
|
||||
"pageserver_remote_physical_size",
|
||||
"pageserver_remote_timeline_client_bytes_started_total",
|
||||
"pageserver_remote_timeline_client_bytes_finished_total",
|
||||
)
|
||||
|
||||
PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
|
||||
@@ -53,6 +55,7 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_storage_operations_seconds_global_bucket",
|
||||
"libmetrics_launch_timestamp",
|
||||
"libmetrics_build_info",
|
||||
"libmetrics_tracing_event_count_total",
|
||||
)
|
||||
|
||||
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
|
||||
|
||||
@@ -1913,15 +1913,26 @@ def remote_pg(
|
||||
connstr = os.getenv("BENCHMARK_CONNSTR")
|
||||
if connstr is None:
|
||||
raise ValueError("no connstr provided, use BENCHMARK_CONNSTR environment variable")
|
||||
|
||||
host = parse_dsn(connstr).get("host", "")
|
||||
is_neon = host.endswith(".neon.build")
|
||||
|
||||
start_ms = int(datetime.utcnow().timestamp() * 1000)
|
||||
with RemotePostgres(pg_bin, connstr) as remote_pg:
|
||||
if is_neon:
|
||||
timeline_id = TimelineId(remote_pg.safe_psql("SHOW neon.timeline_id")[0][0])
|
||||
|
||||
yield remote_pg
|
||||
|
||||
end_ms = int(datetime.utcnow().timestamp() * 1000)
|
||||
host = parse_dsn(connstr).get("host", "")
|
||||
if host.endswith(".neon.build"):
|
||||
if is_neon:
|
||||
# Add 10s margin to the start and end times
|
||||
allure_add_grafana_links(host, start_ms - 10_000, end_ms + 10_000)
|
||||
allure_add_grafana_links(
|
||||
host,
|
||||
timeline_id,
|
||||
start_ms - 10_000,
|
||||
end_ms + 10_000,
|
||||
)
|
||||
|
||||
|
||||
class PSQL:
|
||||
|
||||
@@ -519,6 +519,13 @@ class PageserverHttpClient(requests.Session):
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
def download_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId):
|
||||
info = self.layer_map_info(tenant_id, timeline_id)
|
||||
for layer in info.historic_layers:
|
||||
if not layer.remote:
|
||||
continue
|
||||
self.download_layer(tenant_id, timeline_id, layer.layer_file_name)
|
||||
|
||||
def evict_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str):
|
||||
res = self.delete(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}",
|
||||
@@ -543,3 +550,13 @@ class PageserverHttpClient(requests.Session):
|
||||
def tenant_break(self, tenant_id: TenantId):
|
||||
res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break")
|
||||
self.verbose_error(res)
|
||||
|
||||
def post_tracing_event(self, level: str, message: str):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tracing/event",
|
||||
json={
|
||||
"level": level,
|
||||
"message": message,
|
||||
},
|
||||
)
|
||||
self.verbose_error(res)
|
||||
|
||||
@@ -13,6 +13,7 @@ import allure
|
||||
from psycopg2.extensions import cursor
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.types import TimelineId
|
||||
|
||||
Fn = TypeVar("Fn", bound=Callable[..., Any])
|
||||
|
||||
@@ -186,11 +187,15 @@ def allure_attach_from_dir(dir: Path):
|
||||
allure.attach.file(source, name, attachment_type, extension)
|
||||
|
||||
|
||||
DATASOURCE_ID = "xHHYY0dVz"
|
||||
GRAFANA_URL = "https://neonprod.grafana.net"
|
||||
GRAFANA_EXPLORE_URL = f"{GRAFANA_URL}/explore"
|
||||
GRAFANA_TIMELINE_INSPECTOR_DASHBOARD_URL = f"{GRAFANA_URL}/d/8G011dlnk/timeline-inspector"
|
||||
LOGS_STAGING_DATASOURCE_ID = "xHHYY0dVz"
|
||||
|
||||
|
||||
def allure_add_grafana_links(host: str, start_ms: int, end_ms: int):
|
||||
def allure_add_grafana_links(host: str, timeline_id: TimelineId, start_ms: int, end_ms: int):
|
||||
"""Add links to server logs in Grafana to Allure report"""
|
||||
links = {}
|
||||
# We expect host to be in format like ep-divine-night-159320.us-east-2.aws.neon.build
|
||||
endpoint_id, region_id, _ = host.split(".", 2)
|
||||
|
||||
@@ -202,12 +207,12 @@ def allure_add_grafana_links(host: str, start_ms: int, end_ms: int):
|
||||
}
|
||||
|
||||
params: Dict[str, Any] = {
|
||||
"datasource": DATASOURCE_ID,
|
||||
"datasource": LOGS_STAGING_DATASOURCE_ID,
|
||||
"queries": [
|
||||
{
|
||||
"expr": "<PUT AN EXPRESSION HERE>",
|
||||
"refId": "A",
|
||||
"datasource": {"type": "loki", "uid": DATASOURCE_ID},
|
||||
"datasource": {"type": "loki", "uid": LOGS_STAGING_DATASOURCE_ID},
|
||||
"editorMode": "code",
|
||||
"queryType": "range",
|
||||
}
|
||||
@@ -220,8 +225,23 @@ def allure_add_grafana_links(host: str, start_ms: int, end_ms: int):
|
||||
for name, expr in expressions.items():
|
||||
params["queries"][0]["expr"] = expr
|
||||
query_string = urlencode({"orgId": 1, "left": json.dumps(params)})
|
||||
link = f"https://neonprod.grafana.net/explore?{query_string}"
|
||||
links[name] = f"{GRAFANA_EXPLORE_URL}?{query_string}"
|
||||
|
||||
timeline_qs = urlencode(
|
||||
{
|
||||
"orgId": 1,
|
||||
"var-environment": "victoria-metrics-aws-dev",
|
||||
"var-timeline_id": timeline_id,
|
||||
"var-endpoint_id": endpoint_id,
|
||||
"var-log_datasource": "grafanacloud-neonstaging-logs",
|
||||
"from": start_ms,
|
||||
"to": end_ms,
|
||||
}
|
||||
)
|
||||
link = f"{GRAFANA_TIMELINE_INSPECTOR_DASHBOARD_URL}?{timeline_qs}"
|
||||
links["Timeline Inspector"] = link
|
||||
|
||||
for name, link in links.items():
|
||||
allure.dynamic.link(link, name=name)
|
||||
log.info(f"{name}: {link}")
|
||||
|
||||
|
||||
49
test_runner/regress/test_logging.py
Normal file
49
test_runner/regress/test_logging.py
Normal file
@@ -0,0 +1,49 @@
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
|
||||
@pytest.mark.parametrize("level", ["trace", "debug", "info", "warn", "error"])
|
||||
def test_logging_event_count(neon_env_builder: NeonEnvBuilder, level: str):
|
||||
# self-test: make sure the event is logged (i.e., our testing endpoint works)
|
||||
log_expected = {
|
||||
"trace": False,
|
||||
"debug": False,
|
||||
"info": True,
|
||||
"warn": True,
|
||||
"error": True,
|
||||
}[level]
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
ps_http = env.pageserver.http_client()
|
||||
msg_id = uuid.uuid4().hex
|
||||
|
||||
# NB: the _total suffix is added by our prometheus client
|
||||
before = ps_http.get_metric_value("libmetrics_tracing_event_count_total", {"level": level})
|
||||
|
||||
# post the event
|
||||
ps_http.post_tracing_event(level, msg_id)
|
||||
if log_expected:
|
||||
env.pageserver.allowed_errors.append(f".*{msg_id}.*")
|
||||
|
||||
def assert_logged():
|
||||
if not log_expected:
|
||||
return
|
||||
assert env.pageserver.log_contains(f".*{msg_id}.*")
|
||||
|
||||
wait_until(10, 0.5, assert_logged)
|
||||
|
||||
# make sure it's counted
|
||||
def assert_metric_value():
|
||||
if not log_expected:
|
||||
return
|
||||
# NB: the _total suffix is added by our prometheus client
|
||||
val = ps_http.get_metric_value("libmetrics_tracing_event_count_total", {"level": level})
|
||||
val = val or 0.0
|
||||
log.info("libmetrics_tracing_event_count: %s", val)
|
||||
assert val > (before or 0.0)
|
||||
|
||||
wait_until(10, 1, assert_metric_value)
|
||||
@@ -1,3 +1,4 @@
|
||||
import json
|
||||
from contextlib import closing
|
||||
|
||||
import psycopg2.extras
|
||||
@@ -18,9 +19,16 @@ def test_tenant_config(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.pageserver_config_override = """
|
||||
page_cache_size=444;
|
||||
wait_lsn_timeout='111 s';
|
||||
tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
|
||||
[tenant_config]
|
||||
checkpoint_distance = 10000
|
||||
compaction_target_size = 1048576
|
||||
evictions_low_residence_duration_metric_threshold = "2 days"
|
||||
eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold = "23 hours" }
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
# we configure eviction but no remote storage, there might be error lines
|
||||
env.pageserver.allowed_errors.append(".* no remote storage configured, cannot evict layers .*")
|
||||
http_client = env.pageserver.http_client()
|
||||
|
||||
# Check that we raise on misspelled configs
|
||||
@@ -39,6 +47,8 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
|
||||
new_conf = {
|
||||
"checkpoint_distance": "20000",
|
||||
"gc_period": "30sec",
|
||||
"evictions_low_residence_duration_metric_threshold": "42s",
|
||||
"eviction_policy": json.dumps({"kind": "NoEviction"}),
|
||||
}
|
||||
tenant, _ = env.neon_cli.create_tenant(conf=new_conf)
|
||||
|
||||
@@ -78,6 +88,12 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
|
||||
assert effective_config["gc_period"] == "1h"
|
||||
assert effective_config["image_creation_threshold"] == 3
|
||||
assert effective_config["pitr_interval"] == "7days"
|
||||
assert effective_config["evictions_low_residence_duration_metric_threshold"] == "2days"
|
||||
assert effective_config["eviction_policy"] == {
|
||||
"kind": "LayerAccessThreshold",
|
||||
"period": "20s",
|
||||
"threshold": "23h",
|
||||
}
|
||||
|
||||
# check the configuration of the new tenant
|
||||
with closing(env.pageserver.connect()) as psconn:
|
||||
@@ -112,6 +128,12 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
|
||||
assert (
|
||||
new_effective_config["gc_period"] == "30s"
|
||||
), "Specific 'gc_period' config should override the default value"
|
||||
assert (
|
||||
new_effective_config["evictions_low_residence_duration_metric_threshold"] == "42s"
|
||||
), "Should override default value"
|
||||
assert new_effective_config["eviction_policy"] == {
|
||||
"kind": "NoEviction"
|
||||
}, "Specific 'eviction_policy' config should override the default value"
|
||||
assert new_effective_config["compaction_target_size"] == 1048576
|
||||
assert new_effective_config["compaction_period"] == "20s"
|
||||
assert new_effective_config["compaction_threshold"] == 10
|
||||
@@ -125,6 +147,10 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
|
||||
"gc_period": "80sec",
|
||||
"compaction_period": "80sec",
|
||||
"image_creation_threshold": "2",
|
||||
"evictions_low_residence_duration_metric_threshold": "23h",
|
||||
"eviction_policy": json.dumps(
|
||||
{"kind": "LayerAccessThreshold", "period": "80s", "threshold": "42h"}
|
||||
),
|
||||
}
|
||||
env.neon_cli.config_tenant(
|
||||
tenant_id=tenant,
|
||||
@@ -167,6 +193,14 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
|
||||
assert (
|
||||
updated_effective_config["compaction_period"] == "1m 20s"
|
||||
), "Specific 'compaction_period' config should override the default value"
|
||||
assert (
|
||||
updated_effective_config["evictions_low_residence_duration_metric_threshold"] == "23h"
|
||||
), "Should override default value"
|
||||
assert updated_effective_config["eviction_policy"] == {
|
||||
"kind": "LayerAccessThreshold",
|
||||
"period": "1m 20s",
|
||||
"threshold": "1day 18h",
|
||||
}, "Specific 'eviction_policy' config should override the default value"
|
||||
assert updated_effective_config["compaction_target_size"] == 1048576
|
||||
assert updated_effective_config["compaction_threshold"] == 10
|
||||
assert updated_effective_config["gc_horizon"] == 67108864
|
||||
@@ -225,6 +259,12 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
|
||||
assert final_effective_config["gc_horizon"] == 67108864
|
||||
assert final_effective_config["gc_period"] == "1h"
|
||||
assert final_effective_config["image_creation_threshold"] == 3
|
||||
assert final_effective_config["evictions_low_residence_duration_metric_threshold"] == "2days"
|
||||
assert final_effective_config["eviction_policy"] == {
|
||||
"kind": "LayerAccessThreshold",
|
||||
"period": "20s",
|
||||
"threshold": "23h",
|
||||
}
|
||||
|
||||
# restart the pageserver and ensure that the config is still correct
|
||||
env.pageserver.stop()
|
||||
@@ -285,3 +325,81 @@ def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder):
|
||||
# dont test applying the setting here, we have that another test case to show it
|
||||
# we just care about being able to create the file
|
||||
assert len(contents_first) > len(contents_later)
|
||||
|
||||
|
||||
def test_live_reconfig_get_evictions_low_residence_duration_metric_threshold(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
|
||||
test_name="test_live_reconfig_get_evictions_low_residence_duration_metric_threshold",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
assert isinstance(env.remote_storage, LocalFsStorage)
|
||||
|
||||
(tenant_id, timeline_id) = env.neon_cli.create_tenant()
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
def get_metric():
|
||||
metrics = ps_http.get_metrics()
|
||||
metric = metrics.query_one(
|
||||
"pageserver_evictions_with_low_residence_duration_total",
|
||||
{
|
||||
"tenant_id": str(tenant_id),
|
||||
"timeline_id": str(timeline_id),
|
||||
},
|
||||
)
|
||||
return metric
|
||||
|
||||
default_value = ps_http.tenant_config(tenant_id).effective_config[
|
||||
"evictions_low_residence_duration_metric_threshold"
|
||||
]
|
||||
metric = get_metric()
|
||||
assert int(metric.value) == 0, "metric is present with default value"
|
||||
|
||||
assert default_value == "1day"
|
||||
|
||||
ps_http.download_all_layers(tenant_id, timeline_id)
|
||||
ps_http.evict_all_layers(tenant_id, timeline_id)
|
||||
metric = get_metric()
|
||||
assert int(metric.value) > 0, "metric is updated"
|
||||
|
||||
env.neon_cli.config_tenant(
|
||||
tenant_id, {"evictions_low_residence_duration_metric_threshold": default_value}
|
||||
)
|
||||
updated_metric = get_metric()
|
||||
assert int(updated_metric.value) == int(
|
||||
metric.value
|
||||
), "metric is unchanged when setting same value"
|
||||
|
||||
env.neon_cli.config_tenant(
|
||||
tenant_id, {"evictions_low_residence_duration_metric_threshold": "2day"}
|
||||
)
|
||||
metric = get_metric()
|
||||
assert int(metric.labels["low_threshold_secs"]) == 2 * 24 * 60 * 60
|
||||
assert int(metric.value) == 0
|
||||
|
||||
ps_http.download_all_layers(tenant_id, timeline_id)
|
||||
ps_http.evict_all_layers(tenant_id, timeline_id)
|
||||
metric = get_metric()
|
||||
assert int(metric.labels["low_threshold_secs"]) == 2 * 24 * 60 * 60
|
||||
assert int(metric.value) > 0
|
||||
|
||||
env.neon_cli.config_tenant(
|
||||
tenant_id, {"evictions_low_residence_duration_metric_threshold": "2h"}
|
||||
)
|
||||
metric = get_metric()
|
||||
assert int(metric.labels["low_threshold_secs"]) == 2 * 60 * 60
|
||||
assert int(metric.value) == 0, "value resets if label changes"
|
||||
|
||||
ps_http.download_all_layers(tenant_id, timeline_id)
|
||||
ps_http.evict_all_layers(tenant_id, timeline_id)
|
||||
metric = get_metric()
|
||||
assert int(metric.labels["low_threshold_secs"]) == 2 * 60 * 60
|
||||
assert int(metric.value) > 0, "set a non-zero value for next step"
|
||||
|
||||
env.neon_cli.config_tenant(tenant_id, {})
|
||||
metric = get_metric()
|
||||
assert int(metric.labels["low_threshold_secs"]) == 24 * 60 * 60, "label resets to default"
|
||||
assert int(metric.value) == 0, "value resets to default"
|
||||
|
||||
@@ -4,8 +4,6 @@ version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
clap.workspace = true
|
||||
anyhow.workspace = true
|
||||
|
||||
@@ -18,6 +18,7 @@ byteorder = { version = "1" }
|
||||
bytes = { version = "1", features = ["serde"] }
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }
|
||||
clap = { version = "4", features = ["derive", "string"] }
|
||||
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
|
||||
crossbeam-utils = { version = "0.8" }
|
||||
digest = { version = "0.10", features = ["mac", "std"] }
|
||||
either = { version = "1" }
|
||||
@@ -29,7 +30,6 @@ futures-executor = { version = "0.3" }
|
||||
futures-sink = { version = "0.3" }
|
||||
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
|
||||
hashbrown = { version = "0.12", features = ["raw"] }
|
||||
indexmap = { version = "1", default-features = false, features = ["std"] }
|
||||
itertools = { version = "0.10" }
|
||||
libc = { version = "0.2", features = ["extra_traits"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
@@ -52,7 +52,8 @@ socket2 = { version = "0.4", default-features = false, features = ["all"] }
|
||||
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "sync", "time"] }
|
||||
tokio-rustls = { version = "0.23" }
|
||||
tokio-util = { version = "0.7", features = ["codec", "io"] }
|
||||
tonic = { version = "0.8", features = ["tls-roots"] }
|
||||
toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
|
||||
toml_edit = { version = "0.19", features = ["serde"] }
|
||||
tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "timeout", "util"] }
|
||||
tracing = { version = "0.1", features = ["log"] }
|
||||
tracing-core = { version = "0.1" }
|
||||
@@ -64,7 +65,6 @@ anyhow = { version = "1", features = ["backtrace"] }
|
||||
bytes = { version = "1", features = ["serde"] }
|
||||
either = { version = "1" }
|
||||
hashbrown = { version = "0.12", features = ["raw"] }
|
||||
indexmap = { version = "1", default-features = false, features = ["std"] }
|
||||
itertools = { version = "0.10" }
|
||||
libc = { version = "0.2", features = ["extra_traits"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
@@ -74,6 +74,7 @@ prost = { version = "0.11" }
|
||||
regex = { version = "1" }
|
||||
regex-syntax = { version = "0.6" }
|
||||
serde = { version = "1", features = ["alloc", "derive"] }
|
||||
syn = { version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] }
|
||||
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] }
|
||||
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit-mut"] }
|
||||
|
||||
### END HAKARI SECTION
|
||||
|
||||
Reference in New Issue
Block a user