Compare commits

..

20 Commits

Author SHA1 Message Date
Christian Schwarz
81d715b187 another rename 2023-06-07 16:27:53 +02:00
Christian Schwarz
0afd20068b title 2023-06-07 16:27:53 +02:00
Christian Schwarz
f3d7bf9e09 rename the crate and fix some compile errors that I missed a while back 2023-06-07 16:27:53 +02:00
Christian Schwarz
748c06cff8 docs improvements 2023-06-07 16:27:53 +02:00
Christian Schwarz
0d82862d55 generic GetReconstructPathError 2023-06-07 16:27:53 +02:00
Christian Schwarz
f2bd71d0a8 more docs 2023-06-07 16:27:53 +02:00
Christian Schwarz
de9521214d all-in on immutable + better crate comment 2023-06-07 16:27:53 +02:00
Christian Schwarz
8d9207040f WIP doc comments (more aspirational than reality) 2023-06-07 16:27:53 +02:00
Christian Schwarz
8e57d95026 completely immutable variant of the design 2023-06-07 16:27:53 +02:00
Christian Schwarz
6c71fc6646 struct type for in-memory layer put failure 2023-06-07 16:27:53 +02:00
Christian Schwarz
e6a36b5236 unused PutError accessors 2023-06-07 16:27:53 +02:00
Christian Schwarz
56f57172dd move code around to prep for alternative impl 2023-06-07 16:27:53 +02:00
Christian Schwarz
74ad719ede fix most unused variable warnings 2023-06-07 16:27:53 +02:00
Christian Schwarz
5570384672 WIP 2023-06-07 16:27:53 +02:00
Christian Schwarz
321e74b5ee implement support for non-blocking SeqWait::wait_for_timeout and use it 2023-06-07 16:27:53 +02:00
Christian Schwarz
712a516a2f switch to a Types trait to declutter the generics 2023-06-07 16:27:53 +02:00
Christian Schwarz
be5ba04dca rename trait to HistoricStuff 2023-06-07 16:27:53 +02:00
Christian Schwarz
b4b1292e15 WIP version of fully generic layer map built atop the new seqwait 2023-06-07 16:27:53 +02:00
Christian Schwarz
9b992c621d seqwait: handle wait_for_timeout() with a zero duration efficiently 2023-06-07 16:27:49 +02:00
Christian Schwarz
1fa17ed486 seqwait: add support for communicating immutable data between advance and wait
The long-term plan is to make LayerMap an immutable data structure that is
multi-versioned. Meaning, we will not modify LayerMap in place but create
a (cheap) copy and modify that copy. Once we're done making modifications,
we make the copy available to readers through the SeqWait.

The modifications will be made by a _single_ task, the pageserver actor.
But _many_ readers can wait_for & use same or multiple versions of the LayerMap.

So, there's a new method `split_spmc` that splits up a `SeqWait` into a
not-clonable producer (Advance) and a clonable consumer (Wait).

(SeqWait itself is mpmc, but, for the actor architecture, it makes sense
 to enforce spmc in the type system)

 # Please enter the commit message for your changes. Lines starting
2023-06-07 16:26:48 +02:00
69 changed files with 1669 additions and 2566 deletions

View File

@@ -4,7 +4,7 @@
hakari-package = "workspace_hack"
# Format for `workspace-hack = ...` lines in other Cargo.tomls. Requires cargo-hakari 0.9.8 or above.
dep-format-version = "4"
dep-format-version = "3"
# Setting workspace.resolver = "2" in the root Cargo.toml is HIGHLY recommended.
# Hakari works much better with the new feature resolver.

View File

@@ -126,8 +126,8 @@
shell:
cmd: |
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
curl -sfS {{ console_mgmt_base_url }}/management/api/v2/pageservers/$INSTANCE_ID | jq '.version = {{ current_version }}' > /tmp/new_version
curl -sfS -H "Content-Type: application/json" -X POST -d@/tmp/new_version {{ console_mgmt_base_url }}/management/api/v2/pageservers
curl -sfS -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" {{ console_mgmt_base_url }}/management/api/v2/pageservers/$INSTANCE_ID | jq '.version = {{ current_version }}' > /tmp/new_version
curl -sfS -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" -H "Content-Type: application/json" -X POST -d@/tmp/new_version {{ console_mgmt_base_url }}/management/api/v2/pageservers
tags:
- pageserver
@@ -205,7 +205,7 @@
shell:
cmd: |
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
curl -sfS {{ console_mgmt_base_url }}/management/api/v2/safekeepers/$INSTANCE_ID | jq '.version = {{ current_version }}' > /tmp/new_version
curl -sfS -H "Content-Type: application/json" -X POST -d@/tmp/new_version {{ console_mgmt_base_url }}/management/api/v2/safekeepers
curl -sfS -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" {{ console_mgmt_base_url }}/management/api/v2/safekeepers/$INSTANCE_ID | jq '.version = {{ current_version }}' > /tmp/new_version
curl -sfS -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" -H "Content-Type: application/json" -X POST -d@/tmp/new_version {{ console_mgmt_base_url }}/management/api/v2/safekeepers
tags:
- safekeeper

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -1,50 +0,0 @@
storage:
vars:
bucket_name: neon-prod-storage-us-east-1
bucket_region: us-east-1
console_mgmt_base_url: http://neon-internal-api.aws.neon.tech
broker_endpoint: http://storage-broker-lb.theta.us-east-1.internal.aws.neon.tech:50051
pageserver_config_stub:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 85 # TODO: decrease to 80 after all pageservers are below 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "pageserver/v1"
safekeeper_s3_prefix: safekeeper/v1/wal
hostname_suffix: ""
remote_user: ssm-user
ansible_aws_ssm_region: us-east-1
ansible_aws_ssm_bucket_name: neon-prod-storage-us-east-1
console_region_id: aws-us-east-1
sentry_environment: production
children:
pageservers:
hosts:
pageserver-0.us-east-1.aws.neon.tech:
ansible_host: i-085222088b0d2e0c7
pageserver-1.us-east-1.aws.neon.tech:
ansible_host: i-0969d4f684d23a21e
pageserver-2.us-east-1.aws.neon.tech:
ansible_host: i-05dee87895da58dad
safekeepers:
hosts:
safekeeper-0.us-east-1.aws.neon.tech:
ansible_host: i-04ce739e88793d864
safekeeper-1.us-east-1.aws.neon.tech:
ansible_host: i-0e9e6c9227fb81410
safekeeper-2.us-east-1.aws.neon.tech:
ansible_host: i-072f4dd86a327d52f

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
@@ -34,7 +34,7 @@ storage:
pageservers:
hosts:
pageserver-0.us-west-2.aws.neon.tech:
ansible_host: i-0d9f6dfae0e1c780d
ansible_host: i-0d9f6dfae0e1c780d
pageserver-1.us-west-2.aws.neon.tech:
ansible_host: i-0c834be1dddba8b3f
pageserver-2.us-west-2.aws.neon.tech:
@@ -49,5 +49,5 @@ storage:
safekeeper-1.us-west-2.aws.neon.tech:
ansible_host: i-074682f9d3c712e7c
safekeeper-2.us-west-2.aws.neon.tech:
ansible_host: i-042b7efb1729d7966
ansible_host: i-042b7efb1729d7966

View File

@@ -27,10 +27,10 @@ cat <<EOF | tee /tmp/payload
EOF
# check if pageserver already registered or not
if ! curl -sf {{ console_mgmt_base_url }}/management/api/v2/pageservers/${INSTANCE_ID} -o /dev/null; then
if ! curl -sf -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" {{ console_mgmt_base_url }}/management/api/v2/pageservers/${INSTANCE_ID} -o /dev/null; then
# not registered, so register it now
ID=$(curl -sf -X POST -H "Content-Type: application/json" {{ console_mgmt_base_url }}/management/api/v2/pageservers -d@/tmp/payload | jq -r '.id')
ID=$(curl -sf -X POST -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" -H "Content-Type: application/json" {{ console_mgmt_base_url }}/management/api/v2/pageservers -d@/tmp/payload | jq -r '.id')
# init pageserver
sudo -u pageserver /usr/local/bin/pageserver -c "id=${ID}" -c "pg_distrib_dir='/usr/local'" --init -D /storage/pageserver/data

View File

@@ -22,10 +22,10 @@ cat <<EOF | tee /tmp/payload
EOF
# check if safekeeper already registered or not
if ! curl -sf {{ console_mgmt_base_url }}/management/api/v2/safekeepers/${INSTANCE_ID} -o /dev/null; then
if ! curl -sf -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" {{ console_mgmt_base_url }}/management/api/v2/safekeepers/${INSTANCE_ID} -o /dev/null; then
# not registered, so register it now
ID=$(curl -sf -X POST -H "Content-Type: application/json" {{ console_mgmt_base_url }}/management/api/v2/safekeepers -d@/tmp/payload | jq -r '.id')
ID=$(curl -sf -X POST -H "Authorization: Bearer {{ CONSOLE_API_TOKEN }}" -H "Content-Type: application/json" {{ console_mgmt_base_url }}/management/api/v2/safekeepers -d@/tmp/payload | jq -r '.id')
# init safekeeper
sudo -u safekeeper /usr/local/bin/safekeeper --id ${ID} --init -D /storage/safekeeper/data
fi

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "20m"
threshold: &default_eviction_threshold "20m"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "20m"
threshold: &default_eviction_threshold "20m"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800
image:

View File

@@ -1,22 +1,6 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -7,16 +7,15 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800

View File

@@ -1,69 +0,0 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.us-east-1.aws.neon.tech"
# These domains haven't been delegated yet.
# extraDomains: ["*.us-east-1.retooldb.com", "*.us-east-1.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"
metricCollectionInterval: "10min"
podLabels:
neon_service: proxy-scram
neon_env: prod
neon_region: us-east-1
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: us-east-1.aws.neon.tech
httpsPort: 443
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -1,52 +0,0 @@
# Helm chart values for neon-storage-broker
podLabels:
neon_env: production
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.theta.us-east-1.internal.aws.neon.tech
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
metrics:
enabled: false
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-storage-broker.fullname\" . }}"
labels:
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
app.kubernetes.io/name: neon-storage-broker
app.kubernetes.io/instance: neon-storage-broker
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-storage-broker"
endpoints:
- port: broker
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "production"

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800

View File

@@ -938,7 +938,7 @@ jobs:
./get_binaries.sh
ansible-galaxy collection install sivel.toiletwater
ansible-playbook deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
ansible-playbook deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
- name: Cleanup ansible folder

View File

@@ -67,7 +67,7 @@ jobs:
./get_binaries.sh
ansible-galaxy collection install sivel.toiletwater
ansible-playbook -v deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
ansible-playbook -v deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
- name: Cleanup ansible folder

View File

@@ -49,7 +49,7 @@ jobs:
shell: bash
strategy:
matrix:
target_region: [ us-east-2, us-west-2, eu-central-1, ap-southeast-1, us-east-1 ]
target_region: [ us-east-2, us-west-2, eu-central-1, ap-southeast-1 ]
environment:
name: prod-${{ matrix.target_region }}
steps:
@@ -68,7 +68,7 @@ jobs:
./get_binaries.sh
ansible-galaxy collection install sivel.toiletwater
ansible-playbook -v deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
ansible-playbook -v deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_PRODUCTION_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
deploy-proxy-prod-new:
@@ -97,10 +97,6 @@ jobs:
target_cluster: prod-ap-southeast-1-epsilon
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
- target_region: us-east-1
target_cluster: prod-us-east-1-theta
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
environment:
name: prod-${{ matrix.target_region }}
steps:
@@ -151,8 +147,6 @@ jobs:
target_cluster: prod-eu-central-1-gamma
- target_region: ap-southeast-1
target_cluster: prod-ap-southeast-1-epsilon
- target_region: us-east-1
target_cluster: prod-us-east-1-theta
environment:
name: prod-${{ matrix.target_region }}
steps:

1413
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -24,10 +24,10 @@ atty = "0.2.14"
aws-config = { version = "0.51.0", default-features = false, features=["rustls"] }
aws-sdk-s3 = "0.21.0"
aws-smithy-http = "0.51.0"
aws-types = "0.55"
aws-types = "0.51.0"
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.65"
bindgen = "0.61"
bstr = "1.0"
byteorder = "1.4"
bytes = "1.0"
@@ -50,7 +50,7 @@ git-version = "0.3"
hashbrown = "0.13"
hashlink = "0.8.1"
hex = "0.4"
hex-literal = "0.4"
hex-literal = "0.3"
hmac = "0.12.1"
hostname = "0.3.1"
humantime = "2.1"
@@ -80,18 +80,18 @@ reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_18"] }
reqwest-middleware = "0.2.0"
routerify = "3"
rpds = "0.13"
rpds = "0.12.0"
rustls = "0.20"
rustls-pemfile = "1"
rustls-split = "0.3"
scopeguard = "1.1"
sentry = { version = "0.30", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
sentry = { version = "0.29", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "2.0"
sha2 = "0.10.2"
signal-hook = "0.3"
socket2 = "0.5"
socket2 = "0.4.4"
strum = "0.24"
strum_macros = "0.24"
svg_fmt = "0.4.1"
@@ -106,17 +106,17 @@ tokio-postgres-rustls = "0.9.0"
tokio-rustls = "0.23"
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io"] }
toml = "0.7"
toml_edit = "0.19"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
toml = "0.5"
toml_edit = { version = "0.17", features = ["easy"] }
tonic = {version = "0.8", features = ["tls", "tls-roots"]}
tracing = "0.1"
tracing-opentelemetry = "0.18.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.2"
uuid = { version = "1.2", features = ["v4", "serde"] }
walkdir = "2.3.2"
webpki-roots = "0.23"
x509-parser = "0.15"
webpki-roots = "0.22.5"
x509-parser = "0.14"
## TODO replace this with tracing
env_logger = "0.10"
@@ -154,9 +154,9 @@ workspace_hack = { version = "0.1", path = "./workspace_hack/" }
## Build dependencies
criterion = "0.4"
rcgen = "0.10"
rstest = "0.17"
rstest = "0.16"
tempfile = "3.4"
tonic-build = "0.9"
tonic-build = "0.8"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.

View File

@@ -12,7 +12,7 @@ FROM debian:bullseye-slim AS build-deps
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libssl-dev \
libicu-dev libxslt1-dev liblz4-dev libzstd-dev
libicu-dev libxslt1-dev
#########################################################################################
#
@@ -24,13 +24,8 @@ FROM build-deps AS pg-build
ARG PG_VERSION
COPY vendor/postgres-${PG_VERSION} postgres
RUN cd postgres && \
export CONFIGURE_CMD="./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp \
--with-icu --with-libxml --with-libxslt --with-lz4" && \
if [ "${PG_VERSION}" != "v14" ]; then \
# zstd is available only from PG15
export CONFIGURE_CMD="${CONFIGURE_CMD} --with-zstd"; \
fi && \
eval $CONFIGURE_CMD && \
./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp --with-icu \
--with-libxml --with-libxslt && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C contrib/ install && \
# Install headers
@@ -570,17 +565,13 @@ COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-deb
# Install:
# libreadline8 for psql
# libicu67, locales for collations (including ICU and plpgsql_check)
# liblz4-1 for lz4
# libossp-uuid16 for extension ossp-uuid
# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS
# libxml2, libxslt1.1 for xml2
# libzstd1 for zstd
RUN apt update && \
apt install --no-install-recommends -y \
gdb \
locales \
libicu67 \
liblz4-1 \
libreadline8 \
libossp-uuid16 \
libgeos-c1v5 \
@@ -590,8 +581,7 @@ RUN apt update && \
libsfcgal1 \
libxml2 \
libxslt1.1 \
libzstd1 \
procps && \
gdb && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8

View File

@@ -54,7 +54,7 @@ RUN set -e \
RUN set -e \
&& echo "::sysinit:cgconfigparser -l /etc/cgconfig.conf -s 1664" >> /etc/inittab \
&& CONNSTR="dbname=postgres user=cloud_admin sslmode=disable" \
&& CONNSTR="dbname=neondb user=cloud_admin sslmode=disable" \
&& ARGS="--auto-restart --cgroup=neon-postgres --pgconnstr=\"$CONNSTR\"" \
&& echo "::respawn:su vm-informant -c '/usr/local/bin/vm-informant $ARGS'" >> /etc/inittab

View File

@@ -73,7 +73,7 @@ fn main() -> Result<()> {
// Try to use just 'postgres' if no path is provided
let pgbin = matches.get_one::<String>("pgbin").unwrap();
let spec;
let mut spec = None;
let mut live_config_allowed = false;
match spec_json {
// First, try to get cluster spec from the cli argument
@@ -89,13 +89,9 @@ fn main() -> Result<()> {
} else if let Some(id) = compute_id {
if let Some(cp_base) = control_plane_uri {
live_config_allowed = true;
spec = match get_spec_from_control_plane(cp_base, id) {
Ok(s) => s,
Err(e) => {
error!("cannot get response from control plane: {}", e);
panic!("neither spec nor confirmation that compute is in the Empty state was received");
}
};
if let Ok(s) = get_spec_from_control_plane(cp_base, id) {
spec = Some(s);
}
} else {
panic!("must specify both --control-plane-uri and --compute-id or none");
}
@@ -118,6 +114,7 @@ fn main() -> Result<()> {
spec_set = false;
}
let compute_node = ComputeNode {
start_time: Utc::now(),
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
@@ -150,17 +147,6 @@ fn main() -> Result<()> {
let mut state = compute.state.lock().unwrap();
let pspec = state.pspec.as_ref().expect("spec must be set");
let startup_tracing_context = pspec.spec.startup_tracing_context.clone();
// Record for how long we slept waiting for the spec.
state.metrics.wait_for_spec_ms = Utc::now()
.signed_duration_since(state.start_time)
.to_std()
.unwrap()
.as_millis() as u64;
// Reset start time to the actual start of the configuration, so that
// total startup time was properly measured at the end.
state.start_time = Utc::now();
state.status = ComputeStatus::Init;
compute.state_changed.notify_all();
drop(state);

View File

@@ -1,28 +1,12 @@
use anyhow::{anyhow, Result};
use postgres::Client;
use tokio_postgres::NoTls;
use tracing::{error, instrument};
use crate::compute::ComputeNode;
/// Update timestamp in a row in a special service table to check
/// that we can actually write some data in this particular timeline.
/// Create table if it's missing.
#[instrument(skip_all)]
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
// Connect to the database.
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
if client.is_closed() {
return Err(anyhow!("connection to postgres closed"));
}
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("connection error: {}", e);
}
});
pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
let query = "
CREATE TABLE IF NOT EXISTS health_check (
id serial primary key,
@@ -31,15 +15,31 @@ pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
INSERT INTO health_check VALUES (1, now())
ON CONFLICT (id) DO UPDATE
SET updated_at = now();";
let result = client.simple_query(query).await?;
if result.len() != 2 {
return Err(anyhow::format_err!(
"expected 2 query results, but got {}",
result.len()
));
let result = client.simple_query(query)?;
if result.len() < 2 {
return Err(anyhow::format_err!("executed {} queries", result.len()));
}
Ok(())
}
#[instrument(skip_all)]
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
if client.is_closed() {
return Err(anyhow!("connection to postgres closed"));
}
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("connection error: {}", e);
}
});
let result = client
.simple_query("UPDATE health_check SET updated_at = now() WHERE id = 1;")
.await?;
if result.len() != 1 {
return Err(anyhow!("statement can't be executed"));
}
Ok(())
}

View File

@@ -32,12 +32,14 @@ use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::ComputeSpec;
use crate::checker::create_writability_check_data;
use crate::config;
use crate::pg_helpers::*;
use crate::spec::*;
/// Compute node info shared across several `compute_ctl` threads.
pub struct ComputeNode {
pub start_time: DateTime<Utc>,
// Url type maintains proper escaping
pub connstr: url::Url,
pub pgdata: String,
@@ -65,7 +67,6 @@ pub struct ComputeNode {
#[derive(Clone, Debug)]
pub struct ComputeState {
pub start_time: DateTime<Utc>,
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity
pub last_active: DateTime<Utc>,
@@ -77,7 +78,6 @@ pub struct ComputeState {
impl ComputeState {
pub fn new() -> Self {
Self {
start_time: Utc::now(),
status: ComputeStatus::Empty,
last_active: Utc::now(),
error: None,
@@ -342,6 +342,7 @@ impl ComputeNode {
handle_databases(spec, &mut client)?;
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
handle_grants(spec, self.connstr.as_str(), &mut client)?;
create_writability_check_data(&mut client)?;
handle_extensions(spec, &mut client)?;
// 'Close' connection
@@ -426,7 +427,7 @@ impl ComputeNode {
.unwrap()
.as_millis() as u64;
state.metrics.total_startup_ms = startup_end_time
.signed_duration_since(compute_state.start_time)
.signed_duration_since(self.start_time)
.to_std()
.unwrap()
.as_millis() as u64;

View File

@@ -18,7 +18,6 @@ use tracing_utils::http::OtelName;
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
ComputeStatusResponse {
start_time: state.start_time,
tenant: state
.pspec
.as_ref()
@@ -86,10 +85,7 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
let res = crate::checker::check_writability(compute).await;
match res {
Ok(_) => Response::new(Body::from("true")),
Err(e) => {
error!("check_writability failed: {}", e);
Response::new(Body::from(e.to_string()))
}
Err(e) => Response::new(Body::from(e.to_string())),
}
}

View File

@@ -152,14 +152,11 @@ components:
type: object
description: Compute startup metrics.
required:
- wait_for_spec_ms
- sync_safekeepers_ms
- basebackup_ms
- config_ms
- total_startup_ms
properties:
wait_for_spec_ms:
type: integer
sync_safekeepers_ms:
type: integer
basebackup_ms:
@@ -184,13 +181,6 @@ components:
- status
- last_active
properties:
start_time:
type: string
description: |
Time when compute was started. If initially compute was started in the `empty`
state and then provided with valid spec, `start_time` will be reset to the
moment, when spec was received.
example: "2022-10-12T07:20:50.52Z"
status:
$ref: '#/components/schemas/ComputeStatus'
last_active:

View File

@@ -4,117 +4,42 @@ use std::str::FromStr;
use anyhow::{anyhow, bail, Result};
use postgres::config::Config;
use postgres::{Client, NoTls};
use reqwest::StatusCode;
use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
use tracing::{info, info_span, instrument, span_enabled, warn, Level};
use crate::config;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
use compute_api::responses::ControlPlaneSpecResponse;
use compute_api::spec::{ComputeSpec, Database, PgIdent, Role};
// Do control plane request and return response if any. In case of error it
// returns a bool flag indicating whether it makes sense to retry the request
// and a string with error message.
fn do_control_plane_request(
uri: &str,
jwt: &str,
) -> Result<ControlPlaneSpecResponse, (bool, String)> {
let resp = reqwest::blocking::Client::new()
.get(uri)
.header("Authorization", jwt)
.send()
.map_err(|e| {
(
true,
format!("could not perform spec request to control plane: {}", e),
)
})?;
match resp.status() {
StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
Ok(spec_resp) => Ok(spec_resp),
Err(e) => Err((
true,
format!("could not deserialize control plane response: {}", e),
)),
},
StatusCode::SERVICE_UNAVAILABLE => {
Err((true, "control plane is temporarily unavailable".to_string()))
}
StatusCode::BAD_GATEWAY => {
// We have a problem with intermittent 502 errors now
// https://github.com/neondatabase/cloud/issues/2353
// It's fine to retry GET request in this case.
Err((true, "control plane request failed with 502".to_string()))
}
// Another code, likely 500 or 404, means that compute is unknown to the control plane
// or some internal failure happened. Doesn't make much sense to retry in this case.
_ => Err((
false,
format!(
"unexpected control plane response status code: {}",
resp.status()
),
)),
}
}
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
/// env variable is set, it will be used for authorization.
pub fn get_spec_from_control_plane(
base_uri: &str,
compute_id: &str,
) -> Result<Option<ComputeSpec>> {
pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeSpec> {
let cp_uri = format!("{base_uri}/management/api/v2/computes/{compute_id}/spec");
let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
Ok(v) => v,
Err(_) => "".to_string(),
};
let mut attempt = 1;
let mut spec: Result<Option<ComputeSpec>> = Ok(None);
info!("getting spec from control plane: {}", cp_uri);
// Do 3 attempts to get spec from the control plane using the following logic:
// - network error -> then retry
// - compute id is unknown or any other error -> bail out
// - no spec for compute yet (Empty state) -> return Ok(None)
// - got spec -> return Ok(Some(spec))
while attempt < 4 {
spec = match do_control_plane_request(&cp_uri, &jwt) {
Ok(spec_resp) => match spec_resp.status {
ControlPlaneComputeStatus::Empty => Ok(None),
ControlPlaneComputeStatus::Attached => {
if let Some(spec) = spec_resp.spec {
Ok(Some(spec))
} else {
bail!("compute is attached, but spec is empty")
}
}
},
Err((retry, msg)) => {
if retry {
Err(anyhow!(msg))
} else {
bail!(msg);
}
}
};
// TODO: check the response. We should distinguish cases when it's
// - network error, then retry
// - no spec for compute yet, then wait
// - compute id is unknown or any other error, then bail out
let resp: ControlPlaneSpecResponse = reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()
.map_err(|e| anyhow!("could not send spec request to control plane: {}", e))?
.json()
.map_err(|e| anyhow!("could not get compute spec from control plane: {}", e))?;
if let Err(e) = &spec {
error!("attempt {} to get spec failed with: {}", attempt, e);
} else {
return spec;
}
attempt += 1;
std::thread::sleep(std::time::Duration::from_millis(100));
if let Some(spec) = resp.spec {
Ok(spec)
} else {
bail!("could not get compute spec from control plane")
}
// All attempts failed, return error.
spec
}
/// It takes cluster specification and does the following:

View File

@@ -359,8 +359,8 @@ impl PageServerNode {
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
eviction_policy: settings
.remove("eviction_policy")
.map(serde_json::from_str)
.get("eviction_policy")
.map(|x| serde_json::from_str(x))
.transpose()
.context("Failed to parse 'eviction_policy' json")?,
min_resident_size_override: settings
@@ -368,9 +368,6 @@ impl PageServerNode {
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'min_resident_size_override' as integer")?,
evictions_low_residence_duration_metric_threshold: settings
.remove("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
@@ -448,9 +445,6 @@ impl PageServerNode {
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'min_resident_size_override' as an integer")?,
evictions_low_residence_duration_metric_threshold: settings
.get("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
})
.send()?
.error_from_body()?;

View File

@@ -14,7 +14,6 @@ pub struct GenericAPIError {
#[derive(Serialize, Debug)]
#[serde(rename_all = "snake_case")]
pub struct ComputeStatusResponse {
pub start_time: DateTime<Utc>,
pub tenant: Option<String>,
pub timeline: Option<String>,
pub status: ComputeStatus,
@@ -64,7 +63,6 @@ where
/// Response of the /metrics.json API
#[derive(Clone, Debug, Default, Serialize)]
pub struct ComputeMetrics {
pub wait_for_spec_ms: u64,
pub sync_safekeepers_ms: u64,
pub basebackup_ms: u64,
pub config_ms: u64,
@@ -77,16 +75,4 @@ pub struct ComputeMetrics {
#[derive(Deserialize, Debug)]
pub struct ControlPlaneSpecResponse {
pub spec: Option<ComputeSpec>,
pub status: ControlPlaneComputeStatus,
}
#[derive(Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ControlPlaneComputeStatus {
// Compute is known to control-plane, but it's not
// yet attached to any timeline / endpoint.
Empty,
// Compute is attached to some timeline / endpoint and
// should be able to start with provided spec.
Attached,
}

View File

@@ -4,12 +4,13 @@ version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[dependencies]
anyhow.workspace = true
chrono.workspace = true
rand.workspace = true
serde.workspace = true
serde_with.workspace = true
utils.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
workspace_hack.workspace = true
[dependencies]
anyhow = "1.0.68"
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }
rand = "0.8.3"
serde = "1.0.152"
serde_with = "2.1.0"
utils = { version = "0.1.0", path = "../utils" }
workspace_hack = { version = "0.1.0", path = "../../workspace_hack" }

View File

@@ -135,7 +135,6 @@ pub struct TenantCreateRequest {
// For now, this field is not even documented in the openapi_spec.yml.
pub eviction_policy: Option<serde_json::Value>,
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
}
#[serde_as]
@@ -182,7 +181,6 @@ pub struct TenantConfigRequest {
// For now, this field is not even documented in the openapi_spec.yml.
pub eviction_policy: Option<serde_json::Value>,
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
}
impl TenantConfigRequest {
@@ -204,7 +202,6 @@ impl TenantConfigRequest {
trace_read_requests: None,
eviction_policy: None,
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: None,
}
}
}

View File

@@ -5,7 +5,7 @@ use std::path::PathBuf;
use std::process::Command;
use anyhow::{anyhow, Context};
use bindgen::callbacks::{DeriveInfo, ParseCallbacks};
use bindgen::callbacks::ParseCallbacks;
#[derive(Debug)]
struct PostgresFfiCallbacks;
@@ -20,7 +20,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
// Add any custom #[derive] attributes to the data structures that bindgen
// creates.
fn add_derives(&self, derive_info: &DeriveInfo) -> Vec<String> {
fn add_derives(&self, name: &str) -> Vec<String> {
// This is the list of data structures that we want to serialize/deserialize.
let serde_list = [
"XLogRecord",
@@ -31,7 +31,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
"ControlFileData",
];
if serde_list.contains(&derive_info.name) {
if serde_list.contains(&name) {
vec![
"Default".into(), // Default allows us to easily fill the padding fields with 0.
"Serialize".into(),

View File

@@ -204,7 +204,12 @@ async fn upload_s3_data(
let data = format!("remote blob data {i}").into_bytes();
let data_len = data.len();
task_client
.upload(std::io::Cursor::new(data), data_len, &blob_path, None)
.upload(
Box::new(std::io::Cursor::new(data)),
data_len,
&blob_path,
None,
)
.await?;
Ok::<_, anyhow::Error>((blob_prefix, blob_path))

View File

@@ -0,0 +1,13 @@
[package]
name = "timeline_data_path"
version = "0.1.0"
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
utils.workspace = true
workspace_hack.workspace = true
tokio.workspace = true
thiserror.workspace = true

View File

@@ -0,0 +1,396 @@
//! The Timeline's core data path.
//!
//! # Overview
//!
//! This crate implements the core data path of a Timeline inside Pageserver:
//!
//! 1. WAL records from `walreceiver`, via in-memory layers, into persistent L0 layers.
//! 1. `GetPage@LSN`: retrieval of WAL records and page images for feeding into WAL redo.
//! 1. Data re-shuffeling through compaction (TODO).
//! 1. Page image creation & garbage collection through GC (TODO).
//!
//! This crate assumes the following concepts, but is fully generic over their implementation:
//!
//! - **Delta Records**: data is written into the system in the form of self-descriptive deltas.
//! For the Pageserver use case, these deltas are derived from Postgres WAL records.
//! - **Page Numbers**: Delta Records always affect a single key.
//! That key is called page number, because, in the Pageserver use case, the Postgres table page numbers are the keys.
//! - **LSN**: When writing Delta Records into the system, they are associated with a monotonically increasing LSN.
//! Subsequently written Delta Records must have increasing LSNs.
//! - **Page Images**: Delta Records for a given page can be used to reconstruct the page. Think of it like squashing diffs.
//! - When sorting the Delta Records for a given key by their LSN, any prefix of that sorting can be squashed into a page image.
//! - Delta Records following such a squash can be squashed into that page image.
//! - In Pageserver, WAL redo implements the (pure) function of squashing.
//! - **In-Memory Layer**: an object that represents an "unfinished" L0 layer file, holding Delta Records in insertion order.
//! "Unfinished" means that we're still writing Delta Records to that file.
//! - **Historic Layer**: an object that represents a "finished" layer file, at any compaction level.
//! Such objects reside on disk and/or in remote storage.
//! They may contain Delta Records, Page Images, or a mixture thereof. It doesn't matter.
//! - **HistoricStuff**: an efficient lookup data structure to find the list of Historic Layer objects
//! that hold the Delta Records / PageImages required to reconstruct a Page Image at a given LSN.
//!
//! # API
//!
//! The core idea is that of a specialized single-producer multi-consumer structure,
//! embodied by a Read-end and a Write-end.
//!
//! The Write-end is used to push new `DeltaRecord @ LSN`s into the system.
//! In Pageserver, this is used by the `WalReceiver`.
//!
//! The Read-end provides the `GetPage@LSN` API.
//! In the current iteration, we actually return something called `ReconstructWork`.
//! I.e., we leave the work of reading the values from the layers, and the WAL redo invocation to the caller.
//! Find rationale for this design in the *Scope* section.
//!
//! ## Immutability
//!
//! The traits defined by this crate assume immutable data structures that are multi-versioned.
//!
//! As an example for what "immutable" means, take the case where we add a new Historic Layer to HistoricStuff.
//! Traditionally, one would use shared mutable state, i.e. `Arc<RwLock<...>>`.
//! To insert the new Historic Layer, we would acquire the RwLock in write mode and modify a lookup data structure to accomodate the new layer.
//! The Read-ends would use RwLock in read mode to read from the data structure.
//!
//! Conversely, with *immutable data structures*, writers create new version (aka *snapshots*) of the lookup data structure.
//! New reads on the Read-ends will use the new snapshot, but old ongoing reads would use the old version(s).
//! An efficient implementation would likely share the Historic Layer objects, e.g., using `Arc`.
//! And maybe there's internally mutable state inside the layer objects, e.g., to track residence (i.e., *on-demand downloaded* vs *evicted*).
//! But the important point is that there's no synchronization / lock-holding at any higher level, except when grabbing a reference to the snapshot (Read-end), or when publishing a new snapshot (Write-end).
//!
//! ## Scope
//!
//! The following concerns are considered implementation details from the perspective of this crate:
//!
//! - **Layer File Persistence**: `HistoricStuff::make_historic` is responsible for this.
//! - **Reading Layer Files**: the `ReconstructWork` that the Read-end returns from `GetPage@LSN` requests contains the list of layers to consult.
//! The crate consumer is responsible for reading the layers & doing WAL redo.
//! Likely the implementation of `HistoricStuff` plays a role here, because it is responsible for persisting the layer files.
//! - **Layer Eviction & On-Demand Download**: this is just an aspect of the above.
//! The crate consumer can choose to implement eviction & on-demand download however they wish.
//! The only requirement is that the Historic Layers don't change their contents, i.e., they always returnt he same reconstruct values for the same lookup.
//! - For example, a `LayerCache` modoule or service could take care of layer uploads, eviction, and on-demand downloads.
//! Initially, the `layer cache` can be local-only.
//! But in the future, it can be multi-machine / clustered pagesevers / aka "sharding".
//!
//! # Example
//!
//! The [`new`] function is the entrypoint to this crate.
//!
//! See the test cases for how it is used.
use std::{marker::PhantomData, time::Duration};
use utils::seqwait::{self, Advance, SeqWait, Wait};
#[cfg(test)]
mod tests;
/// Collection of types / type bounds used by Read-end and Write-end.
///
/// See the [`crate`]-level docs's *Concepts* section to learn about
/// the meaning of each associated `type`.
///
/// # Usage
///
/// Define a zero-sized-type and impl this Trait for it.
/// Then use that zero-sized-type as the single generic argument to [`new`]
/// and almost all types declared in this crate.
///
/// It might feel a bit weird, but, the alternative is to have umpteen generic
/// types per `impl` with repetitive trait bounds.
///
/// Search the test cases for an example of how this can be used to improve testability.
pub trait Types {
type Key: Copy;
type Lsn: Ord + Copy;
type LsnCounter: seqwait::MonotonicCounter<Self::Lsn> + Copy;
type DeltaRecord;
type HistoricLayer;
type InMemoryLayer: InMemoryLayer<Types = Self> + Clone;
type HistoricStuff: HistoricStuff<Types = Self> + Clone;
type GetReconstructPathError: std::error::Error;
}
/// Error returned by [`InMemoryLayer::put`].
#[derive(thiserror::Error)]
pub struct InMemoryLayerPutError<DeltaRecord> {
delta: DeltaRecord,
kind: InMemoryLayerPutErrorKind,
}
/// Part of [`InMemoryLayerPutError`].
#[derive(Debug)]
pub enum InMemoryLayerPutErrorKind {
LayerFull,
AlreadyHaveRecordForKeyAndLsn,
}
impl<DeltaRecord> std::fmt::Debug for InMemoryLayerPutError<DeltaRecord> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InMemoryLayerPutError")
// would require DeltaRecord to impl Debug
// .field("delta", &self.delta)
.field("kind", &self.kind)
.finish()
}
}
/// An in-memory layer. See [`crate`] docs for details on this concept.
pub trait InMemoryLayer: std::fmt::Debug + Default + Clone {
type Types: Types;
fn put(
&mut self,
key: <Self::Types as Types>::Key,
lsn: <Self::Types as Types>::Lsn,
delta: <Self::Types as Types>::DeltaRecord,
) -> Result<Self, InMemoryLayerPutError<<Self::Types as Types>::DeltaRecord>>;
fn get(
&self,
key: <Self::Types as Types>::Key,
lsn: <Self::Types as Types>::Lsn,
) -> Vec<<Self::Types as Types>::DeltaRecord>;
}
/// The manager of [`Types::HistoricLayer`]s.
pub trait HistoricStuff {
type Types: Types;
fn get_reconstruct_path(
&self,
key: <Self::Types as Types>::Key,
lsn: <Self::Types as Types>::Lsn,
) -> Result<
Vec<<Self::Types as Types>::HistoricLayer>,
<Self::Types as Types>::GetReconstructPathError,
>;
/// Produce a new version of `self` that includes the given inmem layer.
fn make_historic(&self, inmem: <Self::Types as Types>::InMemoryLayer) -> Self;
}
/// A snapshot of the data. See [`crate`]-level docs section on *immutability* for details.
struct Snapshot<T: Types> {
_types: PhantomData<T>,
inmem: Option<T::InMemoryLayer>,
historic: T::HistoricStuff,
}
impl<T: Types> Clone for Snapshot<T> {
fn clone(&self) -> Self {
Self {
_types: self._types.clone(),
inmem: self.inmem.clone(),
historic: self.historic.clone(),
}
}
}
/// The Read-end. See [`crate`]-level docs for details.
pub struct Reader<T: Types> {
wait: Wait<T::LsnCounter, T::Lsn, Snapshot<T>>,
}
/// The Write-end. See [`crate`]-level docs for details.
pub struct Writer<T: Types> {
advance: Advance<T::LsnCounter, T::Lsn, Snapshot<T>>,
}
/// Setup a pair of Read-end and Write-End. This is the entrypoint to this crate.
///
/// The idea is that the caller loads the arguments from persistent state that `HistoricStuff` wrote at an earlier point in time.
pub fn new<T: Types>(lsn: T::LsnCounter, historic: T::HistoricStuff) -> (Reader<T>, Writer<T>) {
let state = Snapshot {
_types: PhantomData::<T>::default(),
inmem: None,
historic: historic,
};
let (wait, advance) = SeqWait::new(lsn, state).split_spmc();
let reader = Reader { wait };
let read_writer = Writer { advance };
(reader, read_writer)
}
/// Error returned by the get-page operations.
#[derive(Debug, thiserror::Error)]
pub enum GetError<T: Types> {
#[error(transparent)]
SeqWait(seqwait::SeqWaitError),
#[error(transparent)]
GetReconstructPath(T::GetReconstructPathError),
}
/// Self-contained set of objects required to reconstruct a page image for the given `key` @ `lsn`.
///
/// This is returned by the `get` methods of [`Reader`] and [`Writer`].
///
/// To reconstruct the page image, stack up (top to bottom) `inmem_records` plus all records found for `key` and `lsn` along the `historic_path` until an initial page image is found.
/// Then feed that stack to WAL-redo to get the page image.
///
/// See [`crate`]-level docs on *scope* for why we don't return page images from these functions.
pub struct ReconstructWork<T: Types> {
pub key: T::Key,
pub lsn: T::Lsn,
pub inmem_records: Vec<T::DeltaRecord>,
pub historic_path: Vec<T::HistoricLayer>,
}
impl<T: Types> Reader<T> {
/// This is the `GetPage@LSN` operation.
///
/// See the [`crate`]-level docs for why we return [`ReconstructWork`] instead of a Page Image here.
pub async fn get(&self, key: T::Key, lsn: T::Lsn) -> Result<ReconstructWork<T>, GetError<T>> {
// XXX dedup with Writer::get_nowait
let state = self.wait.wait_for(lsn).await.map_err(GetError::SeqWait)?;
let inmem_records = state
.inmem
.as_ref()
.map(|iml| iml.get(key, lsn))
.unwrap_or_default();
let historic_path = state
.historic
.get_reconstruct_path(key, lsn)
.map_err(GetError::GetReconstructPath)?;
Ok(ReconstructWork {
key,
lsn,
inmem_records,
historic_path,
})
}
}
/// Error returned by the `put` operation.
#[derive(thiserror::Error)]
pub struct PutError<T: Types> {
/// The `delta` record which we failed to `put`.
pub delta: T::DeltaRecord,
/// Description of what went wrong.
pub kind: PutErrorKind,
}
/// Part of [`PutError`].
#[derive(Debug)]
pub enum PutErrorKind {
AlreadyHaveInMemoryRecordForKeyAndLsn,
}
impl<T: Types> std::fmt::Debug for PutError<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PutError")
// would need to require Debug for DeltaRecord
// .field("delta", &self.delta)
.field("kind", &self.kind)
.finish()
}
}
impl<T: Types> Writer<T> {
/// Insert data into the system.
pub async fn put(
&mut self,
key: T::Key,
lsn: T::Lsn,
delta: T::DeltaRecord,
) -> Result<(), PutError<T>> {
let (_snapshot_lsn, snapshot) = self.advance.get_current_data();
// TODO ensure snapshot_lsn <= lsn?
let mut inmem = snapshot
.inmem
.unwrap_or_else(|| T::InMemoryLayer::default());
// XXX: use the Advance as witness and only allow witness to access inmem in write mode
match inmem.put(key, lsn, delta) {
Ok(new_inmem) => {
let new_snapshot = Snapshot {
_types: PhantomData,
inmem: Some(new_inmem),
historic: snapshot.historic,
};
self.advance.advance(lsn, Some(new_snapshot));
}
Err(InMemoryLayerPutError {
delta,
kind: InMemoryLayerPutErrorKind::AlreadyHaveRecordForKeyAndLsn,
}) => {
return Err(PutError {
delta,
kind: PutErrorKind::AlreadyHaveInMemoryRecordForKeyAndLsn,
});
}
Err(InMemoryLayerPutError {
delta,
kind: InMemoryLayerPutErrorKind::LayerFull,
}) => {
let new_historic = snapshot.historic.make_historic(inmem);
let mut new_inmem = T::InMemoryLayer::default();
let new_inmem = new_inmem
.put(key, lsn, delta)
.expect("put into default inmem layer must not fail");
let new_state = Snapshot {
_types: PhantomData::<T>::default(),
inmem: Some(new_inmem),
historic: new_historic,
};
self.advance.advance(lsn, Some(new_state));
}
}
Ok(())
}
/// Force flushing of the current in-memory layer.
///
/// Usually, flushing happens only if the in-memory layer is full.
/// Use this API to make it happen in other circumstances (shutdown, periodic ticker, etc.).
pub async fn force_flush(&mut self) -> tokio::io::Result<()> {
let (snapshot_lsn, snapshot) = self.advance.get_current_data();
let Snapshot {
_types,
inmem,
historic,
} = snapshot;
// XXX: use the Advance as witness and only allow witness to access inmem in "write" mode
let Some(inmem) = inmem else {
// nothing to do
return Ok(());
};
let new_historic = historic.make_historic(inmem);
let new_snapshot = Snapshot {
_types: PhantomData::<T>::default(),
inmem: None,
historic: new_historic,
};
self.advance.advance(snapshot_lsn, Some(new_snapshot)); // TODO: should fail if we're past snapshot_lsn
Ok(())
}
/// `get` at the given LSN, without blocking.
///
/// Fails with a timeout error if the `lsn` isn't there yet.
/// That makes sense because the only way we'd stop waiting is by a `self.put()`.
/// But concurrent `put()` is forbidden.
pub async fn get_nowait(
&self,
key: T::Key,
lsn: T::Lsn,
) -> Result<ReconstructWork<T>, GetError<T>> {
// XXX dedup with Reader::get
let state = self
.advance
.wait_for_timeout(lsn, Duration::from_secs(0))
// The await is never going to block because we pass from_secs(0).
.await
.map_err(GetError::SeqWait)?;
let inmem_records = state
.inmem
.as_ref()
.map(|iml| iml.get(key, lsn))
.unwrap_or_default();
let historic_path = state
.historic
.get_reconstruct_path(key, lsn)
.map_err(GetError::GetReconstructPath)?;
Ok(ReconstructWork {
key,
lsn,
inmem_records,
historic_path,
})
}
}

View File

@@ -0,0 +1,170 @@
use std::collections::{btree_map::Entry, BTreeMap};
use std::sync::Arc;
use utils::seqwait;
/// The ZST for which we impl the `super::Types` type collection trait.
struct TestTypes;
impl super::Types for TestTypes {
type Key = usize;
type Lsn = usize;
type LsnCounter = UsizeCounter;
type DeltaRecord = &'static str;
type HistoricLayer = Arc<TestHistoricLayer>;
type InMemoryLayer = TestInMemoryLayer;
type HistoricStuff = TestHistoricStuff;
}
/// For testing, our in-memory layer is a simple hashmap.
#[derive(Clone, Default, Debug)]
struct TestInMemoryLayer {
by_key: BTreeMap<usize, BTreeMap<usize, &'static str>>,
}
/// For testing, our historic layers are just in-memory layer objects with `frozen==true`.
struct TestHistoricLayer(TestInMemoryLayer);
/// This is the data structure that impls the `HistoricStuff` trait.
#[derive(Default, Clone)]
struct TestHistoricStuff {
by_key: BTreeMap<usize, BTreeMap<usize, Arc<TestHistoricLayer>>>,
}
/// `seqwait::MonotonicCounter` impl
#[derive(Copy, Clone)]
pub struct UsizeCounter(usize);
// Our testing impl of HistoricStuff references the frozen InMemoryLayer objects
// from all the (key,lsn) entries that it covers.
// This mimics the (much more efficient) search tree in the real impl.
impl super::HistoricStuff for TestHistoricStuff {
type Types = TestTypes;
fn get_reconstruct_path(
&self,
key: usize,
lsn: usize,
) -> Result<Vec<Arc<TestHistoricLayer>>, super::GetReconstructPathError> {
let Some(bk) = self.by_key.get(&key) else {
return Ok(vec![]);
};
Ok(bk.range(..=lsn).rev().map(|(_, l)| Arc::clone(l)).collect())
}
fn make_historic(&self, inmem: TestInMemoryLayer) -> Self {
// For the purposes of testing, just turn the inmemory layer historic through the type system
let historic = Arc::new(TestHistoricLayer(inmem));
// Deep-copy
let mut copy = self.by_key.clone();
// Add the references to `inmem` to the deep-copied struct
for (k, v) in historic.0.by_key.iter() {
for (lsn, _deltas) in v.into_iter() {
let by_key = copy.entry(*k).or_default();
let overwritten = by_key.insert(*lsn, historic.clone());
assert!(matches!(overwritten, None), "layers must not overlap");
}
}
Self { by_key: copy }
}
}
impl super::InMemoryLayer for TestInMemoryLayer {
type Types = TestTypes;
fn put(
&mut self,
key: usize,
lsn: usize,
delta: &'static str,
) -> Result<Self, super::InMemoryLayerPutError<&'static str>> {
let mut clone = self.clone();
drop(self);
let by_key = clone.by_key.entry(key).or_default();
match by_key.entry(lsn) {
Entry::Occupied(_record) => {
return Err(super::InMemoryLayerPutError {
delta,
kind: super::InMemoryLayerPutErrorKind::AlreadyHaveRecordForKeyAndLsn,
});
}
Entry::Vacant(vacant) => vacant.insert(delta),
};
Ok(clone)
}
fn get(&self, key: usize, lsn: usize) -> Vec<&'static str> {
let by_key = match self.by_key.get(&key) {
Some(by_key) => by_key,
None => return vec![],
};
by_key
.range(..=lsn)
.map(|(_, v)| v)
.rev()
.cloned()
.collect()
}
}
impl UsizeCounter {
pub fn new(inital: usize) -> Self {
UsizeCounter(inital)
}
}
impl seqwait::MonotonicCounter<usize> for UsizeCounter {
fn cnt_advance(&mut self, new_val: usize) {
assert!(self.0 < new_val);
self.0 = new_val;
}
fn cnt_value(&self) -> usize {
self.0
}
}
#[test]
fn basic() {
let lm = TestHistoricStuff::default();
let (r, mut rw) = super::new::<TestTypes>(UsizeCounter::new(0), lm);
let r = Arc::new(r);
let r2 = Arc::clone(&r);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let read_jh = rt.spawn(async move { r.get(0, 10).await });
let mut rw = rt.block_on(async move {
rw.put(0, 1, "foo").await.unwrap();
rw.put(1, 1, "bar").await.unwrap();
rw.put(0, 10, "baz").await.unwrap();
rw
});
let read_res = rt.block_on(read_jh).unwrap().unwrap();
assert!(
read_res.historic_path.is_empty(),
"we have pushed less than needed for flush"
);
assert_eq!(read_res.inmem_records, vec!["baz", "foo"]);
let rw = rt.block_on(async move {
rw.put(0, 11, "blup").await.unwrap();
rw
});
let read_res = rt.block_on(async move { r2.get(0, 11).await.unwrap() });
assert_eq!(read_res.historic_path.len(), 0);
assert_eq!(read_res.inmem_records, vec!["blup", "baz", "foo"]);
drop(rw);
}

View File

@@ -14,5 +14,4 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tracing.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
workspace_hack.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -33,10 +33,11 @@ serde_with.workspace = true
strum.workspace = true
strum_macros.workspace = true
url.workspace = true
uuid.workspace = true
uuid = { version = "1.2", features = ["v4", "serde"] }
metrics.workspace = true
workspace_hack.workspace = true
either.workspace = true
[dev-dependencies]
byteorder.workspace = true

View File

@@ -76,7 +76,6 @@ where
let log_quietly = method == Method::GET;
async move {
let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding();
if log_quietly {
debug!("Handling request");
} else {
@@ -88,11 +87,7 @@ where
// Usage of the error handler also means that we expect only the `ApiError` errors to be raised in this call.
//
// Panics are not handled separately, there's a `tracing_panic_hook` from another module to do that globally.
let res = (self.0)(request).await;
cancellation_guard.disarm();
match res {
match (self.0)(request).await {
Ok(response) => {
let response_status = response.status();
if log_quietly && response_status.is_success() {
@@ -110,38 +105,6 @@ where
}
}
/// Drop guard to WARN in case the request was dropped before completion.
struct RequestCancelled {
warn: Option<tracing::Span>,
}
impl RequestCancelled {
/// Create the drop guard using the [`tracing::Span::current`] as the span.
fn warn_when_dropped_without_responding() -> Self {
RequestCancelled {
warn: Some(tracing::Span::current()),
}
}
/// Consume the drop guard without logging anything.
fn disarm(mut self) {
self.warn = None;
}
}
impl Drop for RequestCancelled {
fn drop(&mut self) {
if let Some(span) = self.warn.take() {
// the span has all of the info already, but the outer `.instrument(span)` has already
// been dropped, so we need to manually re-enter it for this message.
//
// this is what the instrument would do before polling so it is fine.
let _g = span.entered();
warn!("request was dropped before completing");
}
}
}
async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
SERVE_METRICS_COUNT.inc();

View File

@@ -1,7 +1,6 @@
use std::str::FromStr;
use anyhow::Context;
use once_cell::sync::Lazy;
use strum_macros::{EnumString, EnumVariantNames};
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
@@ -24,64 +23,25 @@ impl LogFormat {
}
}
static TRACING_EVENT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
metrics::register_int_counter_vec!(
"libmetrics_tracing_event_count",
"Number of tracing events, by level",
&["level"]
)
.expect("failed to define metric")
});
struct TracingEventCountLayer(&'static metrics::IntCounterVec);
impl<S> tracing_subscriber::layer::Layer<S> for TracingEventCountLayer
where
S: tracing::Subscriber,
{
fn on_event(
&self,
event: &tracing::Event<'_>,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
let level = event.metadata().level();
let level = match *level {
tracing::Level::ERROR => "error",
tracing::Level::WARN => "warn",
tracing::Level::INFO => "info",
tracing::Level::DEBUG => "debug",
tracing::Level::TRACE => "trace",
};
self.0.with_label_values(&[level]).inc();
}
}
pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
let default_filter_str = "info";
// We fall back to printing all spans at info-level or above if
// the RUST_LOG environment variable is not set.
let rust_log_env_filter = || {
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
};
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_filter_str));
// NB: the order of the with() calls does not matter.
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
use tracing_subscriber::prelude::*;
tracing_subscriber::registry()
.with({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout);
let log_layer = match log_format {
LogFormat::Json => log_layer.json().boxed(),
LogFormat::Plain => log_layer.boxed(),
LogFormat::Test => log_layer.with_test_writer().boxed(),
};
log_layer.with_filter(rust_log_env_filter())
})
.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()))
.init();
let base_logger = tracing_subscriber::fmt()
.with_env_filter(env_filter)
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout);
match log_format {
LogFormat::Json => base_logger.json().init(),
LogFormat::Plain => base_logger.init(),
LogFormat::Test => base_logger.with_test_writer().init(),
}
Ok(())
}
@@ -197,33 +157,3 @@ impl std::fmt::Debug for PrettyLocation<'_, '_> {
<Self as std::fmt::Display>::fmt(self, f)
}
}
#[cfg(test)]
mod tests {
use metrics::{core::Opts, IntCounterVec};
use super::TracingEventCountLayer;
#[test]
fn tracing_event_count_metric() {
let counter_vec =
IntCounterVec::new(Opts::new("testmetric", "testhelp"), &["level"]).unwrap();
let counter_vec = Box::leak(Box::new(counter_vec)); // make it 'static
let layer = TracingEventCountLayer(counter_vec);
use tracing_subscriber::prelude::*;
tracing::subscriber::with_default(tracing_subscriber::registry().with(layer), || {
tracing::trace!("foo");
tracing::debug!("foo");
tracing::info!("foo");
tracing::warn!("foo");
tracing::error!("foo");
});
assert_eq!(counter_vec.with_label_values(&["trace"]).get(), 1);
assert_eq!(counter_vec.with_label_values(&["debug"]).get(), 1);
assert_eq!(counter_vec.with_label_values(&["info"]).get(), 1);
assert_eq!(counter_vec.with_label_values(&["warn"]).get(), 1);
assert_eq!(counter_vec.with_label_values(&["error"]).get(), 1);
}
}

View File

@@ -1,12 +1,13 @@
#![warn(missing_docs)]
use either::Either;
use std::cmp::{Eq, Ordering, PartialOrd};
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::mem;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::watch::{channel, Receiver, Sender};
use tokio::sync::oneshot::{channel, Receiver, Sender};
use tokio::time::timeout;
/// An error happened while waiting for a number
@@ -36,45 +37,48 @@ pub trait MonotonicCounter<V> {
}
/// Internal components of a `SeqWait`
struct SeqWaitInt<S, V>
struct SeqWaitInt<S, V, T>
where
S: MonotonicCounter<V>,
V: Ord,
T: Clone,
{
waiters: BinaryHeap<Waiter<V>>,
waiters: BinaryHeap<Waiter<V, T>>,
current: S,
shutdown: bool,
data: T,
}
struct Waiter<T>
struct Waiter<V, T>
where
T: Ord,
V: Ord,
T: Clone,
{
wake_num: T, // wake me when this number arrives ...
wake_channel: Sender<()>, // ... by sending a message to this channel
wake_num: V, // wake me when this number arrives ...
wake_channel: Sender<T>, // ... by sending a message to this channel
}
// BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
// to get that.
impl<T: Ord> PartialOrd for Waiter<T> {
impl<V: Ord, T: Clone> PartialOrd for Waiter<V, T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
other.wake_num.partial_cmp(&self.wake_num)
}
}
impl<T: Ord> Ord for Waiter<T> {
impl<V: Ord, T: Clone> Ord for Waiter<V, T> {
fn cmp(&self, other: &Self) -> Ordering {
other.wake_num.cmp(&self.wake_num)
}
}
impl<T: Ord> PartialEq for Waiter<T> {
impl<V: Ord, T: Clone> PartialEq for Waiter<V, T> {
fn eq(&self, other: &Self) -> bool {
other.wake_num == self.wake_num
}
}
impl<T: Ord> Eq for Waiter<T> {}
impl<V: Ord, T: Clone> Eq for Waiter<V, T> {}
/// A tool for waiting on a sequence number
///
@@ -92,25 +96,28 @@ impl<T: Ord> Eq for Waiter<T> {}
///
/// <S> means Storage, <V> is type of counter that this storage exposes.
///
pub struct SeqWait<S, V>
pub struct SeqWait<S, V, T>
where
S: MonotonicCounter<V>,
V: Ord,
T: Clone,
{
internal: Mutex<SeqWaitInt<S, V>>,
internal: Mutex<SeqWaitInt<S, V, T>>,
}
impl<S, V> SeqWait<S, V>
impl<S, V, T> SeqWait<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
/// Create a new `SeqWait`, initialized to a particular number
pub fn new(starting_num: S) -> Self {
pub fn new(starting_num: S, data: T) -> Self {
let internal = SeqWaitInt {
waiters: BinaryHeap::new(),
current: starting_num,
shutdown: false,
data,
};
SeqWait {
internal: Mutex::new(internal),
@@ -144,10 +151,13 @@ where
///
/// This call won't complete until someone has called `advance`
/// with a number greater than or equal to the one we're waiting for.
pub async fn wait_for(&self, num: V) -> Result<(), SeqWaitError> {
match self.queue_for_wait(num) {
Ok(None) => Ok(()),
Ok(Some(mut rx)) => rx.changed().await.map_err(|_| SeqWaitError::Shutdown),
pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
match self.queue_for_wait(num, false) {
Ok(Either::Left(data)) => Ok(data),
Ok(Either::Right(rx)) => match rx.await {
Err(_) => Err(SeqWaitError::Shutdown),
Ok(data) => Ok(data),
},
Err(e) => Err(e),
}
}
@@ -159,15 +169,18 @@ where
///
/// If that hasn't happened after the specified timeout duration,
/// [`SeqWaitError::Timeout`] will be returned.
///
/// Pass `timeout_duration.is_zero() == true` to guarantee that the
/// future that is this function will never await.
pub async fn wait_for_timeout(
&self,
num: V,
timeout_duration: Duration,
) -> Result<(), SeqWaitError> {
match self.queue_for_wait(num) {
Ok(None) => Ok(()),
Ok(Some(mut rx)) => match timeout(timeout_duration, rx.changed()).await {
Ok(Ok(())) => Ok(()),
) -> Result<T, SeqWaitError> {
match self.queue_for_wait(num, timeout_duration.is_zero()) {
Ok(Either::Left(data)) => Ok(data),
Ok(Either::Right(rx)) => match timeout(timeout_duration, rx).await {
Ok(Ok(data)) => Ok(data),
Ok(Err(_)) => Err(SeqWaitError::Shutdown),
Err(_) => Err(SeqWaitError::Timeout),
},
@@ -177,41 +190,50 @@ where
/// Register and return a channel that will be notified when a number arrives,
/// or None, if it has already arrived.
fn queue_for_wait(&self, num: V) -> Result<Option<Receiver<()>>, SeqWaitError> {
fn queue_for_wait(&self, num: V, nowait: bool) -> Result<Either<T, Receiver<T>>, SeqWaitError> {
let mut internal = self.internal.lock().unwrap();
if internal.current.cnt_value() >= num {
return Ok(None);
return Ok(Either::Left(internal.data.clone()));
}
if internal.shutdown {
return Err(SeqWaitError::Shutdown);
}
if nowait {
return Err(SeqWaitError::Timeout);
}
// Create a new channel.
let (tx, rx) = channel(());
let (tx, rx) = channel();
internal.waiters.push(Waiter {
wake_num: num,
wake_channel: tx,
});
// Drop the lock as we exit this scope.
Ok(Some(rx))
Ok(Either::Right(rx))
}
/// Announce a new number has arrived
///
/// All waiters at this value or below will be woken.
///
/// If `new_data` is Some(), it will update the internal data,
/// even if `num` is smaller than the internal counter.
/// It will not cause a wake-up though, in this case.
///
/// Returns the old number.
pub fn advance(&self, num: V) -> V {
pub fn advance(&self, num: V, new_data: Option<T>) -> V {
let old_value;
let wake_these = {
let (wake_these, with_data) = {
let mut internal = self.internal.lock().unwrap();
if let Some(new_data) = new_data {
internal.data = new_data;
}
old_value = internal.current.cnt_value();
if old_value >= num {
return old_value;
}
internal.current.cnt_advance(num);
// Pop all waiters <= num from the heap. Collect them in a vector, and
// wake them up after releasing the lock.
let mut wake_these = Vec::new();
@@ -221,13 +243,13 @@ where
}
wake_these.push(internal.waiters.pop().unwrap().wake_channel);
}
wake_these
(wake_these, internal.data.clone())
};
for tx in wake_these {
// This can fail if there are no receivers.
// We don't care; discard the error.
let _ = tx.send(());
let _ = tx.send(with_data.clone());
}
old_value
}
@@ -236,6 +258,106 @@ where
pub fn load(&self) -> S {
self.internal.lock().unwrap().current
}
/// Split the seqwait into a part than can only do wait,
/// and another part that can do advance + wait.
///
/// The wait-only part can be cloned, the advance part cannot be cloned.
/// This provides a single-producer multi-consumer scheme.
pub fn split_spmc(self) -> (Wait<S, V, T>, Advance<S, V, T>) {
let inner = Arc::new(self);
let w = Wait {
inner: inner.clone(),
};
let a = Advance { inner };
(w, a)
}
}
/// See [`SeqWait::split_spmc`].
pub struct Wait<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
inner: Arc<SeqWait<S, V, T>>,
}
/// See [`SeqWait::split_spmc`].
pub struct Advance<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
inner: Arc<SeqWait<S, V, T>>,
}
impl<S, V, T> Wait<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
/// See [`SeqWait::wait_for`].
pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
self.inner.wait_for(num).await
}
/// See [`SeqWait::wait_for_timeout`].
pub async fn wait_for_timeout(
&self,
num: V,
timeout_duration: Duration,
) -> Result<T, SeqWaitError> {
self.inner.wait_for_timeout(num, timeout_duration).await
}
}
impl<S, V, T> Advance<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
/// See [`SeqWait::advance`].
pub fn advance(&self, num: V, new_data: Option<T>) -> V {
self.inner.advance(num, new_data)
}
/// See [`SeqWait::wait_for`].
pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
self.inner.wait_for(num).await
}
/// See [`SeqWait::wait_for_timeout`].
pub async fn wait_for_timeout(
&self,
num: V,
timeout_duration: Duration,
) -> Result<T, SeqWaitError> {
self.inner.wait_for_timeout(num, timeout_duration).await
}
/// Get a `Clone::clone` of the current data inside the seqwait.
pub fn get_current_data(&self) -> (V, T) {
let inner = self.inner.internal.lock().unwrap();
(inner.current.cnt_value(), inner.data.clone())
}
}
impl<S, V, T> Clone for Wait<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
#[cfg(test)]
@@ -256,12 +378,12 @@ mod tests {
#[tokio::test]
async fn seqwait() {
let seq = Arc::new(SeqWait::new(0));
let seq = Arc::new(SeqWait::new(0, ()));
let seq2 = Arc::clone(&seq);
let seq3 = Arc::clone(&seq);
let jh1 = tokio::task::spawn(async move {
seq2.wait_for(42).await.expect("wait_for 42");
let old = seq2.advance(100);
let old = seq2.advance(100, None);
assert_eq!(old, 99);
seq2.wait_for_timeout(999, Duration::from_millis(100))
.await
@@ -272,12 +394,12 @@ mod tests {
seq3.wait_for(0).await.expect("wait_for 0");
});
tokio::time::sleep(Duration::from_millis(200)).await;
let old = seq.advance(99);
let old = seq.advance(99, None);
assert_eq!(old, 0);
seq.wait_for(100).await.expect("wait_for 100");
// Calling advance with a smaller value is a no-op
assert_eq!(seq.advance(98), 100);
assert_eq!(seq.advance(98, None), 100);
assert_eq!(seq.load(), 100);
jh1.await.unwrap();
@@ -288,7 +410,7 @@ mod tests {
#[tokio::test]
async fn seqwait_timeout() {
let seq = Arc::new(SeqWait::new(0));
let seq = Arc::new(SeqWait::new(0, ()));
let seq2 = Arc::clone(&seq);
let jh = tokio::task::spawn(async move {
let timeout = Duration::from_millis(1);
@@ -298,10 +420,104 @@ mod tests {
tokio::time::sleep(Duration::from_millis(200)).await;
// This will attempt to wake, but nothing will happen
// because the waiter already dropped its Receiver.
let old = seq.advance(99);
let old = seq.advance(99, None);
assert_eq!(old, 0);
jh.await.unwrap();
seq.shutdown();
}
#[tokio::test]
async fn data_basic() {
let seq = Arc::new(SeqWait::new(0, "a"));
let seq2 = Arc::clone(&seq);
let jh = tokio::task::spawn(async move {
let data = seq.wait_for(2).await.unwrap();
assert_eq!(data, "b");
});
seq2.advance(1, Some("x"));
seq2.advance(2, Some("b"));
jh.await.unwrap();
}
#[test]
fn data_always_most_recent() {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
let seq = Arc::new(SeqWait::new(0, "a"));
let seq2 = Arc::clone(&seq);
let jh = rt.spawn(async move {
let data = seq.wait_for(2).await.unwrap();
assert_eq!(data, "d");
});
// jh is not running until we poll it, thanks to current thread runtime
rt.block_on(async move {
seq2.advance(2, Some("b"));
seq2.advance(3, Some("c"));
seq2.advance(4, Some("d"));
});
rt.block_on(jh).unwrap();
}
#[tokio::test]
async fn split_spmc_api_surface() {
let seq = SeqWait::new(0, 1);
let (w, a) = seq.split_spmc();
let _ = w.wait_for(1);
let _ = w.wait_for_timeout(0, Duration::from_secs(10));
let _ = w.clone();
let _ = a.advance(1, None);
let _ = a.wait_for(1);
let _ = a.wait_for_timeout(0, Duration::from_secs(10));
// TODO would be nice to have must-not-compile tests for Advance not being clonable.
}
#[tokio::test]
async fn new_data_same_lsn() {
let seq = Arc::new(SeqWait::new(0, "a"));
seq.advance(1, Some("b"));
let data = seq.wait_for(1).await.unwrap();
assert_eq!(data, "b", "the regular case where lsn and data advance");
seq.advance(1, Some("c"));
let data = seq.wait_for(1).await.unwrap();
assert_eq!(
data, "c",
"no lsn advance still gives new data for old lsn wait_for's"
);
let (start_wait_for_sender, start_wait_for_receiver) = tokio::sync::oneshot::channel();
// ensure we don't wake waiters for data-only change
let jh = tokio::spawn({
let seq = seq.clone();
async move {
start_wait_for_receiver.await.unwrap();
match tokio::time::timeout(Duration::from_secs(2), seq.wait_for(2)).await {
Ok(_) => {
assert!(
false,
"advance should not wake waiters if data changes but LSN doesn't"
);
}
Err(_) => {
// Good, we weren't woken up.
}
}
}
});
seq.advance(1, Some("d"));
start_wait_for_sender.send(()).unwrap();
jh.await.unwrap();
}
}

View File

@@ -13,7 +13,7 @@ use std::time::Instant;
use utils::lsn::Lsn;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use criterion::{criterion_group, criterion_main, Criterion};
fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
let mut layer_map = LayerMap::<LayerDescriptor>::default();
@@ -114,7 +114,7 @@ fn bench_from_captest_env(c: &mut Criterion) {
c.bench_function("captest_uniform_queries", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
black_box(layer_map.search(q.0, q.1));
layer_map.search(q.0, q.1);
}
});
});
@@ -122,11 +122,11 @@ fn bench_from_captest_env(c: &mut Criterion) {
// test with a key that corresponds to the RelDir entry. See pgdatadir_mapping.rs.
c.bench_function("captest_rel_dir_query", |b| {
b.iter(|| {
let result = black_box(layer_map.search(
let result = layer_map.search(
Key::from_hex("000000067F00008000000000000000000001").unwrap(),
// This LSN is higher than any of the LSNs in the tree
Lsn::from_str("D0/80208AE1").unwrap(),
));
);
result.unwrap();
});
});
@@ -183,7 +183,7 @@ fn bench_from_real_project(c: &mut Criterion) {
group.bench_function("uniform_queries", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
black_box(layer_map.search(q.0, q.1));
layer_map.search(q.0, q.1);
}
});
});
@@ -232,7 +232,7 @@ fn bench_sequential(c: &mut Criterion) {
group.bench_function("uniform_queries", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
black_box(layer_map.search(q.0, q.1));
layer_map.search(q.0, q.1);
}
});
});

View File

@@ -6,7 +6,6 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde::de::IntoDeserializer;
use std::env;
use storage_broker::Uri;
use utils::crashsafe::path_with_suffix_extension;
@@ -63,6 +62,7 @@ pub mod defaults {
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour";
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
///
/// Default built-in configuration file.
@@ -91,6 +91,7 @@ pub mod defaults {
#cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
#disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}}
@@ -107,7 +108,6 @@ pub mod defaults {
#pitr_interval = '{DEFAULT_PITR_INTERVAL}'
#min_resident_size_override = .. # in bytes
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
# [remote_storage]
@@ -182,6 +182,9 @@ pub struct PageServerConf {
pub metric_collection_endpoint: Option<Url>,
pub synthetic_size_calculation_interval: Duration,
// See the corresponding metric's help string.
pub evictions_low_residence_duration_metric_threshold: Duration,
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
pub test_remote_failures: u64,
@@ -254,6 +257,8 @@ struct PageServerConfigBuilder {
metric_collection_endpoint: BuilderValue<Option<Url>>,
synthetic_size_calculation_interval: BuilderValue<Duration>,
evictions_low_residence_duration_metric_threshold: BuilderValue<Duration>,
disk_usage_based_eviction: BuilderValue<Option<DiskUsageEvictionTaskConfig>>,
test_remote_failures: BuilderValue<u64>,
@@ -311,6 +316,11 @@ impl Default for PageServerConfigBuilder {
.expect("cannot parse default synthetic size calculation interval")),
metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT),
evictions_low_residence_duration_metric_threshold: Set(humantime::parse_duration(
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.expect("cannot parse DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD")),
disk_usage_based_eviction: Set(None),
test_remote_failures: Set(0),
@@ -428,6 +438,10 @@ impl PageServerConfigBuilder {
self.test_remote_failures = BuilderValue::Set(fail_first);
}
pub fn evictions_low_residence_duration_metric_threshold(&mut self, value: Duration) {
self.evictions_low_residence_duration_metric_threshold = BuilderValue::Set(value);
}
pub fn disk_usage_based_eviction(&mut self, value: Option<DiskUsageEvictionTaskConfig>) {
self.disk_usage_based_eviction = BuilderValue::Set(value);
}
@@ -511,6 +525,11 @@ impl PageServerConfigBuilder {
synthetic_size_calculation_interval: self
.synthetic_size_calculation_interval
.ok_or(anyhow!("missing synthetic_size_calculation_interval"))?,
evictions_low_residence_duration_metric_threshold: self
.evictions_low_residence_duration_metric_threshold
.ok_or(anyhow!(
"missing evictions_low_residence_duration_metric_threshold"
))?,
disk_usage_based_eviction: self
.disk_usage_based_eviction
.ok_or(anyhow!("missing disk_usage_based_eviction"))?,
@@ -702,12 +721,12 @@ impl PageServerConf {
"synthetic_size_calculation_interval" =>
builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
"test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
"evictions_low_residence_duration_metric_threshold" => builder.evictions_low_residence_duration_metric_threshold(parse_toml_duration(key, item)?),
"disk_usage_based_eviction" => {
tracing::info!("disk_usage_based_eviction: {:#?}", &item);
builder.disk_usage_based_eviction(
deserialize_from_item("disk_usage_based_eviction", item)
.context("parse disk_usage_based_eviction")?
)
toml_edit::de::from_item(item.clone())
.context("parse disk_usage_based_eviction")?)
},
"ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
_ => bail!("unrecognized pageserver option '{key}'"),
@@ -808,25 +827,18 @@ impl PageServerConf {
if let Some(eviction_policy) = item.get("eviction_policy") {
t_conf.eviction_policy = Some(
deserialize_from_item("eviction_policy", eviction_policy)
toml_edit::de::from_item(eviction_policy.clone())
.context("parse eviction_policy")?,
);
}
if let Some(item) = item.get("min_resident_size_override") {
t_conf.min_resident_size_override = Some(
deserialize_from_item("min_resident_size_override", item)
toml_edit::de::from_item(item.clone())
.context("parse min_resident_size_override")?,
);
}
if let Some(item) = item.get("evictions_low_residence_duration_metric_threshold") {
t_conf.evictions_low_residence_duration_metric_threshold = Some(parse_toml_duration(
"evictions_low_residence_duration_metric_threshold",
item,
)?);
}
Ok(t_conf)
}
@@ -865,6 +877,10 @@ impl PageServerConf {
cached_metric_collection_interval: Duration::from_secs(60 * 60),
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
synthetic_size_calculation_interval: Duration::from_secs(60),
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.unwrap(),
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
@@ -922,18 +938,6 @@ where
})
}
fn deserialize_from_item<T>(name: &str, item: &Item) -> anyhow::Result<T>
where
T: serde::de::DeserializeOwned,
{
// ValueDeserializer::new is not public, so use the ValueDeserializer's documented way
let deserializer = match item.clone().into_value() {
Ok(value) => value.into_deserializer(),
Err(item) => anyhow::bail!("toml_edit::Item '{item}' is not a toml_edit::Value"),
};
T::deserialize(deserializer).with_context(|| format!("deserializing item for node {name}"))
}
/// Configurable semaphore permits setting.
///
/// Does not allow semaphore permits to be zero, because at runtime initially zero permits and empty
@@ -1000,10 +1004,9 @@ mod tests {
use remote_storage::{RemoteStorageKind, S3Config};
use tempfile::{tempdir, TempDir};
use utils::serde_percent::Percent;
use super::*;
use crate::{tenant::config::EvictionPolicy, DEFAULT_PG_VERSION};
use crate::DEFAULT_PG_VERSION;
const ALL_BASE_VALUES_TOML: &str = r#"
# Initial configuration file created by 'pageserver --init'
@@ -1026,6 +1029,8 @@ cached_metric_collection_interval = '22200 s'
metric_collection_endpoint = 'http://localhost:80/metrics'
synthetic_size_calculation_interval = '333 s'
evictions_low_residence_duration_metric_threshold = '444 s'
log_format = 'json'
"#;
@@ -1082,6 +1087,9 @@ log_format = 'json'
synthetic_size_calculation_interval: humantime::parse_duration(
defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL
)?,
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD
)?,
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
@@ -1136,6 +1144,7 @@ log_format = 'json'
cached_metric_collection_interval: Duration::from_secs(22200),
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
synthetic_size_calculation_interval: Duration::from_secs(333),
evictions_low_residence_duration_metric_threshold: Duration::from_secs(444),
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
@@ -1301,71 +1310,6 @@ trace_read_requests = {trace_read_requests}"#,
Ok(())
}
#[test]
fn eviction_pageserver_config_parse() -> anyhow::Result<()> {
let tempdir = tempdir()?;
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
let pageserver_conf_toml = format!(
r#"pg_distrib_dir = "{}"
metric_collection_endpoint = "http://sample.url"
metric_collection_interval = "10min"
id = 222
[disk_usage_based_eviction]
max_usage_pct = 80
min_avail_bytes = 0
period = "10s"
[tenant_config]
evictions_low_residence_duration_metric_threshold = "20m"
[tenant_config.eviction_policy]
kind = "LayerAccessThreshold"
period = "20m"
threshold = "20m"
"#,
pg_distrib_dir.display(),
);
let toml: Document = pageserver_conf_toml.parse()?;
let conf = PageServerConf::parse_and_validate(&toml, &workdir)?;
assert_eq!(conf.pg_distrib_dir, pg_distrib_dir);
assert_eq!(
conf.metric_collection_endpoint,
Some("http://sample.url".parse().unwrap())
);
assert_eq!(
conf.metric_collection_interval,
Duration::from_secs(10 * 60)
);
assert_eq!(
conf.default_tenant_conf
.evictions_low_residence_duration_metric_threshold,
Duration::from_secs(20 * 60)
);
assert_eq!(conf.id, NodeId(222));
assert_eq!(
conf.disk_usage_based_eviction,
Some(DiskUsageEvictionTaskConfig {
max_usage_pct: Percent::new(80).unwrap(),
min_avail_bytes: 0,
period: Duration::from_secs(10),
#[cfg(feature = "testing")]
mock_statvfs: None,
})
);
match &conf.default_tenant_conf.eviction_policy {
EvictionPolicy::NoEviction => panic!("Unexpected eviction opolicy tenant settings"),
EvictionPolicy::LayerAccessThreshold(eviction_thresold) => {
assert_eq!(eviction_thresold.period, Duration::from_secs(20 * 60));
assert_eq!(eviction_thresold.threshold, Duration::from_secs(20 * 60));
}
}
Ok(())
}
fn prepare_fs(tempdir: &TempDir) -> anyhow::Result<(PathBuf, PathBuf)> {
let tempdir_path = tempdir.path();

View File

@@ -520,43 +520,6 @@ paths:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/synthetic_size:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
get:
description: |
Calculate tenant's synthetic size
responses:
"200":
description: Tenant's synthetic size
content:
application/json:
schema:
$ref: "#/components/schemas/SyntheticSizeResponse"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/size:
parameters:
- name: tenant_id
@@ -985,84 +948,6 @@ components:
latest_gc_cutoff_lsn:
type: string
format: hex
SyntheticSizeResponse:
type: object
required:
- id
- size
- segment_sizes
- inputs
properties:
id:
type: string
format: hex
size:
type: integer
segment_sizes:
type: array
items:
$ref: "#/components/schemas/SegmentSize"
inputs:
type: object
properties:
segments:
type: array
items:
$ref: "#/components/schemas/SegmentData"
timeline_inputs:
type: array
items:
$ref: "#/components/schemas/TimelineInput"
SegmentSize:
type: object
required:
- method
- accum_size
properties:
method:
type: string
accum_size:
type: integer
SegmentData:
type: object
required:
- segment
properties:
segment:
type: object
required:
- lsn
properties:
parent:
type: integer
lsn:
type: integer
size:
type: integer
needed:
type: boolean
timeline_id:
type: string
format: hex
kind:
type: string
TimelineInput:
type: object
required:
- timeline_id
properties:
ancestor_id:
type: string
ancestor_lsn:
type: string
timeline_id:
type: string
format: hex
Error:
type: object
required:

View File

@@ -781,19 +781,6 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(&evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
&evictions_low_residence_duration_metric_threshold,
))
.map_err(ApiError::BadRequest)?,
);
}
let target_tenant_id = request_data
.new_tenant_id
.map(TenantId::from)
@@ -927,19 +914,6 @@ async fn update_tenant_config_handler(
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(&evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
&evictions_low_residence_duration_metric_threshold,
))
.map_err(ApiError::BadRequest)?,
);
}
let state = get_state(&request);
mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
.instrument(info_span!("tenant_config", tenant = ?tenant_id))
@@ -1201,36 +1175,6 @@ async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
)
}
async fn post_tracing_event_handler(mut r: Request<Body>) -> Result<Response<Body>, ApiError> {
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
enum Level {
Error,
Warn,
Info,
Debug,
Trace,
}
#[derive(Debug, serde::Deserialize)]
struct Request {
level: Level,
message: String,
}
let body: Request = json_request(&mut r)
.await
.map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
match body.level {
Level::Error => tracing::error!(?body.message),
Level::Warn => tracing::warn!(?body.message),
Level::Info => tracing::info!(?body.message),
Level::Debug => tracing::debug!(?body.message),
Level::Trace => tracing::trace!(?body.message),
}
json_response(StatusCode::OK, ())
}
pub fn make_router(
conf: &'static PageServerConf,
launch_ts: &'static LaunchTimestamp,
@@ -1371,9 +1315,5 @@ pub fn make_router(
testing_api!("set tenant state to broken", handle_tenant_break),
)
.get("/v1/panic", |r| RequestSpan(always_panic_handler).handle(r))
.post(
"/v1/tracing/event",
testing_api!("emit a tracing event", post_tracing_event_handler),
)
.any(handler_404))
}

View File

@@ -257,22 +257,6 @@ impl EvictionsWithLowResidenceDuration {
}
}
pub fn change_threshold(
&mut self,
tenant_id: &str,
timeline_id: &str,
new_threshold: Duration,
) {
if new_threshold == self.threshold {
return;
}
let mut with_new =
EvictionsWithLowResidenceDurationBuilder::new(self.data_source, new_threshold)
.build(tenant_id, timeline_id);
std::mem::swap(self, &mut with_new);
with_new.remove(tenant_id, timeline_id);
}
// This could be a `Drop` impl, but, we need the `tenant_id` and `timeline_id`.
fn remove(&mut self, tenant_id: &str, timeline_id: &str) {
let Some(_counter) = self.counter.take() else {
@@ -385,26 +369,6 @@ static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy<HistogramVec> = Lazy::new
.expect("failed to define a metric")
});
static REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_remote_timeline_client_bytes_started",
"Incremented by the number of bytes associated with a remote timeline client operation. \
The increment happens when the operation is scheduled.",
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
)
.expect("failed to define a metric")
});
static REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_remote_timeline_client_bytes_finished",
"Incremented by the number of bytes associated with a remote timeline client operation. \
The increment happens when the operation finishes (regardless of success/failure/shutdown).",
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
)
.expect("failed to define a metric")
});
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RemoteOpKind {
Upload,
@@ -625,7 +589,7 @@ pub struct TimelineMetrics {
pub num_persistent_files_created: IntCounter,
pub persistent_bytes_written: IntCounter,
pub evictions: IntCounter,
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
pub evictions_with_low_residence_duration: EvictionsWithLowResidenceDuration,
}
impl TimelineMetrics {
@@ -692,9 +656,7 @@ impl TimelineMetrics {
num_persistent_files_created,
persistent_bytes_written,
evictions,
evictions_with_low_residence_duration: std::sync::RwLock::new(
evictions_with_low_residence_duration,
),
evictions_with_low_residence_duration,
}
}
}
@@ -713,8 +675,6 @@ impl Drop for TimelineMetrics {
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, timeline_id]);
let _ = EVICTIONS.remove_label_values(&[tenant_id, timeline_id]);
self.evictions_with_low_residence_duration
.write()
.unwrap()
.remove(tenant_id, timeline_id);
for op in STORAGE_TIME_OPERATIONS {
let _ =
@@ -759,8 +719,6 @@ pub struct RemoteTimelineClientMetrics {
remote_operation_time: Mutex<HashMap<(&'static str, &'static str, &'static str), Histogram>>,
calls_unfinished_gauge: Mutex<HashMap<(&'static str, &'static str), IntGauge>>,
calls_started_hist: Mutex<HashMap<(&'static str, &'static str), Histogram>>,
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
}
impl RemoteTimelineClientMetrics {
@@ -771,8 +729,6 @@ impl RemoteTimelineClientMetrics {
remote_operation_time: Mutex::new(HashMap::default()),
calls_unfinished_gauge: Mutex::new(HashMap::default()),
calls_started_hist: Mutex::new(HashMap::default()),
bytes_started_counter: Mutex::new(HashMap::default()),
bytes_finished_counter: Mutex::new(HashMap::default()),
remote_physical_size_gauge: Mutex::new(None),
}
}
@@ -811,7 +767,6 @@ impl RemoteTimelineClientMetrics {
});
metric.clone()
}
fn calls_unfinished_gauge(
&self,
file_kind: &RemoteOpFileKind,
@@ -853,125 +808,32 @@ impl RemoteTimelineClientMetrics {
});
metric.clone()
}
fn bytes_started_counter(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> IntCounter {
// XXX would be nice to have an upgradable RwLock
let mut guard = self.bytes_started_counter.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
let metric = guard.entry(key).or_insert_with(move || {
REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER
.get_metric_with_label_values(&[
&self.tenant_id.to_string(),
&self.timeline_id.to_string(),
key.0,
key.1,
])
.unwrap()
});
metric.clone()
}
fn bytes_finished_counter(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> IntCounter {
// XXX would be nice to have an upgradable RwLock
let mut guard = self.bytes_finished_counter.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
let metric = guard.entry(key).or_insert_with(move || {
REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER
.get_metric_with_label_values(&[
&self.tenant_id.to_string(),
&self.timeline_id.to_string(),
key.0,
key.1,
])
.unwrap()
});
metric.clone()
}
}
#[cfg(test)]
impl RemoteTimelineClientMetrics {
pub fn get_bytes_started_counter_value(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> Option<u64> {
let guard = self.bytes_started_counter.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
guard.get(&key).map(|counter| counter.get())
}
pub fn get_bytes_finished_counter_value(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> Option<u64> {
let guard = self.bytes_finished_counter.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
guard.get(&key).map(|counter| counter.get())
}
}
/// See [`RemoteTimelineClientMetrics::call_begin`].
#[must_use]
pub(crate) struct RemoteTimelineClientCallMetricGuard {
/// Decremented on drop.
calls_unfinished_metric: Option<IntGauge>,
/// If Some(), this references the bytes_finished metric, and we increment it by the given `u64` on drop.
bytes_finished: Option<(IntCounter, u64)>,
}
pub(crate) struct RemoteTimelineClientCallMetricGuard(Option<IntGauge>);
impl RemoteTimelineClientCallMetricGuard {
/// Consume this guard object without performing the metric updates it would do on `drop()`.
/// The caller vouches to do the metric updates manually.
/// Consume this guard object without decrementing the metric.
/// The caller vouches to do this manually, so that the prior increment of the gauge will cancel out.
pub fn will_decrement_manually(mut self) {
let RemoteTimelineClientCallMetricGuard {
calls_unfinished_metric,
bytes_finished,
} = &mut self;
calls_unfinished_metric.take();
bytes_finished.take();
self.0 = None; // prevent drop() from decrementing
}
}
impl Drop for RemoteTimelineClientCallMetricGuard {
fn drop(&mut self) {
let RemoteTimelineClientCallMetricGuard {
calls_unfinished_metric,
bytes_finished,
} = self;
if let Some(guard) = calls_unfinished_metric.take() {
if let RemoteTimelineClientCallMetricGuard(Some(guard)) = self {
guard.dec();
}
if let Some((bytes_finished_metric, value)) = bytes_finished {
bytes_finished_metric.inc_by(*value);
}
}
}
/// The enum variants communicate to the [`RemoteTimelineClientMetrics`] whether to
/// track the byte size of this call in applicable metric(s).
pub(crate) enum RemoteTimelineClientMetricsCallTrackSize {
/// Do not account for this call's byte size in any metrics.
/// The `reason` field is there to make the call sites self-documenting
/// about why they don't need the metric.
DontTrackSize { reason: &'static str },
/// Track the byte size of the call in applicable metric(s).
Bytes(u64),
}
impl RemoteTimelineClientMetrics {
/// Update the metrics that change when a call to the remote timeline client instance starts.
/// Increment the metrics that track ongoing calls to the remote timeline client instance.
///
/// Drop the returned guard object once the operation is finished to updates corresponding metrics that track completions.
/// Drop the returned guard object once the operation is finished to decrement the values.
/// Or, use [`RemoteTimelineClientCallMetricGuard::will_decrement_manually`] and [`call_end`] if that
/// is more suitable.
/// Never do both.
@@ -979,51 +841,24 @@ impl RemoteTimelineClientMetrics {
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
size: RemoteTimelineClientMetricsCallTrackSize,
) -> RemoteTimelineClientCallMetricGuard {
let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
let unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
self.calls_started_hist(file_kind, op_kind)
.observe(calls_unfinished_metric.get() as f64);
calls_unfinished_metric.inc(); // NB: inc after the histogram, see comment on underlying metric
let bytes_finished = match size {
RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {
// nothing to do
None
}
RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => {
self.bytes_started_counter(file_kind, op_kind).inc_by(size);
let finished_counter = self.bytes_finished_counter(file_kind, op_kind);
Some((finished_counter, size))
}
};
RemoteTimelineClientCallMetricGuard {
calls_unfinished_metric: Some(calls_unfinished_metric),
bytes_finished,
}
.observe(unfinished_metric.get() as f64);
unfinished_metric.inc();
RemoteTimelineClientCallMetricGuard(Some(unfinished_metric))
}
/// Manually udpate the metrics that track completions, instead of using the guard object.
/// Manually decrement the metric instead of using the guard object.
/// Using the guard object is generally preferable.
/// See [`call_begin`] for more context.
pub(crate) fn call_end(
&self,
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
size: RemoteTimelineClientMetricsCallTrackSize,
) {
let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
pub(crate) fn call_end(&self, file_kind: &RemoteOpFileKind, op_kind: &RemoteOpKind) {
let unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
debug_assert!(
calls_unfinished_metric.get() > 0,
unfinished_metric.get() > 0,
"begin and end should cancel out"
);
calls_unfinished_metric.dec();
match size {
RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {}
RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => {
self.bytes_finished_counter(file_kind, op_kind).inc_by(size);
}
}
unfinished_metric.dec();
}
}
@@ -1036,8 +871,6 @@ impl Drop for RemoteTimelineClientMetrics {
remote_operation_time,
calls_unfinished_gauge,
calls_started_hist,
bytes_started_counter,
bytes_finished_counter,
} = self;
for ((a, b, c), _) in remote_operation_time.get_mut().unwrap().drain() {
let _ = REMOTE_OPERATION_TIME.remove_label_values(&[tenant_id, timeline_id, a, b, c]);
@@ -1058,22 +891,6 @@ impl Drop for RemoteTimelineClientMetrics {
b,
]);
}
for ((a, b), _) in bytes_started_counter.get_mut().unwrap().drain() {
let _ = REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER.remove_label_values(&[
tenant_id,
timeline_id,
a,
b,
]);
}
for ((a, b), _) in bytes_finished_counter.get_mut().unwrap().drain() {
let _ = REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER.remove_label_values(&[
tenant_id,
timeline_id,
a,
b,
]);
}
{
let _ = remote_physical_size_gauge; // use to avoid 'unused' warning in desctructuring above
let _ = REMOTE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);

View File

@@ -65,7 +65,7 @@ fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<
_ = task_mgr::shutdown_watcher() => {
// We were requested to shut down.
let msg = "pageserver is shutting down".to_string();
let msg = format!("pageserver is shutting down");
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None));
Err(QueryError::Other(anyhow::anyhow!(msg)))
}
@@ -700,8 +700,6 @@ impl PageServerHandler {
full_backup: bool,
ctx: RequestContext,
) -> anyhow::Result<()> {
let started = std::time::Instant::now();
// check that the timeline exists
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
@@ -714,8 +712,6 @@ impl PageServerHandler {
.context("invalid basebackup lsn")?;
}
let lsn_awaited_after = started.elapsed();
// switch client to COPYOUT
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
pgb.flush().await?;
@@ -736,17 +732,7 @@ impl PageServerHandler {
pgb.write_message_noflush(&BeMessage::CopyDone)?;
pgb.flush().await?;
let basebackup_after = started
.elapsed()
.checked_sub(lsn_awaited_after)
.unwrap_or(Duration::ZERO);
info!(
lsn_await_millis = lsn_awaited_after.as_millis(),
basebackup_millis = basebackup_after.as_millis(),
"basebackup complete"
);
info!("basebackup complete");
Ok(())
}

View File

@@ -1735,13 +1735,6 @@ impl Tenant {
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
*self.tenant_conf.write().unwrap() = new_tenant_conf;
// Don't hold self.timelines.lock() during the notifies.
// There's no risk of deadlock right now, but there could be if we consolidate
// mutexes in struct Timeline in the future.
let timelines = self.list_timelines();
for timeline in timelines {
timeline.tenant_conf_updated();
}
}
fn create_timeline_data(
@@ -1894,7 +1887,7 @@ impl Tenant {
.to_string();
// Convert the config to a toml file.
conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
conf_content += &toml_edit::easy::to_string(&tenant_conf)?;
let mut target_config_file = VirtualFile::open_with_options(
target_config_path,
@@ -2822,9 +2815,6 @@ pub mod harness {
trace_read_requests: Some(tenant_conf.trace_read_requests),
eviction_policy: Some(tenant_conf.eviction_policy),
min_resident_size_override: tenant_conf.min_resident_size_override,
evictions_low_residence_duration_metric_threshold: Some(
tenant_conf.evictions_low_residence_duration_metric_threshold,
),
}
}
}

View File

@@ -39,7 +39,6 @@ pub mod defaults {
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "3 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
}
/// Per-tenant configuration options
@@ -94,9 +93,6 @@ pub struct TenantConf {
pub trace_read_requests: bool,
pub eviction_policy: EvictionPolicy,
pub min_resident_size_override: Option<u64>,
// See the corresponding metric's help string.
#[serde(with = "humantime_serde")]
pub evictions_low_residence_duration_metric_threshold: Duration,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -168,11 +164,6 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub min_resident_size_override: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
#[serde(default)]
pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -237,9 +228,6 @@ impl TenantConfOpt {
min_resident_size_override: self
.min_resident_size_override
.or(global_conf.min_resident_size_override),
evictions_low_residence_duration_metric_threshold: self
.evictions_low_residence_duration_metric_threshold
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
}
}
}
@@ -272,10 +260,6 @@ impl Default for TenantConf {
trace_read_requests: false,
eviction_policy: EvictionPolicy::NoEviction,
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
}
}
}
@@ -291,9 +275,9 @@ mod tests {
..TenantConfOpt::default()
};
let toml_form = toml_edit::ser::to_string(&small_conf).unwrap();
let toml_form = toml_edit::easy::to_string(&small_conf).unwrap();
assert_eq!(toml_form, "gc_horizon = 42\n");
assert_eq!(small_conf, toml_edit::de::from_str(&toml_form).unwrap());
assert_eq!(small_conf, toml_edit::easy::from_str(&toml_form).unwrap());
let json_form = serde_json::to_string(&small_conf).unwrap();
assert_eq!(json_form, "{\"gc_horizon\":42}");

View File

@@ -219,8 +219,7 @@ use utils::lsn::Lsn;
use crate::metrics::{
MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
REMOTE_ONDEMAND_DOWNLOADED_BYTES, REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
};
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::{
@@ -368,13 +367,9 @@ impl RemoteTimelineClient {
/// Download index file
pub async fn download_index_file(&self) -> Result<IndexPart, DownloadError> {
let _unfinished_gauge_guard = self.metrics.call_begin(
&RemoteOpFileKind::Index,
&RemoteOpKind::Download,
crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
reason: "no need for a downloads gauge",
},
);
let _unfinished_gauge_guard = self
.metrics
.call_begin(&RemoteOpFileKind::Index, &RemoteOpKind::Download);
download::download_index_part(
self.conf,
@@ -403,13 +398,9 @@ impl RemoteTimelineClient {
layer_metadata: &LayerFileMetadata,
) -> anyhow::Result<u64> {
let downloaded_size = {
let _unfinished_gauge_guard = self.metrics.call_begin(
&RemoteOpFileKind::Layer,
&RemoteOpKind::Download,
crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
reason: "no need for a downloads gauge",
},
);
let _unfinished_gauge_guard = self
.metrics
.call_begin(&RemoteOpFileKind::Layer, &RemoteOpKind::Download);
download::download_layer_file(
self.conf,
&self.storage_impl,
@@ -895,32 +886,11 @@ impl RemoteTimelineClient {
fn calls_unfinished_metric_impl(
&self,
op: &UploadOp,
) -> Option<(
RemoteOpFileKind,
RemoteOpKind,
RemoteTimelineClientMetricsCallTrackSize,
)> {
use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
) -> Option<(RemoteOpFileKind, RemoteOpKind)> {
let res = match op {
UploadOp::UploadLayer(_, m) => (
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
),
UploadOp::UploadMetadata(_, _) => (
RemoteOpFileKind::Index,
RemoteOpKind::Upload,
DontTrackSize {
reason: "metadata uploads are tiny",
},
),
UploadOp::Delete(file_kind, _) => (
*file_kind,
RemoteOpKind::Delete,
DontTrackSize {
reason: "should we track deletes? positive or negative sign?",
},
),
UploadOp::UploadLayer(_, _) => (RemoteOpFileKind::Layer, RemoteOpKind::Upload),
UploadOp::UploadMetadata(_, _) => (RemoteOpFileKind::Index, RemoteOpKind::Upload),
UploadOp::Delete(file_kind, _) => (*file_kind, RemoteOpKind::Delete),
UploadOp::Barrier(_) => {
// we do not account these
return None;
@@ -930,20 +900,20 @@ impl RemoteTimelineClient {
}
fn calls_unfinished_metric_begin(&self, op: &UploadOp) {
let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
let (file_kind, op_kind) = match self.calls_unfinished_metric_impl(op) {
Some(x) => x,
None => return,
};
let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
let guard = self.metrics.call_begin(&file_kind, &op_kind);
guard.will_decrement_manually(); // in unfinished_ops_metric_end()
}
fn calls_unfinished_metric_end(&self, op: &UploadOp) {
let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
let (file_kind, op_kind) = match self.calls_unfinished_metric_impl(op) {
Some(x) => x,
None => return,
};
self.metrics.call_end(&file_kind, &op_kind, track_bytes);
self.metrics.call_end(&file_kind, &op_kind);
}
fn stop(&self) {
@@ -1011,19 +981,11 @@ impl RemoteTimelineClient {
mod tests {
use super::*;
use crate::{
context::RequestContext,
tenant::{
harness::{TenantHarness, TIMELINE_ID},
Tenant,
},
tenant::harness::{TenantHarness, TIMELINE_ID},
DEFAULT_PG_VERSION,
};
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
use std::{
collections::HashSet,
path::{Path, PathBuf},
};
use tokio::runtime::EnterGuard;
use std::{collections::HashSet, path::Path};
use utils::lsn::Lsn;
pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
@@ -1072,80 +1034,39 @@ mod tests {
assert_eq!(found, expected);
}
struct TestSetup {
runtime: &'static tokio::runtime::Runtime,
entered_runtime: EnterGuard<'static>,
harness: TenantHarness<'static>,
tenant: Arc<Tenant>,
tenant_ctx: RequestContext,
remote_fs_dir: PathBuf,
client: Arc<RemoteTimelineClient>,
}
impl TestSetup {
fn new(test_name: &str) -> anyhow::Result<Self> {
// Use a current-thread runtime in the test
let runtime = Box::leak(Box::new(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?,
));
let entered_runtime = runtime.enter();
let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
let harness = TenantHarness::create(test_name)?;
let (tenant, ctx) = runtime.block_on(harness.load());
// create an empty timeline directory
let timeline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let _ = timeline.initialize(&ctx).unwrap();
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
let storage_config = RemoteStorageConfig {
max_concurrent_syncs: std::num::NonZeroUsize::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
)
.unwrap(),
max_sync_errors: std::num::NonZeroU32::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
)
.unwrap(),
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
};
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
let client = Arc::new(RemoteTimelineClient {
conf: harness.conf,
runtime,
tenant_id: harness.tenant_id,
timeline_id: TIMELINE_ID,
storage_impl: storage,
upload_queue: Mutex::new(UploadQueue::Uninitialized),
metrics: Arc::new(RemoteTimelineClientMetrics::new(
&harness.tenant_id,
&TIMELINE_ID,
)),
});
Ok(Self {
runtime,
entered_runtime,
harness,
tenant,
tenant_ctx: ctx,
remote_fs_dir,
client,
})
}
}
// Test scheduling
#[test]
fn upload_scheduling() -> anyhow::Result<()> {
// Use a current-thread runtime in the test
let runtime = Box::leak(Box::new(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?,
));
let _entered = runtime.enter();
let harness = TenantHarness::create("upload_scheduling")?;
let (tenant, ctx) = runtime.block_on(harness.load());
let _timeline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
let storage_config = RemoteStorageConfig {
max_concurrent_syncs: std::num::NonZeroUsize::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
)
.unwrap(),
max_sync_errors: std::num::NonZeroU32::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
)
.unwrap(),
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
};
// Test outline:
//
// Schedule upload of a bunch of layers. Check that they are started immediately, not queued
@@ -1160,20 +1081,22 @@ mod tests {
// Schedule another deletion. Check that it's launched immediately.
// Schedule index upload. Check that it's queued
let TestSetup {
runtime,
entered_runtime: _entered_runtime,
harness,
tenant: _tenant,
tenant_ctx: _tenant_ctx,
remote_fs_dir,
client,
} = TestSetup::new("upload_scheduling").unwrap();
let timeline_path = harness.timeline_path(&TIMELINE_ID);
println!("workdir: {}", harness.conf.workdir.display());
let storage_impl = GenericRemoteStorage::from_config(&storage_config)?;
let client = Arc::new(RemoteTimelineClient {
conf: harness.conf,
runtime,
tenant_id: harness.tenant_id,
timeline_id: TIMELINE_ID,
storage_impl,
upload_queue: Mutex::new(UploadQueue::Uninitialized),
metrics: Arc::new(RemoteTimelineClientMetrics::new(
&harness.tenant_id,
&TIMELINE_ID,
)),
});
let remote_timeline_dir =
remote_fs_dir.join(timeline_path.strip_prefix(&harness.conf.workdir)?);
println!("remote_timeline_dir: {}", remote_timeline_dir.display());
@@ -1293,90 +1216,4 @@ mod tests {
Ok(())
}
#[test]
fn bytes_unfinished_gauge_for_layer_file_uploads() -> anyhow::Result<()> {
// Setup
let TestSetup {
runtime,
harness,
client,
..
} = TestSetup::new("metrics")?;
let metadata = dummy_metadata(Lsn(0x10));
client.init_upload_queue_for_empty_remote(&metadata)?;
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let content_1 = dummy_contents("foo");
std::fs::write(
timeline_path.join(layer_file_name_1.file_name()),
&content_1,
)?;
#[derive(Debug, PartialEq)]
struct BytesStartedFinished {
started: Option<usize>,
finished: Option<usize>,
}
let get_bytes_started_stopped = || {
let started = client
.metrics
.get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
.map(|v| v.try_into().unwrap());
let stopped = client
.metrics
.get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
.map(|v| v.try_into().unwrap());
BytesStartedFinished {
started,
finished: stopped,
}
};
// Test
let init = get_bytes_started_stopped();
client.schedule_layer_file_upload(
&layer_file_name_1,
&LayerFileMetadata::new(content_1.len() as u64),
)?;
let pre = get_bytes_started_stopped();
runtime.block_on(client.wait_completion())?;
let post = get_bytes_started_stopped();
// Validate
assert_eq!(
init,
BytesStartedFinished {
started: None,
finished: None
}
);
assert_eq!(
pre,
BytesStartedFinished {
started: Some(content_1.len()),
// assert that the _finished metric is created eagerly so that subtractions work on first sample
finished: Some(0),
}
);
assert_eq!(
post,
BytesStartedFinished {
started: Some(content_1.len()),
finished: Some(content_1.len())
}
);
Ok(())
}
}

View File

@@ -74,7 +74,7 @@ pub(super) async fn upload_timeline_layer<'a>(
})?;
storage
.upload(source_file, fs_size, &storage_path, None)
.upload(Box::new(source_file), fs_size, &storage_path, None)
.await
.with_context(|| {
format!(

View File

@@ -77,7 +77,6 @@ pub(super) use self::eviction_task::EvictionTaskTenantState;
use self::eviction_task::EvictionTaskTimelineState;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
@@ -146,7 +145,7 @@ pub struct Timeline {
// 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
// first WAL record when the node is started up. But here, we just
// keep track of it.
last_record_lsn: SeqWait<RecordLsn, Lsn>,
last_record_lsn: SeqWait<RecordLsn, Lsn, ()>,
// All WAL records have been processed and stored durably on files on
// local disk, up to this LSN. On crash and restart, we need to re-process
@@ -162,7 +161,7 @@ pub struct Timeline {
ancestor_timeline: Option<Arc<Timeline>>,
ancestor_lsn: Lsn,
pub(super) metrics: TimelineMetrics,
metrics: TimelineMetrics,
/// Ensures layers aren't frozen by checkpointer between
/// [`Timeline::get_layer_for_write`] and layer reads.
@@ -1137,8 +1136,6 @@ impl Timeline {
if let Some(delta) = local_layer_residence_duration {
self.metrics
.evictions_with_low_residence_duration
.read()
.unwrap()
.observe(delta);
info!(layer=%local_layer.short_id(), residence_millis=delta.as_millis(), "evicted layer after known residence period");
} else {
@@ -1212,35 +1209,6 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.eviction_policy)
}
fn get_evictions_low_residence_duration_metric_threshold(
tenant_conf: &TenantConfOpt,
default_tenant_conf: &TenantConf,
) -> Duration {
tenant_conf
.evictions_low_residence_duration_metric_threshold
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
}
pub(super) fn tenant_conf_updated(&self) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.
// The threshold is embedded in the metric. So, we need to update it.
{
let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
&self.tenant_conf.read().unwrap(),
&self.conf.default_tenant_conf,
);
let tenant_id_str = self.tenant_id.to_string();
let timeline_id_str = self.timeline_id.to_string();
self.metrics
.evictions_with_low_residence_duration
.write()
.unwrap()
.change_threshold(&tenant_id_str, &timeline_id_str, new_threshold);
}
}
/// Open a Timeline handle.
///
/// Loads the metadata for the timeline into memory, but not the layer map.
@@ -1272,11 +1240,6 @@ impl Timeline {
let max_lsn_wal_lag = tenant_conf_guard
.max_lsn_wal_lag
.unwrap_or(conf.default_tenant_conf.max_lsn_wal_lag);
let evictions_low_residence_duration_metric_threshold =
Self::get_evictions_low_residence_duration_metric_threshold(
&tenant_conf_guard,
&conf.default_tenant_conf,
);
drop(tenant_conf_guard);
Arc::new_cyclic(|myself| {
@@ -1307,10 +1270,13 @@ impl Timeline {
remote_client: remote_client.map(Arc::new),
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
last_record_lsn: SeqWait::new(RecordLsn {
last: disk_consistent_lsn,
prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
}),
last_record_lsn: SeqWait::new(
RecordLsn {
last: disk_consistent_lsn,
prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
},
(),
),
disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
@@ -1324,7 +1290,7 @@ impl Timeline {
&timeline_id,
crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
"mtime",
evictions_low_residence_duration_metric_threshold,
conf.evictions_low_residence_duration_metric_threshold,
),
),
@@ -2457,7 +2423,7 @@ impl Timeline {
assert!(new_lsn.is_aligned());
self.metrics.last_record_gauge.set(new_lsn.0 as i64);
self.last_record_lsn.advance(new_lsn);
self.last_record_lsn.advance(new_lsn, None);
}
fn freeze_inmem_layer(&self, write_lock_held: bool) {

View File

@@ -27,8 +27,6 @@ use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::task::spawn_blocking;
use tokio::time::Duration;
use tokio::time::Instant;
use tracing::*;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
@@ -208,10 +206,6 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
}
}
// Send keepalive messages to walproposer, to make sure it receives updates
// even when it writes a steady stream of messages.
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
/// Takes messages from msg_rx, processes and pushes replies to reply_tx.
struct WalAcceptor {
tli: Arc<Timeline>,
@@ -259,25 +253,18 @@ impl WalAcceptor {
timeline: Arc::clone(&self.tli),
};
// After this timestamp we will stop processing AppendRequests and send a response
// to the walproposer. walproposer sends at least one AppendRequest per second,
// we will send keepalives by replying to these requests once per second.
let mut next_keepalive = Instant::now();
let mut next_msg: ProposerAcceptorMessage;
loop {
let opt_msg = self.msg_rx.recv().await;
if opt_msg.is_none() {
return Ok(()); // chan closed, streaming terminated
}
let mut next_msg = opt_msg.unwrap();
next_msg = opt_msg.unwrap();
let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
// loop through AppendRequest's while it's readily available to
// write as many WAL as possible without fsyncing
//
// Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
// Otherwise, we might end up in a situation where we read a message, but don't
// process it.
while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
@@ -287,11 +274,6 @@ impl WalAcceptor {
}
}
// get out of this loop if keepalive time is reached
if Instant::now() >= next_keepalive {
break;
}
match self.msg_rx.try_recv() {
Ok(msg) => next_msg = msg,
Err(TryRecvError::Empty) => break,
@@ -300,18 +282,18 @@ impl WalAcceptor {
}
// flush all written WAL to the disk
self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)?
if let Some(reply) = self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)? {
if self.reply_tx.send(reply).await.is_err() {
return Ok(()); // chan closed, streaming terminated
}
}
} else {
// process message other than AppendRequest
self.tli.process_msg(&next_msg)?
};
if let Some(reply) = reply_msg {
if self.reply_tx.send(reply).await.is_err() {
return Ok(()); // chan closed, streaming terminated
if let Some(reply) = self.tli.process_msg(&next_msg)? {
if self.reply_tx.send(reply).await.is_err() {
return Ok(()); // chan closed, streaming terminated
}
}
// reset keepalive time
next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
}
}
}

View File

@@ -23,6 +23,7 @@ use std::convert::Infallible;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
@@ -373,7 +374,7 @@ impl BrokerService for Broker {
Ok(info) => yield info,
Err(RecvError::Lagged(skipped_msg)) => {
missed_msgs += skipped_msg;
if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
if let Poll::Ready(_) = futures::poll!(Box::pin(warn_interval.tick())) {
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
missed_msgs = 0;

View File

@@ -45,8 +45,6 @@ PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = (
*[f"pageserver_remote_timeline_client_calls_started_{x}" for x in ["bucket", "count", "sum"]],
*[f"pageserver_remote_operation_seconds_{x}" for x in ["bucket", "count", "sum"]],
"pageserver_remote_physical_size",
"pageserver_remote_timeline_client_bytes_started_total",
"pageserver_remote_timeline_client_bytes_finished_total",
)
PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
@@ -55,7 +53,6 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
"pageserver_storage_operations_seconds_global_bucket",
"libmetrics_launch_timestamp",
"libmetrics_build_info",
"libmetrics_tracing_event_count_total",
)
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (

View File

@@ -1913,26 +1913,15 @@ def remote_pg(
connstr = os.getenv("BENCHMARK_CONNSTR")
if connstr is None:
raise ValueError("no connstr provided, use BENCHMARK_CONNSTR environment variable")
host = parse_dsn(connstr).get("host", "")
is_neon = host.endswith(".neon.build")
start_ms = int(datetime.utcnow().timestamp() * 1000)
with RemotePostgres(pg_bin, connstr) as remote_pg:
if is_neon:
timeline_id = TimelineId(remote_pg.safe_psql("SHOW neon.timeline_id")[0][0])
yield remote_pg
end_ms = int(datetime.utcnow().timestamp() * 1000)
if is_neon:
host = parse_dsn(connstr).get("host", "")
if host.endswith(".neon.build"):
# Add 10s margin to the start and end times
allure_add_grafana_links(
host,
timeline_id,
start_ms - 10_000,
end_ms + 10_000,
)
allure_add_grafana_links(host, start_ms - 10_000, end_ms + 10_000)
class PSQL:

View File

@@ -519,13 +519,6 @@ class PageserverHttpClient(requests.Session):
assert res.status_code == 200
def download_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId):
info = self.layer_map_info(tenant_id, timeline_id)
for layer in info.historic_layers:
if not layer.remote:
continue
self.download_layer(tenant_id, timeline_id, layer.layer_file_name)
def evict_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str):
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}",
@@ -550,13 +543,3 @@ class PageserverHttpClient(requests.Session):
def tenant_break(self, tenant_id: TenantId):
res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break")
self.verbose_error(res)
def post_tracing_event(self, level: str, message: str):
res = self.post(
f"http://localhost:{self.port}/v1/tracing/event",
json={
"level": level,
"message": message,
},
)
self.verbose_error(res)

View File

@@ -13,7 +13,6 @@ import allure
from psycopg2.extensions import cursor
from fixtures.log_helper import log
from fixtures.types import TimelineId
Fn = TypeVar("Fn", bound=Callable[..., Any])
@@ -187,15 +186,11 @@ def allure_attach_from_dir(dir: Path):
allure.attach.file(source, name, attachment_type, extension)
GRAFANA_URL = "https://neonprod.grafana.net"
GRAFANA_EXPLORE_URL = f"{GRAFANA_URL}/explore"
GRAFANA_TIMELINE_INSPECTOR_DASHBOARD_URL = f"{GRAFANA_URL}/d/8G011dlnk/timeline-inspector"
LOGS_STAGING_DATASOURCE_ID = "xHHYY0dVz"
DATASOURCE_ID = "xHHYY0dVz"
def allure_add_grafana_links(host: str, timeline_id: TimelineId, start_ms: int, end_ms: int):
def allure_add_grafana_links(host: str, start_ms: int, end_ms: int):
"""Add links to server logs in Grafana to Allure report"""
links = {}
# We expect host to be in format like ep-divine-night-159320.us-east-2.aws.neon.build
endpoint_id, region_id, _ = host.split(".", 2)
@@ -207,12 +202,12 @@ def allure_add_grafana_links(host: str, timeline_id: TimelineId, start_ms: int,
}
params: Dict[str, Any] = {
"datasource": LOGS_STAGING_DATASOURCE_ID,
"datasource": DATASOURCE_ID,
"queries": [
{
"expr": "<PUT AN EXPRESSION HERE>",
"refId": "A",
"datasource": {"type": "loki", "uid": LOGS_STAGING_DATASOURCE_ID},
"datasource": {"type": "loki", "uid": DATASOURCE_ID},
"editorMode": "code",
"queryType": "range",
}
@@ -225,23 +220,8 @@ def allure_add_grafana_links(host: str, timeline_id: TimelineId, start_ms: int,
for name, expr in expressions.items():
params["queries"][0]["expr"] = expr
query_string = urlencode({"orgId": 1, "left": json.dumps(params)})
links[name] = f"{GRAFANA_EXPLORE_URL}?{query_string}"
link = f"https://neonprod.grafana.net/explore?{query_string}"
timeline_qs = urlencode(
{
"orgId": 1,
"var-environment": "victoria-metrics-aws-dev",
"var-timeline_id": timeline_id,
"var-endpoint_id": endpoint_id,
"var-log_datasource": "grafanacloud-neonstaging-logs",
"from": start_ms,
"to": end_ms,
}
)
link = f"{GRAFANA_TIMELINE_INSPECTOR_DASHBOARD_URL}?{timeline_qs}"
links["Timeline Inspector"] = link
for name, link in links.items():
allure.dynamic.link(link, name=name)
log.info(f"{name}: {link}")

View File

@@ -1,49 +0,0 @@
import uuid
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.utils import wait_until
@pytest.mark.parametrize("level", ["trace", "debug", "info", "warn", "error"])
def test_logging_event_count(neon_env_builder: NeonEnvBuilder, level: str):
# self-test: make sure the event is logged (i.e., our testing endpoint works)
log_expected = {
"trace": False,
"debug": False,
"info": True,
"warn": True,
"error": True,
}[level]
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
msg_id = uuid.uuid4().hex
# NB: the _total suffix is added by our prometheus client
before = ps_http.get_metric_value("libmetrics_tracing_event_count_total", {"level": level})
# post the event
ps_http.post_tracing_event(level, msg_id)
if log_expected:
env.pageserver.allowed_errors.append(f".*{msg_id}.*")
def assert_logged():
if not log_expected:
return
assert env.pageserver.log_contains(f".*{msg_id}.*")
wait_until(10, 0.5, assert_logged)
# make sure it's counted
def assert_metric_value():
if not log_expected:
return
# NB: the _total suffix is added by our prometheus client
val = ps_http.get_metric_value("libmetrics_tracing_event_count_total", {"level": level})
val = val or 0.0
log.info("libmetrics_tracing_event_count: %s", val)
assert val > (before or 0.0)
wait_until(10, 1, assert_metric_value)

View File

@@ -1,4 +1,3 @@
import json
from contextlib import closing
import psycopg2.extras
@@ -19,12 +18,7 @@ def test_tenant_config(neon_env_builder: NeonEnvBuilder):
neon_env_builder.pageserver_config_override = """
page_cache_size=444;
wait_lsn_timeout='111 s';
[tenant_config]
checkpoint_distance = 10000
compaction_target_size = 1048576
evictions_low_residence_duration_metric_threshold = "2 days"
eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold = "23 hours" }
"""
tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
env = neon_env_builder.init_start()
http_client = env.pageserver.http_client()
@@ -45,8 +39,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
new_conf = {
"checkpoint_distance": "20000",
"gc_period": "30sec",
"evictions_low_residence_duration_metric_threshold": "42s",
"eviction_policy": json.dumps({"kind": "NoEviction"}),
}
tenant, _ = env.neon_cli.create_tenant(conf=new_conf)
@@ -86,12 +78,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
assert effective_config["gc_period"] == "1h"
assert effective_config["image_creation_threshold"] == 3
assert effective_config["pitr_interval"] == "7days"
assert effective_config["evictions_low_residence_duration_metric_threshold"] == "2days"
assert effective_config["eviction_policy"] == {
"kind": "LayerAccessThreshold",
"period": "20s",
"threshold": "23h",
}
# check the configuration of the new tenant
with closing(env.pageserver.connect()) as psconn:
@@ -126,12 +112,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
assert (
new_effective_config["gc_period"] == "30s"
), "Specific 'gc_period' config should override the default value"
assert (
new_effective_config["evictions_low_residence_duration_metric_threshold"] == "42s"
), "Should override default value"
assert new_effective_config["eviction_policy"] == {
"kind": "NoEviction"
}, "Specific 'eviction_policy' config should override the default value"
assert new_effective_config["compaction_target_size"] == 1048576
assert new_effective_config["compaction_period"] == "20s"
assert new_effective_config["compaction_threshold"] == 10
@@ -145,10 +125,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
"gc_period": "80sec",
"compaction_period": "80sec",
"image_creation_threshold": "2",
"evictions_low_residence_duration_metric_threshold": "23h",
"eviction_policy": json.dumps(
{"kind": "LayerAccessThreshold", "period": "80s", "threshold": "42h"}
),
}
env.neon_cli.config_tenant(
tenant_id=tenant,
@@ -191,14 +167,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
assert (
updated_effective_config["compaction_period"] == "1m 20s"
), "Specific 'compaction_period' config should override the default value"
assert (
updated_effective_config["evictions_low_residence_duration_metric_threshold"] == "23h"
), "Should override default value"
assert updated_effective_config["eviction_policy"] == {
"kind": "LayerAccessThreshold",
"period": "1m 20s",
"threshold": "1day 18h",
}, "Specific 'eviction_policy' config should override the default value"
assert updated_effective_config["compaction_target_size"] == 1048576
assert updated_effective_config["compaction_threshold"] == 10
assert updated_effective_config["gc_horizon"] == 67108864
@@ -257,12 +225,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
assert final_effective_config["gc_horizon"] == 67108864
assert final_effective_config["gc_period"] == "1h"
assert final_effective_config["image_creation_threshold"] == 3
assert final_effective_config["evictions_low_residence_duration_metric_threshold"] == "2days"
assert final_effective_config["eviction_policy"] == {
"kind": "LayerAccessThreshold",
"period": "20s",
"threshold": "23h",
}
# restart the pageserver and ensure that the config is still correct
env.pageserver.stop()
@@ -323,81 +285,3 @@ def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder):
# dont test applying the setting here, we have that another test case to show it
# we just care about being able to create the file
assert len(contents_first) > len(contents_later)
def test_live_reconfig_get_evictions_low_residence_duration_metric_threshold(
neon_env_builder: NeonEnvBuilder,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
test_name="test_live_reconfig_get_evictions_low_residence_duration_metric_threshold",
)
env = neon_env_builder.init_start()
assert isinstance(env.remote_storage, LocalFsStorage)
(tenant_id, timeline_id) = env.neon_cli.create_tenant()
ps_http = env.pageserver.http_client()
def get_metric():
metrics = ps_http.get_metrics()
metric = metrics.query_one(
"pageserver_evictions_with_low_residence_duration_total",
{
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
},
)
return metric
default_value = ps_http.tenant_config(tenant_id).effective_config[
"evictions_low_residence_duration_metric_threshold"
]
metric = get_metric()
assert int(metric.value) == 0, "metric is present with default value"
assert default_value == "1day"
ps_http.download_all_layers(tenant_id, timeline_id)
ps_http.evict_all_layers(tenant_id, timeline_id)
metric = get_metric()
assert int(metric.value) > 0, "metric is updated"
env.neon_cli.config_tenant(
tenant_id, {"evictions_low_residence_duration_metric_threshold": default_value}
)
updated_metric = get_metric()
assert int(updated_metric.value) == int(
metric.value
), "metric is unchanged when setting same value"
env.neon_cli.config_tenant(
tenant_id, {"evictions_low_residence_duration_metric_threshold": "2day"}
)
metric = get_metric()
assert int(metric.labels["low_threshold_secs"]) == 2 * 24 * 60 * 60
assert int(metric.value) == 0
ps_http.download_all_layers(tenant_id, timeline_id)
ps_http.evict_all_layers(tenant_id, timeline_id)
metric = get_metric()
assert int(metric.labels["low_threshold_secs"]) == 2 * 24 * 60 * 60
assert int(metric.value) > 0
env.neon_cli.config_tenant(
tenant_id, {"evictions_low_residence_duration_metric_threshold": "2h"}
)
metric = get_metric()
assert int(metric.labels["low_threshold_secs"]) == 2 * 60 * 60
assert int(metric.value) == 0, "value resets if label changes"
ps_http.download_all_layers(tenant_id, timeline_id)
ps_http.evict_all_layers(tenant_id, timeline_id)
metric = get_metric()
assert int(metric.labels["low_threshold_secs"]) == 2 * 60 * 60
assert int(metric.value) > 0, "set a non-zero value for next step"
env.neon_cli.config_tenant(tenant_id, {})
metric = get_metric()
assert int(metric.labels["low_threshold_secs"]) == 24 * 60 * 60, "label resets to default"
assert int(metric.value) == 0, "value resets to default"

View File

@@ -4,6 +4,8 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
clap.workspace = true
anyhow.workspace = true

View File

@@ -18,7 +18,6 @@ byteorder = { version = "1" }
bytes = { version = "1", features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }
clap = { version = "4", features = ["derive", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8" }
digest = { version = "0.10", features = ["mac", "std"] }
either = { version = "1" }
@@ -30,6 +29,7 @@ futures-executor = { version = "0.3" }
futures-sink = { version = "0.3" }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
hashbrown = { version = "0.12", features = ["raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["std"] }
@@ -52,8 +52,7 @@ socket2 = { version = "0.4", default-features = false, features = ["all"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "sync", "time"] }
tokio-rustls = { version = "0.23" }
tokio-util = { version = "0.7", features = ["codec", "io"] }
toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
toml_edit = { version = "0.19", features = ["serde"] }
tonic = { version = "0.8", features = ["tls-roots"] }
tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "timeout", "util"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
@@ -65,6 +64,7 @@ anyhow = { version = "1", features = ["backtrace"] }
bytes = { version = "1", features = ["serde"] }
either = { version = "1" }
hashbrown = { version = "0.12", features = ["raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["std"] }
@@ -74,7 +74,6 @@ prost = { version = "0.11" }
regex = { version = "1" }
regex-syntax = { version = "0.6" }
serde = { version = "1", features = ["alloc", "derive"] }
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] }
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit-mut"] }
syn = { version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] }
### END HAKARI SECTION