Compare commits

..

20 Commits

Author SHA1 Message Date
Christian Schwarz
81d715b187 another rename 2023-06-07 16:27:53 +02:00
Christian Schwarz
0afd20068b title 2023-06-07 16:27:53 +02:00
Christian Schwarz
f3d7bf9e09 rename the crate and fix some compile errors that I missed a while back 2023-06-07 16:27:53 +02:00
Christian Schwarz
748c06cff8 docs improvements 2023-06-07 16:27:53 +02:00
Christian Schwarz
0d82862d55 generic GetReconstructPathError 2023-06-07 16:27:53 +02:00
Christian Schwarz
f2bd71d0a8 more docs 2023-06-07 16:27:53 +02:00
Christian Schwarz
de9521214d all-in on immutable + better crate comment 2023-06-07 16:27:53 +02:00
Christian Schwarz
8d9207040f WIP doc comments (more aspirational than reality) 2023-06-07 16:27:53 +02:00
Christian Schwarz
8e57d95026 completely immutable variant of the design 2023-06-07 16:27:53 +02:00
Christian Schwarz
6c71fc6646 struct type for in-memory layer put failure 2023-06-07 16:27:53 +02:00
Christian Schwarz
e6a36b5236 unused PutError accessors 2023-06-07 16:27:53 +02:00
Christian Schwarz
56f57172dd move code around to prep for alternative impl 2023-06-07 16:27:53 +02:00
Christian Schwarz
74ad719ede fix most unused variable warnings 2023-06-07 16:27:53 +02:00
Christian Schwarz
5570384672 WIP 2023-06-07 16:27:53 +02:00
Christian Schwarz
321e74b5ee implement support for non-blocking SeqWait::wait_for_timeout and use it 2023-06-07 16:27:53 +02:00
Christian Schwarz
712a516a2f switch to a Types trait to declutter the generics 2023-06-07 16:27:53 +02:00
Christian Schwarz
be5ba04dca rename trait to HistoricStuff 2023-06-07 16:27:53 +02:00
Christian Schwarz
b4b1292e15 WIP version of fully generic layer map built atop the new seqwait 2023-06-07 16:27:53 +02:00
Christian Schwarz
9b992c621d seqwait: handle wait_for_timeout() with a zero duration efficiently 2023-06-07 16:27:49 +02:00
Christian Schwarz
1fa17ed486 seqwait: add support for communicating immutable data between advance and wait
The long-term plan is to make LayerMap an immutable data structure that is
multi-versioned. Meaning, we will not modify LayerMap in place but create
a (cheap) copy and modify that copy. Once we're done making modifications,
we make the copy available to readers through the SeqWait.

The modifications will be made by a _single_ task, the pageserver actor.
But _many_ readers can wait_for & use same or multiple versions of the LayerMap.

So, there's a new method `split_spmc` that splits up a `SeqWait` into a
not-clonable producer (Advance) and a clonable consumer (Wait).

(SeqWait itself is mpmc, but, for the actor architecture, it makes sense
 to enforce spmc in the type system)

 # Please enter the commit message for your changes. Lines starting
2023-06-07 16:26:48 +02:00
53 changed files with 1533 additions and 1631 deletions

View File

@@ -4,7 +4,7 @@
hakari-package = "workspace_hack"
# Format for `workspace-hack = ...` lines in other Cargo.tomls. Requires cargo-hakari 0.9.8 or above.
dep-format-version = "4"
dep-format-version = "3"
# Setting workspace.resolver = "2" in the root Cargo.toml is HIGHLY recommended.
# Hakari works much better with the new feature resolver.

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -1,50 +0,0 @@
storage:
vars:
bucket_name: neon-prod-storage-us-east-1
bucket_region: us-east-1
console_mgmt_base_url: http://neon-internal-api.aws.neon.tech
broker_endpoint: http://storage-broker-lb.theta.us-east-1.internal.aws.neon.tech:50051
pageserver_config_stub:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 85 # TODO: decrease to 80 after all pageservers are below 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "pageserver/v1"
safekeeper_s3_prefix: safekeeper/v1/wal
hostname_suffix: ""
remote_user: ssm-user
ansible_aws_ssm_region: us-east-1
ansible_aws_ssm_bucket_name: neon-prod-storage-us-east-1
console_region_id: aws-us-east-1
sentry_environment: production
children:
pageservers:
hosts:
pageserver-0.us-east-1.aws.neon.tech:
ansible_host: i-0f58137883429f55a
pageserver-1.us-east-1.aws.neon.tech:
ansible_host: i-08e7ee6190a099019
pageserver-2.us-east-1.aws.neon.tech:
ansible_host: i-0686a4e5e208e31a1
safekeepers:
hosts:
safekeeper-0.us-east-1.aws.neon.tech:
ansible_host: i-04ce739e88793d864
safekeeper-1.us-east-1.aws.neon.tech:
ansible_host: i-0e9e6c9227fb81410
safekeeper-2.us-east-1.aws.neon.tech:
ansible_host: i-072f4dd86a327d52f

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
@@ -34,7 +34,7 @@ storage:
pageservers:
hosts:
pageserver-0.us-west-2.aws.neon.tech:
ansible_host: i-0d9f6dfae0e1c780d
ansible_host: i-0d9f6dfae0e1c780d
pageserver-1.us-west-2.aws.neon.tech:
ansible_host: i-0c834be1dddba8b3f
pageserver-2.us-west-2.aws.neon.tech:
@@ -49,5 +49,5 @@ storage:
safekeeper-1.us-west-2.aws.neon.tech:
ansible_host: i-074682f9d3c712e7c
safekeeper-2.us-west-2.aws.neon.tech:
ansible_host: i-042b7efb1729d7966
ansible_host: i-042b7efb1729d7966

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "20m"
threshold: &default_eviction_threshold "20m"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "20m"
threshold: &default_eviction_threshold "20m"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800
image:

View File

@@ -1,22 +1,6 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -7,16 +7,15 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800

View File

@@ -1,69 +0,0 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.us-east-1.aws.neon.tech"
# These domains haven't been delegated yet.
# extraDomains: ["*.us-east-1.retooldb.com", "*.us-east-1.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"
metricCollectionInterval: "10min"
podLabels:
neon_service: proxy-scram
neon_env: prod
neon_region: us-east-1
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: us-east-1.aws.neon.tech
httpsPort: 443
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-proxy.fullname\" . }}"
labels:
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
app.kubernetes.io/name: neon-proxy
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-proxy"
endpoints:
- port: http
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"

View File

@@ -1,52 +0,0 @@
# Helm chart values for neon-storage-broker
podLabels:
neon_env: production
neon_service: storage-broker
# Use L4 LB
service:
# service.annotations -- Annotations to add to the service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external # use newer AWS Load Balancer Controller
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal # deploy LB to private subnet
# assign service to this name at external-dns
external-dns.alpha.kubernetes.io/hostname: storage-broker-lb.theta.us-east-1.internal.aws.neon.tech
# service.type -- Service type
type: LoadBalancer
# service.port -- broker listen port
port: 50051
ingress:
enabled: false
metrics:
enabled: false
extraManifests:
- apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
name: "{{ include \"neon-storage-broker.fullname\" . }}"
labels:
helm.sh/chart: neon-storage-broker-{{ .Chart.Version }}
app.kubernetes.io/name: neon-storage-broker
app.kubernetes.io/instance: neon-storage-broker
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
app.kubernetes.io/managed-by: Helm
namespace: "{{ .Release.Namespace }}"
spec:
selector:
matchLabels:
app.kubernetes.io/name: "neon-storage-broker"
endpoints:
- port: broker
path: /metrics
interval: 10s
scrapeTimeout: 10s
namespaceSelector:
matchNames:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "production"

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800

View File

@@ -49,7 +49,7 @@ jobs:
shell: bash
strategy:
matrix:
target_region: [ us-east-2, us-west-2, eu-central-1, ap-southeast-1, us-east-1 ]
target_region: [ us-east-2, us-west-2, eu-central-1, ap-southeast-1 ]
environment:
name: prod-${{ matrix.target_region }}
steps:
@@ -97,10 +97,6 @@ jobs:
target_cluster: prod-ap-southeast-1-epsilon
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
- target_region: us-east-1
target_cluster: prod-us-east-1-theta
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
environment:
name: prod-${{ matrix.target_region }}
steps:

1409
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -24,10 +24,10 @@ atty = "0.2.14"
aws-config = { version = "0.51.0", default-features = false, features=["rustls"] }
aws-sdk-s3 = "0.21.0"
aws-smithy-http = "0.51.0"
aws-types = "0.55"
aws-types = "0.51.0"
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.65"
bindgen = "0.61"
bstr = "1.0"
byteorder = "1.4"
bytes = "1.0"
@@ -50,7 +50,7 @@ git-version = "0.3"
hashbrown = "0.13"
hashlink = "0.8.1"
hex = "0.4"
hex-literal = "0.4"
hex-literal = "0.3"
hmac = "0.12.1"
hostname = "0.3.1"
humantime = "2.1"
@@ -80,18 +80,18 @@ reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_18"] }
reqwest-middleware = "0.2.0"
routerify = "3"
rpds = "0.13"
rpds = "0.12.0"
rustls = "0.20"
rustls-pemfile = "1"
rustls-split = "0.3"
scopeguard = "1.1"
sentry = { version = "0.30", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
sentry = { version = "0.29", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "2.0"
sha2 = "0.10.2"
signal-hook = "0.3"
socket2 = "0.5"
socket2 = "0.4.4"
strum = "0.24"
strum_macros = "0.24"
svg_fmt = "0.4.1"
@@ -106,17 +106,17 @@ tokio-postgres-rustls = "0.9.0"
tokio-rustls = "0.23"
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io"] }
toml = "0.7"
toml_edit = "0.19"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
toml = "0.5"
toml_edit = { version = "0.17", features = ["easy"] }
tonic = {version = "0.8", features = ["tls", "tls-roots"]}
tracing = "0.1"
tracing-opentelemetry = "0.18.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.2"
uuid = { version = "1.2", features = ["v4", "serde"] }
walkdir = "2.3.2"
webpki-roots = "0.23"
x509-parser = "0.15"
webpki-roots = "0.22.5"
x509-parser = "0.14"
## TODO replace this with tracing
env_logger = "0.10"
@@ -154,9 +154,9 @@ workspace_hack = { version = "0.1", path = "./workspace_hack/" }
## Build dependencies
criterion = "0.4"
rcgen = "0.10"
rstest = "0.17"
rstest = "0.16"
tempfile = "3.4"
tonic-build = "0.9"
tonic-build = "0.8"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.

View File

@@ -12,7 +12,7 @@ FROM debian:bullseye-slim AS build-deps
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libssl-dev \
libicu-dev libxslt1-dev liblz4-dev libzstd-dev
libicu-dev libxslt1-dev
#########################################################################################
#
@@ -24,13 +24,8 @@ FROM build-deps AS pg-build
ARG PG_VERSION
COPY vendor/postgres-${PG_VERSION} postgres
RUN cd postgres && \
export CONFIGURE_CMD="./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp \
--with-icu --with-libxml --with-libxslt --with-lz4" && \
if [ "${PG_VERSION}" != "v14" ]; then \
# zstd is available only from PG15
export CONFIGURE_CMD="${CONFIGURE_CMD} --with-zstd"; \
fi && \
eval $CONFIGURE_CMD && \
./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp --with-icu \
--with-libxml --with-libxslt && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C contrib/ install && \
# Install headers
@@ -570,17 +565,13 @@ COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-deb
# Install:
# libreadline8 for psql
# libicu67, locales for collations (including ICU and plpgsql_check)
# liblz4-1 for lz4
# libossp-uuid16 for extension ossp-uuid
# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS
# libxml2, libxslt1.1 for xml2
# libzstd1 for zstd
RUN apt update && \
apt install --no-install-recommends -y \
gdb \
locales \
libicu67 \
liblz4-1 \
libreadline8 \
libossp-uuid16 \
libgeos-c1v5 \
@@ -590,8 +581,7 @@ RUN apt update && \
libsfcgal1 \
libxml2 \
libxslt1.1 \
libzstd1 \
procps && \
gdb && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8

View File

@@ -1,28 +1,12 @@
use anyhow::{anyhow, Result};
use postgres::Client;
use tokio_postgres::NoTls;
use tracing::{error, instrument};
use crate::compute::ComputeNode;
/// Update timestamp in a row in a special service table to check
/// that we can actually write some data in this particular timeline.
/// Create table if it's missing.
#[instrument(skip_all)]
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
// Connect to the database.
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
if client.is_closed() {
return Err(anyhow!("connection to postgres closed"));
}
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("connection error: {}", e);
}
});
pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
let query = "
CREATE TABLE IF NOT EXISTS health_check (
id serial primary key,
@@ -31,15 +15,31 @@ pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
INSERT INTO health_check VALUES (1, now())
ON CONFLICT (id) DO UPDATE
SET updated_at = now();";
let result = client.simple_query(query).await?;
if result.len() != 2 {
return Err(anyhow::format_err!(
"expected 2 query results, but got {}",
result.len()
));
let result = client.simple_query(query)?;
if result.len() < 2 {
return Err(anyhow::format_err!("executed {} queries", result.len()));
}
Ok(())
}
#[instrument(skip_all)]
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
if client.is_closed() {
return Err(anyhow!("connection to postgres closed"));
}
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("connection error: {}", e);
}
});
let result = client
.simple_query("UPDATE health_check SET updated_at = now() WHERE id = 1;")
.await?;
if result.len() != 1 {
return Err(anyhow!("statement can't be executed"));
}
Ok(())
}

View File

@@ -32,6 +32,7 @@ use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::ComputeSpec;
use crate::checker::create_writability_check_data;
use crate::config;
use crate::pg_helpers::*;
use crate::spec::*;
@@ -341,6 +342,7 @@ impl ComputeNode {
handle_databases(spec, &mut client)?;
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
handle_grants(spec, self.connstr.as_str(), &mut client)?;
create_writability_check_data(&mut client)?;
handle_extensions(spec, &mut client)?;
// 'Close' connection

View File

@@ -85,10 +85,7 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
let res = crate::checker::check_writability(compute).await;
match res {
Ok(_) => Response::new(Body::from("true")),
Err(e) => {
error!("check_writability failed: {}", e);
Response::new(Body::from(e.to_string()))
}
Err(e) => Response::new(Body::from(e.to_string())),
}
}

View File

@@ -368,9 +368,6 @@ impl PageServerNode {
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'min_resident_size_override' as integer")?,
evictions_low_residence_duration_metric_threshold: settings
.remove("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
@@ -448,9 +445,6 @@ impl PageServerNode {
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'min_resident_size_override' as an integer")?,
evictions_low_residence_duration_metric_threshold: settings
.get("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
})
.send()?
.error_from_body()?;

View File

@@ -4,12 +4,13 @@ version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[dependencies]
anyhow.workspace = true
chrono.workspace = true
rand.workspace = true
serde.workspace = true
serde_with.workspace = true
utils.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
workspace_hack.workspace = true
[dependencies]
anyhow = "1.0.68"
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }
rand = "0.8.3"
serde = "1.0.152"
serde_with = "2.1.0"
utils = { version = "0.1.0", path = "../utils" }
workspace_hack = { version = "0.1.0", path = "../../workspace_hack" }

View File

@@ -135,7 +135,6 @@ pub struct TenantCreateRequest {
// For now, this field is not even documented in the openapi_spec.yml.
pub eviction_policy: Option<serde_json::Value>,
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
}
#[serde_as]
@@ -182,7 +181,6 @@ pub struct TenantConfigRequest {
// For now, this field is not even documented in the openapi_spec.yml.
pub eviction_policy: Option<serde_json::Value>,
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
}
impl TenantConfigRequest {
@@ -204,7 +202,6 @@ impl TenantConfigRequest {
trace_read_requests: None,
eviction_policy: None,
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: None,
}
}
}

View File

@@ -5,7 +5,7 @@ use std::path::PathBuf;
use std::process::Command;
use anyhow::{anyhow, Context};
use bindgen::callbacks::{DeriveInfo, ParseCallbacks};
use bindgen::callbacks::ParseCallbacks;
#[derive(Debug)]
struct PostgresFfiCallbacks;
@@ -20,7 +20,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
// Add any custom #[derive] attributes to the data structures that bindgen
// creates.
fn add_derives(&self, derive_info: &DeriveInfo) -> Vec<String> {
fn add_derives(&self, name: &str) -> Vec<String> {
// This is the list of data structures that we want to serialize/deserialize.
let serde_list = [
"XLogRecord",
@@ -31,7 +31,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
"ControlFileData",
];
if serde_list.contains(&derive_info.name) {
if serde_list.contains(&name) {
vec![
"Default".into(), // Default allows us to easily fill the padding fields with 0.
"Serialize".into(),

View File

@@ -204,7 +204,12 @@ async fn upload_s3_data(
let data = format!("remote blob data {i}").into_bytes();
let data_len = data.len();
task_client
.upload(std::io::Cursor::new(data), data_len, &blob_path, None)
.upload(
Box::new(std::io::Cursor::new(data)),
data_len,
&blob_path,
None,
)
.await?;
Ok::<_, anyhow::Error>((blob_prefix, blob_path))

View File

@@ -0,0 +1,13 @@
[package]
name = "timeline_data_path"
version = "0.1.0"
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
utils.workspace = true
workspace_hack.workspace = true
tokio.workspace = true
thiserror.workspace = true

View File

@@ -0,0 +1,396 @@
//! The Timeline's core data path.
//!
//! # Overview
//!
//! This crate implements the core data path of a Timeline inside Pageserver:
//!
//! 1. WAL records from `walreceiver`, via in-memory layers, into persistent L0 layers.
//! 1. `GetPage@LSN`: retrieval of WAL records and page images for feeding into WAL redo.
//! 1. Data re-shuffeling through compaction (TODO).
//! 1. Page image creation & garbage collection through GC (TODO).
//!
//! This crate assumes the following concepts, but is fully generic over their implementation:
//!
//! - **Delta Records**: data is written into the system in the form of self-descriptive deltas.
//! For the Pageserver use case, these deltas are derived from Postgres WAL records.
//! - **Page Numbers**: Delta Records always affect a single key.
//! That key is called page number, because, in the Pageserver use case, the Postgres table page numbers are the keys.
//! - **LSN**: When writing Delta Records into the system, they are associated with a monotonically increasing LSN.
//! Subsequently written Delta Records must have increasing LSNs.
//! - **Page Images**: Delta Records for a given page can be used to reconstruct the page. Think of it like squashing diffs.
//! - When sorting the Delta Records for a given key by their LSN, any prefix of that sorting can be squashed into a page image.
//! - Delta Records following such a squash can be squashed into that page image.
//! - In Pageserver, WAL redo implements the (pure) function of squashing.
//! - **In-Memory Layer**: an object that represents an "unfinished" L0 layer file, holding Delta Records in insertion order.
//! "Unfinished" means that we're still writing Delta Records to that file.
//! - **Historic Layer**: an object that represents a "finished" layer file, at any compaction level.
//! Such objects reside on disk and/or in remote storage.
//! They may contain Delta Records, Page Images, or a mixture thereof. It doesn't matter.
//! - **HistoricStuff**: an efficient lookup data structure to find the list of Historic Layer objects
//! that hold the Delta Records / PageImages required to reconstruct a Page Image at a given LSN.
//!
//! # API
//!
//! The core idea is that of a specialized single-producer multi-consumer structure,
//! embodied by a Read-end and a Write-end.
//!
//! The Write-end is used to push new `DeltaRecord @ LSN`s into the system.
//! In Pageserver, this is used by the `WalReceiver`.
//!
//! The Read-end provides the `GetPage@LSN` API.
//! In the current iteration, we actually return something called `ReconstructWork`.
//! I.e., we leave the work of reading the values from the layers, and the WAL redo invocation to the caller.
//! Find rationale for this design in the *Scope* section.
//!
//! ## Immutability
//!
//! The traits defined by this crate assume immutable data structures that are multi-versioned.
//!
//! As an example for what "immutable" means, take the case where we add a new Historic Layer to HistoricStuff.
//! Traditionally, one would use shared mutable state, i.e. `Arc<RwLock<...>>`.
//! To insert the new Historic Layer, we would acquire the RwLock in write mode and modify a lookup data structure to accomodate the new layer.
//! The Read-ends would use RwLock in read mode to read from the data structure.
//!
//! Conversely, with *immutable data structures*, writers create new version (aka *snapshots*) of the lookup data structure.
//! New reads on the Read-ends will use the new snapshot, but old ongoing reads would use the old version(s).
//! An efficient implementation would likely share the Historic Layer objects, e.g., using `Arc`.
//! And maybe there's internally mutable state inside the layer objects, e.g., to track residence (i.e., *on-demand downloaded* vs *evicted*).
//! But the important point is that there's no synchronization / lock-holding at any higher level, except when grabbing a reference to the snapshot (Read-end), or when publishing a new snapshot (Write-end).
//!
//! ## Scope
//!
//! The following concerns are considered implementation details from the perspective of this crate:
//!
//! - **Layer File Persistence**: `HistoricStuff::make_historic` is responsible for this.
//! - **Reading Layer Files**: the `ReconstructWork` that the Read-end returns from `GetPage@LSN` requests contains the list of layers to consult.
//! The crate consumer is responsible for reading the layers & doing WAL redo.
//! Likely the implementation of `HistoricStuff` plays a role here, because it is responsible for persisting the layer files.
//! - **Layer Eviction & On-Demand Download**: this is just an aspect of the above.
//! The crate consumer can choose to implement eviction & on-demand download however they wish.
//! The only requirement is that the Historic Layers don't change their contents, i.e., they always returnt he same reconstruct values for the same lookup.
//! - For example, a `LayerCache` modoule or service could take care of layer uploads, eviction, and on-demand downloads.
//! Initially, the `layer cache` can be local-only.
//! But in the future, it can be multi-machine / clustered pagesevers / aka "sharding".
//!
//! # Example
//!
//! The [`new`] function is the entrypoint to this crate.
//!
//! See the test cases for how it is used.
use std::{marker::PhantomData, time::Duration};
use utils::seqwait::{self, Advance, SeqWait, Wait};
#[cfg(test)]
mod tests;
/// Collection of types / type bounds used by Read-end and Write-end.
///
/// See the [`crate`]-level docs's *Concepts* section to learn about
/// the meaning of each associated `type`.
///
/// # Usage
///
/// Define a zero-sized-type and impl this Trait for it.
/// Then use that zero-sized-type as the single generic argument to [`new`]
/// and almost all types declared in this crate.
///
/// It might feel a bit weird, but, the alternative is to have umpteen generic
/// types per `impl` with repetitive trait bounds.
///
/// Search the test cases for an example of how this can be used to improve testability.
pub trait Types {
type Key: Copy;
type Lsn: Ord + Copy;
type LsnCounter: seqwait::MonotonicCounter<Self::Lsn> + Copy;
type DeltaRecord;
type HistoricLayer;
type InMemoryLayer: InMemoryLayer<Types = Self> + Clone;
type HistoricStuff: HistoricStuff<Types = Self> + Clone;
type GetReconstructPathError: std::error::Error;
}
/// Error returned by [`InMemoryLayer::put`].
#[derive(thiserror::Error)]
pub struct InMemoryLayerPutError<DeltaRecord> {
delta: DeltaRecord,
kind: InMemoryLayerPutErrorKind,
}
/// Part of [`InMemoryLayerPutError`].
#[derive(Debug)]
pub enum InMemoryLayerPutErrorKind {
LayerFull,
AlreadyHaveRecordForKeyAndLsn,
}
impl<DeltaRecord> std::fmt::Debug for InMemoryLayerPutError<DeltaRecord> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InMemoryLayerPutError")
// would require DeltaRecord to impl Debug
// .field("delta", &self.delta)
.field("kind", &self.kind)
.finish()
}
}
/// An in-memory layer. See [`crate`] docs for details on this concept.
pub trait InMemoryLayer: std::fmt::Debug + Default + Clone {
type Types: Types;
fn put(
&mut self,
key: <Self::Types as Types>::Key,
lsn: <Self::Types as Types>::Lsn,
delta: <Self::Types as Types>::DeltaRecord,
) -> Result<Self, InMemoryLayerPutError<<Self::Types as Types>::DeltaRecord>>;
fn get(
&self,
key: <Self::Types as Types>::Key,
lsn: <Self::Types as Types>::Lsn,
) -> Vec<<Self::Types as Types>::DeltaRecord>;
}
/// The manager of [`Types::HistoricLayer`]s.
pub trait HistoricStuff {
type Types: Types;
fn get_reconstruct_path(
&self,
key: <Self::Types as Types>::Key,
lsn: <Self::Types as Types>::Lsn,
) -> Result<
Vec<<Self::Types as Types>::HistoricLayer>,
<Self::Types as Types>::GetReconstructPathError,
>;
/// Produce a new version of `self` that includes the given inmem layer.
fn make_historic(&self, inmem: <Self::Types as Types>::InMemoryLayer) -> Self;
}
/// A snapshot of the data. See [`crate`]-level docs section on *immutability* for details.
struct Snapshot<T: Types> {
_types: PhantomData<T>,
inmem: Option<T::InMemoryLayer>,
historic: T::HistoricStuff,
}
impl<T: Types> Clone for Snapshot<T> {
fn clone(&self) -> Self {
Self {
_types: self._types.clone(),
inmem: self.inmem.clone(),
historic: self.historic.clone(),
}
}
}
/// The Read-end. See [`crate`]-level docs for details.
pub struct Reader<T: Types> {
wait: Wait<T::LsnCounter, T::Lsn, Snapshot<T>>,
}
/// The Write-end. See [`crate`]-level docs for details.
pub struct Writer<T: Types> {
advance: Advance<T::LsnCounter, T::Lsn, Snapshot<T>>,
}
/// Setup a pair of Read-end and Write-End. This is the entrypoint to this crate.
///
/// The idea is that the caller loads the arguments from persistent state that `HistoricStuff` wrote at an earlier point in time.
pub fn new<T: Types>(lsn: T::LsnCounter, historic: T::HistoricStuff) -> (Reader<T>, Writer<T>) {
let state = Snapshot {
_types: PhantomData::<T>::default(),
inmem: None,
historic: historic,
};
let (wait, advance) = SeqWait::new(lsn, state).split_spmc();
let reader = Reader { wait };
let read_writer = Writer { advance };
(reader, read_writer)
}
/// Error returned by the get-page operations.
#[derive(Debug, thiserror::Error)]
pub enum GetError<T: Types> {
#[error(transparent)]
SeqWait(seqwait::SeqWaitError),
#[error(transparent)]
GetReconstructPath(T::GetReconstructPathError),
}
/// Self-contained set of objects required to reconstruct a page image for the given `key` @ `lsn`.
///
/// This is returned by the `get` methods of [`Reader`] and [`Writer`].
///
/// To reconstruct the page image, stack up (top to bottom) `inmem_records` plus all records found for `key` and `lsn` along the `historic_path` until an initial page image is found.
/// Then feed that stack to WAL-redo to get the page image.
///
/// See [`crate`]-level docs on *scope* for why we don't return page images from these functions.
pub struct ReconstructWork<T: Types> {
pub key: T::Key,
pub lsn: T::Lsn,
pub inmem_records: Vec<T::DeltaRecord>,
pub historic_path: Vec<T::HistoricLayer>,
}
impl<T: Types> Reader<T> {
/// This is the `GetPage@LSN` operation.
///
/// See the [`crate`]-level docs for why we return [`ReconstructWork`] instead of a Page Image here.
pub async fn get(&self, key: T::Key, lsn: T::Lsn) -> Result<ReconstructWork<T>, GetError<T>> {
// XXX dedup with Writer::get_nowait
let state = self.wait.wait_for(lsn).await.map_err(GetError::SeqWait)?;
let inmem_records = state
.inmem
.as_ref()
.map(|iml| iml.get(key, lsn))
.unwrap_or_default();
let historic_path = state
.historic
.get_reconstruct_path(key, lsn)
.map_err(GetError::GetReconstructPath)?;
Ok(ReconstructWork {
key,
lsn,
inmem_records,
historic_path,
})
}
}
/// Error returned by the `put` operation.
#[derive(thiserror::Error)]
pub struct PutError<T: Types> {
/// The `delta` record which we failed to `put`.
pub delta: T::DeltaRecord,
/// Description of what went wrong.
pub kind: PutErrorKind,
}
/// Part of [`PutError`].
#[derive(Debug)]
pub enum PutErrorKind {
AlreadyHaveInMemoryRecordForKeyAndLsn,
}
impl<T: Types> std::fmt::Debug for PutError<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PutError")
// would need to require Debug for DeltaRecord
// .field("delta", &self.delta)
.field("kind", &self.kind)
.finish()
}
}
impl<T: Types> Writer<T> {
/// Insert data into the system.
pub async fn put(
&mut self,
key: T::Key,
lsn: T::Lsn,
delta: T::DeltaRecord,
) -> Result<(), PutError<T>> {
let (_snapshot_lsn, snapshot) = self.advance.get_current_data();
// TODO ensure snapshot_lsn <= lsn?
let mut inmem = snapshot
.inmem
.unwrap_or_else(|| T::InMemoryLayer::default());
// XXX: use the Advance as witness and only allow witness to access inmem in write mode
match inmem.put(key, lsn, delta) {
Ok(new_inmem) => {
let new_snapshot = Snapshot {
_types: PhantomData,
inmem: Some(new_inmem),
historic: snapshot.historic,
};
self.advance.advance(lsn, Some(new_snapshot));
}
Err(InMemoryLayerPutError {
delta,
kind: InMemoryLayerPutErrorKind::AlreadyHaveRecordForKeyAndLsn,
}) => {
return Err(PutError {
delta,
kind: PutErrorKind::AlreadyHaveInMemoryRecordForKeyAndLsn,
});
}
Err(InMemoryLayerPutError {
delta,
kind: InMemoryLayerPutErrorKind::LayerFull,
}) => {
let new_historic = snapshot.historic.make_historic(inmem);
let mut new_inmem = T::InMemoryLayer::default();
let new_inmem = new_inmem
.put(key, lsn, delta)
.expect("put into default inmem layer must not fail");
let new_state = Snapshot {
_types: PhantomData::<T>::default(),
inmem: Some(new_inmem),
historic: new_historic,
};
self.advance.advance(lsn, Some(new_state));
}
}
Ok(())
}
/// Force flushing of the current in-memory layer.
///
/// Usually, flushing happens only if the in-memory layer is full.
/// Use this API to make it happen in other circumstances (shutdown, periodic ticker, etc.).
pub async fn force_flush(&mut self) -> tokio::io::Result<()> {
let (snapshot_lsn, snapshot) = self.advance.get_current_data();
let Snapshot {
_types,
inmem,
historic,
} = snapshot;
// XXX: use the Advance as witness and only allow witness to access inmem in "write" mode
let Some(inmem) = inmem else {
// nothing to do
return Ok(());
};
let new_historic = historic.make_historic(inmem);
let new_snapshot = Snapshot {
_types: PhantomData::<T>::default(),
inmem: None,
historic: new_historic,
};
self.advance.advance(snapshot_lsn, Some(new_snapshot)); // TODO: should fail if we're past snapshot_lsn
Ok(())
}
/// `get` at the given LSN, without blocking.
///
/// Fails with a timeout error if the `lsn` isn't there yet.
/// That makes sense because the only way we'd stop waiting is by a `self.put()`.
/// But concurrent `put()` is forbidden.
pub async fn get_nowait(
&self,
key: T::Key,
lsn: T::Lsn,
) -> Result<ReconstructWork<T>, GetError<T>> {
// XXX dedup with Reader::get
let state = self
.advance
.wait_for_timeout(lsn, Duration::from_secs(0))
// The await is never going to block because we pass from_secs(0).
.await
.map_err(GetError::SeqWait)?;
let inmem_records = state
.inmem
.as_ref()
.map(|iml| iml.get(key, lsn))
.unwrap_or_default();
let historic_path = state
.historic
.get_reconstruct_path(key, lsn)
.map_err(GetError::GetReconstructPath)?;
Ok(ReconstructWork {
key,
lsn,
inmem_records,
historic_path,
})
}
}

View File

@@ -0,0 +1,170 @@
use std::collections::{btree_map::Entry, BTreeMap};
use std::sync::Arc;
use utils::seqwait;
/// The ZST for which we impl the `super::Types` type collection trait.
struct TestTypes;
impl super::Types for TestTypes {
type Key = usize;
type Lsn = usize;
type LsnCounter = UsizeCounter;
type DeltaRecord = &'static str;
type HistoricLayer = Arc<TestHistoricLayer>;
type InMemoryLayer = TestInMemoryLayer;
type HistoricStuff = TestHistoricStuff;
}
/// For testing, our in-memory layer is a simple hashmap.
#[derive(Clone, Default, Debug)]
struct TestInMemoryLayer {
by_key: BTreeMap<usize, BTreeMap<usize, &'static str>>,
}
/// For testing, our historic layers are just in-memory layer objects with `frozen==true`.
struct TestHistoricLayer(TestInMemoryLayer);
/// This is the data structure that impls the `HistoricStuff` trait.
#[derive(Default, Clone)]
struct TestHistoricStuff {
by_key: BTreeMap<usize, BTreeMap<usize, Arc<TestHistoricLayer>>>,
}
/// `seqwait::MonotonicCounter` impl
#[derive(Copy, Clone)]
pub struct UsizeCounter(usize);
// Our testing impl of HistoricStuff references the frozen InMemoryLayer objects
// from all the (key,lsn) entries that it covers.
// This mimics the (much more efficient) search tree in the real impl.
impl super::HistoricStuff for TestHistoricStuff {
type Types = TestTypes;
fn get_reconstruct_path(
&self,
key: usize,
lsn: usize,
) -> Result<Vec<Arc<TestHistoricLayer>>, super::GetReconstructPathError> {
let Some(bk) = self.by_key.get(&key) else {
return Ok(vec![]);
};
Ok(bk.range(..=lsn).rev().map(|(_, l)| Arc::clone(l)).collect())
}
fn make_historic(&self, inmem: TestInMemoryLayer) -> Self {
// For the purposes of testing, just turn the inmemory layer historic through the type system
let historic = Arc::new(TestHistoricLayer(inmem));
// Deep-copy
let mut copy = self.by_key.clone();
// Add the references to `inmem` to the deep-copied struct
for (k, v) in historic.0.by_key.iter() {
for (lsn, _deltas) in v.into_iter() {
let by_key = copy.entry(*k).or_default();
let overwritten = by_key.insert(*lsn, historic.clone());
assert!(matches!(overwritten, None), "layers must not overlap");
}
}
Self { by_key: copy }
}
}
impl super::InMemoryLayer for TestInMemoryLayer {
type Types = TestTypes;
fn put(
&mut self,
key: usize,
lsn: usize,
delta: &'static str,
) -> Result<Self, super::InMemoryLayerPutError<&'static str>> {
let mut clone = self.clone();
drop(self);
let by_key = clone.by_key.entry(key).or_default();
match by_key.entry(lsn) {
Entry::Occupied(_record) => {
return Err(super::InMemoryLayerPutError {
delta,
kind: super::InMemoryLayerPutErrorKind::AlreadyHaveRecordForKeyAndLsn,
});
}
Entry::Vacant(vacant) => vacant.insert(delta),
};
Ok(clone)
}
fn get(&self, key: usize, lsn: usize) -> Vec<&'static str> {
let by_key = match self.by_key.get(&key) {
Some(by_key) => by_key,
None => return vec![],
};
by_key
.range(..=lsn)
.map(|(_, v)| v)
.rev()
.cloned()
.collect()
}
}
impl UsizeCounter {
pub fn new(inital: usize) -> Self {
UsizeCounter(inital)
}
}
impl seqwait::MonotonicCounter<usize> for UsizeCounter {
fn cnt_advance(&mut self, new_val: usize) {
assert!(self.0 < new_val);
self.0 = new_val;
}
fn cnt_value(&self) -> usize {
self.0
}
}
#[test]
fn basic() {
let lm = TestHistoricStuff::default();
let (r, mut rw) = super::new::<TestTypes>(UsizeCounter::new(0), lm);
let r = Arc::new(r);
let r2 = Arc::clone(&r);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let read_jh = rt.spawn(async move { r.get(0, 10).await });
let mut rw = rt.block_on(async move {
rw.put(0, 1, "foo").await.unwrap();
rw.put(1, 1, "bar").await.unwrap();
rw.put(0, 10, "baz").await.unwrap();
rw
});
let read_res = rt.block_on(read_jh).unwrap().unwrap();
assert!(
read_res.historic_path.is_empty(),
"we have pushed less than needed for flush"
);
assert_eq!(read_res.inmem_records, vec!["baz", "foo"]);
let rw = rt.block_on(async move {
rw.put(0, 11, "blup").await.unwrap();
rw
});
let read_res = rt.block_on(async move { r2.get(0, 11).await.unwrap() });
assert_eq!(read_res.historic_path.len(), 0);
assert_eq!(read_res.inmem_records, vec!["blup", "baz", "foo"]);
drop(rw);
}

View File

@@ -14,5 +14,4 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tracing.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
workspace_hack.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -33,10 +33,11 @@ serde_with.workspace = true
strum.workspace = true
strum_macros.workspace = true
url.workspace = true
uuid.workspace = true
uuid = { version = "1.2", features = ["v4", "serde"] }
metrics.workspace = true
workspace_hack.workspace = true
either.workspace = true
[dev-dependencies]
byteorder.workspace = true

View File

@@ -1,12 +1,13 @@
#![warn(missing_docs)]
use either::Either;
use std::cmp::{Eq, Ordering, PartialOrd};
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::mem;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::watch::{channel, Receiver, Sender};
use tokio::sync::oneshot::{channel, Receiver, Sender};
use tokio::time::timeout;
/// An error happened while waiting for a number
@@ -36,45 +37,48 @@ pub trait MonotonicCounter<V> {
}
/// Internal components of a `SeqWait`
struct SeqWaitInt<S, V>
struct SeqWaitInt<S, V, T>
where
S: MonotonicCounter<V>,
V: Ord,
T: Clone,
{
waiters: BinaryHeap<Waiter<V>>,
waiters: BinaryHeap<Waiter<V, T>>,
current: S,
shutdown: bool,
data: T,
}
struct Waiter<T>
struct Waiter<V, T>
where
T: Ord,
V: Ord,
T: Clone,
{
wake_num: T, // wake me when this number arrives ...
wake_channel: Sender<()>, // ... by sending a message to this channel
wake_num: V, // wake me when this number arrives ...
wake_channel: Sender<T>, // ... by sending a message to this channel
}
// BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
// to get that.
impl<T: Ord> PartialOrd for Waiter<T> {
impl<V: Ord, T: Clone> PartialOrd for Waiter<V, T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
other.wake_num.partial_cmp(&self.wake_num)
}
}
impl<T: Ord> Ord for Waiter<T> {
impl<V: Ord, T: Clone> Ord for Waiter<V, T> {
fn cmp(&self, other: &Self) -> Ordering {
other.wake_num.cmp(&self.wake_num)
}
}
impl<T: Ord> PartialEq for Waiter<T> {
impl<V: Ord, T: Clone> PartialEq for Waiter<V, T> {
fn eq(&self, other: &Self) -> bool {
other.wake_num == self.wake_num
}
}
impl<T: Ord> Eq for Waiter<T> {}
impl<V: Ord, T: Clone> Eq for Waiter<V, T> {}
/// A tool for waiting on a sequence number
///
@@ -92,25 +96,28 @@ impl<T: Ord> Eq for Waiter<T> {}
///
/// <S> means Storage, <V> is type of counter that this storage exposes.
///
pub struct SeqWait<S, V>
pub struct SeqWait<S, V, T>
where
S: MonotonicCounter<V>,
V: Ord,
T: Clone,
{
internal: Mutex<SeqWaitInt<S, V>>,
internal: Mutex<SeqWaitInt<S, V, T>>,
}
impl<S, V> SeqWait<S, V>
impl<S, V, T> SeqWait<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
/// Create a new `SeqWait`, initialized to a particular number
pub fn new(starting_num: S) -> Self {
pub fn new(starting_num: S, data: T) -> Self {
let internal = SeqWaitInt {
waiters: BinaryHeap::new(),
current: starting_num,
shutdown: false,
data,
};
SeqWait {
internal: Mutex::new(internal),
@@ -144,10 +151,13 @@ where
///
/// This call won't complete until someone has called `advance`
/// with a number greater than or equal to the one we're waiting for.
pub async fn wait_for(&self, num: V) -> Result<(), SeqWaitError> {
match self.queue_for_wait(num) {
Ok(None) => Ok(()),
Ok(Some(mut rx)) => rx.changed().await.map_err(|_| SeqWaitError::Shutdown),
pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
match self.queue_for_wait(num, false) {
Ok(Either::Left(data)) => Ok(data),
Ok(Either::Right(rx)) => match rx.await {
Err(_) => Err(SeqWaitError::Shutdown),
Ok(data) => Ok(data),
},
Err(e) => Err(e),
}
}
@@ -159,15 +169,18 @@ where
///
/// If that hasn't happened after the specified timeout duration,
/// [`SeqWaitError::Timeout`] will be returned.
///
/// Pass `timeout_duration.is_zero() == true` to guarantee that the
/// future that is this function will never await.
pub async fn wait_for_timeout(
&self,
num: V,
timeout_duration: Duration,
) -> Result<(), SeqWaitError> {
match self.queue_for_wait(num) {
Ok(None) => Ok(()),
Ok(Some(mut rx)) => match timeout(timeout_duration, rx.changed()).await {
Ok(Ok(())) => Ok(()),
) -> Result<T, SeqWaitError> {
match self.queue_for_wait(num, timeout_duration.is_zero()) {
Ok(Either::Left(data)) => Ok(data),
Ok(Either::Right(rx)) => match timeout(timeout_duration, rx).await {
Ok(Ok(data)) => Ok(data),
Ok(Err(_)) => Err(SeqWaitError::Shutdown),
Err(_) => Err(SeqWaitError::Timeout),
},
@@ -177,41 +190,50 @@ where
/// Register and return a channel that will be notified when a number arrives,
/// or None, if it has already arrived.
fn queue_for_wait(&self, num: V) -> Result<Option<Receiver<()>>, SeqWaitError> {
fn queue_for_wait(&self, num: V, nowait: bool) -> Result<Either<T, Receiver<T>>, SeqWaitError> {
let mut internal = self.internal.lock().unwrap();
if internal.current.cnt_value() >= num {
return Ok(None);
return Ok(Either::Left(internal.data.clone()));
}
if internal.shutdown {
return Err(SeqWaitError::Shutdown);
}
if nowait {
return Err(SeqWaitError::Timeout);
}
// Create a new channel.
let (tx, rx) = channel(());
let (tx, rx) = channel();
internal.waiters.push(Waiter {
wake_num: num,
wake_channel: tx,
});
// Drop the lock as we exit this scope.
Ok(Some(rx))
Ok(Either::Right(rx))
}
/// Announce a new number has arrived
///
/// All waiters at this value or below will be woken.
///
/// If `new_data` is Some(), it will update the internal data,
/// even if `num` is smaller than the internal counter.
/// It will not cause a wake-up though, in this case.
///
/// Returns the old number.
pub fn advance(&self, num: V) -> V {
pub fn advance(&self, num: V, new_data: Option<T>) -> V {
let old_value;
let wake_these = {
let (wake_these, with_data) = {
let mut internal = self.internal.lock().unwrap();
if let Some(new_data) = new_data {
internal.data = new_data;
}
old_value = internal.current.cnt_value();
if old_value >= num {
return old_value;
}
internal.current.cnt_advance(num);
// Pop all waiters <= num from the heap. Collect them in a vector, and
// wake them up after releasing the lock.
let mut wake_these = Vec::new();
@@ -221,13 +243,13 @@ where
}
wake_these.push(internal.waiters.pop().unwrap().wake_channel);
}
wake_these
(wake_these, internal.data.clone())
};
for tx in wake_these {
// This can fail if there are no receivers.
// We don't care; discard the error.
let _ = tx.send(());
let _ = tx.send(with_data.clone());
}
old_value
}
@@ -236,6 +258,106 @@ where
pub fn load(&self) -> S {
self.internal.lock().unwrap().current
}
/// Split the seqwait into a part than can only do wait,
/// and another part that can do advance + wait.
///
/// The wait-only part can be cloned, the advance part cannot be cloned.
/// This provides a single-producer multi-consumer scheme.
pub fn split_spmc(self) -> (Wait<S, V, T>, Advance<S, V, T>) {
let inner = Arc::new(self);
let w = Wait {
inner: inner.clone(),
};
let a = Advance { inner };
(w, a)
}
}
/// See [`SeqWait::split_spmc`].
pub struct Wait<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
inner: Arc<SeqWait<S, V, T>>,
}
/// See [`SeqWait::split_spmc`].
pub struct Advance<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
inner: Arc<SeqWait<S, V, T>>,
}
impl<S, V, T> Wait<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
/// See [`SeqWait::wait_for`].
pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
self.inner.wait_for(num).await
}
/// See [`SeqWait::wait_for_timeout`].
pub async fn wait_for_timeout(
&self,
num: V,
timeout_duration: Duration,
) -> Result<T, SeqWaitError> {
self.inner.wait_for_timeout(num, timeout_duration).await
}
}
impl<S, V, T> Advance<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
/// See [`SeqWait::advance`].
pub fn advance(&self, num: V, new_data: Option<T>) -> V {
self.inner.advance(num, new_data)
}
/// See [`SeqWait::wait_for`].
pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
self.inner.wait_for(num).await
}
/// See [`SeqWait::wait_for_timeout`].
pub async fn wait_for_timeout(
&self,
num: V,
timeout_duration: Duration,
) -> Result<T, SeqWaitError> {
self.inner.wait_for_timeout(num, timeout_duration).await
}
/// Get a `Clone::clone` of the current data inside the seqwait.
pub fn get_current_data(&self) -> (V, T) {
let inner = self.inner.internal.lock().unwrap();
(inner.current.cnt_value(), inner.data.clone())
}
}
impl<S, V, T> Clone for Wait<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
#[cfg(test)]
@@ -256,12 +378,12 @@ mod tests {
#[tokio::test]
async fn seqwait() {
let seq = Arc::new(SeqWait::new(0));
let seq = Arc::new(SeqWait::new(0, ()));
let seq2 = Arc::clone(&seq);
let seq3 = Arc::clone(&seq);
let jh1 = tokio::task::spawn(async move {
seq2.wait_for(42).await.expect("wait_for 42");
let old = seq2.advance(100);
let old = seq2.advance(100, None);
assert_eq!(old, 99);
seq2.wait_for_timeout(999, Duration::from_millis(100))
.await
@@ -272,12 +394,12 @@ mod tests {
seq3.wait_for(0).await.expect("wait_for 0");
});
tokio::time::sleep(Duration::from_millis(200)).await;
let old = seq.advance(99);
let old = seq.advance(99, None);
assert_eq!(old, 0);
seq.wait_for(100).await.expect("wait_for 100");
// Calling advance with a smaller value is a no-op
assert_eq!(seq.advance(98), 100);
assert_eq!(seq.advance(98, None), 100);
assert_eq!(seq.load(), 100);
jh1.await.unwrap();
@@ -288,7 +410,7 @@ mod tests {
#[tokio::test]
async fn seqwait_timeout() {
let seq = Arc::new(SeqWait::new(0));
let seq = Arc::new(SeqWait::new(0, ()));
let seq2 = Arc::clone(&seq);
let jh = tokio::task::spawn(async move {
let timeout = Duration::from_millis(1);
@@ -298,10 +420,104 @@ mod tests {
tokio::time::sleep(Duration::from_millis(200)).await;
// This will attempt to wake, but nothing will happen
// because the waiter already dropped its Receiver.
let old = seq.advance(99);
let old = seq.advance(99, None);
assert_eq!(old, 0);
jh.await.unwrap();
seq.shutdown();
}
#[tokio::test]
async fn data_basic() {
let seq = Arc::new(SeqWait::new(0, "a"));
let seq2 = Arc::clone(&seq);
let jh = tokio::task::spawn(async move {
let data = seq.wait_for(2).await.unwrap();
assert_eq!(data, "b");
});
seq2.advance(1, Some("x"));
seq2.advance(2, Some("b"));
jh.await.unwrap();
}
#[test]
fn data_always_most_recent() {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
let seq = Arc::new(SeqWait::new(0, "a"));
let seq2 = Arc::clone(&seq);
let jh = rt.spawn(async move {
let data = seq.wait_for(2).await.unwrap();
assert_eq!(data, "d");
});
// jh is not running until we poll it, thanks to current thread runtime
rt.block_on(async move {
seq2.advance(2, Some("b"));
seq2.advance(3, Some("c"));
seq2.advance(4, Some("d"));
});
rt.block_on(jh).unwrap();
}
#[tokio::test]
async fn split_spmc_api_surface() {
let seq = SeqWait::new(0, 1);
let (w, a) = seq.split_spmc();
let _ = w.wait_for(1);
let _ = w.wait_for_timeout(0, Duration::from_secs(10));
let _ = w.clone();
let _ = a.advance(1, None);
let _ = a.wait_for(1);
let _ = a.wait_for_timeout(0, Duration::from_secs(10));
// TODO would be nice to have must-not-compile tests for Advance not being clonable.
}
#[tokio::test]
async fn new_data_same_lsn() {
let seq = Arc::new(SeqWait::new(0, "a"));
seq.advance(1, Some("b"));
let data = seq.wait_for(1).await.unwrap();
assert_eq!(data, "b", "the regular case where lsn and data advance");
seq.advance(1, Some("c"));
let data = seq.wait_for(1).await.unwrap();
assert_eq!(
data, "c",
"no lsn advance still gives new data for old lsn wait_for's"
);
let (start_wait_for_sender, start_wait_for_receiver) = tokio::sync::oneshot::channel();
// ensure we don't wake waiters for data-only change
let jh = tokio::spawn({
let seq = seq.clone();
async move {
start_wait_for_receiver.await.unwrap();
match tokio::time::timeout(Duration::from_secs(2), seq.wait_for(2)).await {
Ok(_) => {
assert!(
false,
"advance should not wake waiters if data changes but LSN doesn't"
);
}
Err(_) => {
// Good, we weren't woken up.
}
}
}
});
seq.advance(1, Some("d"));
start_wait_for_sender.send(()).unwrap();
jh.await.unwrap();
}
}

View File

@@ -13,7 +13,7 @@ use std::time::Instant;
use utils::lsn::Lsn;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use criterion::{criterion_group, criterion_main, Criterion};
fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
let mut layer_map = LayerMap::<LayerDescriptor>::default();
@@ -114,7 +114,7 @@ fn bench_from_captest_env(c: &mut Criterion) {
c.bench_function("captest_uniform_queries", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
black_box(layer_map.search(q.0, q.1));
layer_map.search(q.0, q.1);
}
});
});
@@ -122,11 +122,11 @@ fn bench_from_captest_env(c: &mut Criterion) {
// test with a key that corresponds to the RelDir entry. See pgdatadir_mapping.rs.
c.bench_function("captest_rel_dir_query", |b| {
b.iter(|| {
let result = black_box(layer_map.search(
let result = layer_map.search(
Key::from_hex("000000067F00008000000000000000000001").unwrap(),
// This LSN is higher than any of the LSNs in the tree
Lsn::from_str("D0/80208AE1").unwrap(),
));
);
result.unwrap();
});
});
@@ -183,7 +183,7 @@ fn bench_from_real_project(c: &mut Criterion) {
group.bench_function("uniform_queries", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
black_box(layer_map.search(q.0, q.1));
layer_map.search(q.0, q.1);
}
});
});
@@ -232,7 +232,7 @@ fn bench_sequential(c: &mut Criterion) {
group.bench_function("uniform_queries", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
black_box(layer_map.search(q.0, q.1));
layer_map.search(q.0, q.1);
}
});
});

View File

@@ -6,7 +6,6 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde::de::IntoDeserializer;
use std::env;
use storage_broker::Uri;
use utils::crashsafe::path_with_suffix_extension;
@@ -63,6 +62,7 @@ pub mod defaults {
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour";
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
///
/// Default built-in configuration file.
@@ -91,6 +91,7 @@ pub mod defaults {
#cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
#disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}}
@@ -107,7 +108,6 @@ pub mod defaults {
#pitr_interval = '{DEFAULT_PITR_INTERVAL}'
#min_resident_size_override = .. # in bytes
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
# [remote_storage]
@@ -182,6 +182,9 @@ pub struct PageServerConf {
pub metric_collection_endpoint: Option<Url>,
pub synthetic_size_calculation_interval: Duration,
// See the corresponding metric's help string.
pub evictions_low_residence_duration_metric_threshold: Duration,
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
pub test_remote_failures: u64,
@@ -254,6 +257,8 @@ struct PageServerConfigBuilder {
metric_collection_endpoint: BuilderValue<Option<Url>>,
synthetic_size_calculation_interval: BuilderValue<Duration>,
evictions_low_residence_duration_metric_threshold: BuilderValue<Duration>,
disk_usage_based_eviction: BuilderValue<Option<DiskUsageEvictionTaskConfig>>,
test_remote_failures: BuilderValue<u64>,
@@ -311,6 +316,11 @@ impl Default for PageServerConfigBuilder {
.expect("cannot parse default synthetic size calculation interval")),
metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT),
evictions_low_residence_duration_metric_threshold: Set(humantime::parse_duration(
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.expect("cannot parse DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD")),
disk_usage_based_eviction: Set(None),
test_remote_failures: Set(0),
@@ -428,6 +438,10 @@ impl PageServerConfigBuilder {
self.test_remote_failures = BuilderValue::Set(fail_first);
}
pub fn evictions_low_residence_duration_metric_threshold(&mut self, value: Duration) {
self.evictions_low_residence_duration_metric_threshold = BuilderValue::Set(value);
}
pub fn disk_usage_based_eviction(&mut self, value: Option<DiskUsageEvictionTaskConfig>) {
self.disk_usage_based_eviction = BuilderValue::Set(value);
}
@@ -511,6 +525,11 @@ impl PageServerConfigBuilder {
synthetic_size_calculation_interval: self
.synthetic_size_calculation_interval
.ok_or(anyhow!("missing synthetic_size_calculation_interval"))?,
evictions_low_residence_duration_metric_threshold: self
.evictions_low_residence_duration_metric_threshold
.ok_or(anyhow!(
"missing evictions_low_residence_duration_metric_threshold"
))?,
disk_usage_based_eviction: self
.disk_usage_based_eviction
.ok_or(anyhow!("missing disk_usage_based_eviction"))?,
@@ -702,12 +721,12 @@ impl PageServerConf {
"synthetic_size_calculation_interval" =>
builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
"test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
"evictions_low_residence_duration_metric_threshold" => builder.evictions_low_residence_duration_metric_threshold(parse_toml_duration(key, item)?),
"disk_usage_based_eviction" => {
tracing::info!("disk_usage_based_eviction: {:#?}", &item);
builder.disk_usage_based_eviction(
deserialize_from_item("disk_usage_based_eviction", item)
.context("parse disk_usage_based_eviction")?
)
toml_edit::de::from_item(item.clone())
.context("parse disk_usage_based_eviction")?)
},
"ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
_ => bail!("unrecognized pageserver option '{key}'"),
@@ -808,25 +827,18 @@ impl PageServerConf {
if let Some(eviction_policy) = item.get("eviction_policy") {
t_conf.eviction_policy = Some(
deserialize_from_item("eviction_policy", eviction_policy)
toml_edit::de::from_item(eviction_policy.clone())
.context("parse eviction_policy")?,
);
}
if let Some(item) = item.get("min_resident_size_override") {
t_conf.min_resident_size_override = Some(
deserialize_from_item("min_resident_size_override", item)
toml_edit::de::from_item(item.clone())
.context("parse min_resident_size_override")?,
);
}
if let Some(item) = item.get("evictions_low_residence_duration_metric_threshold") {
t_conf.evictions_low_residence_duration_metric_threshold = Some(parse_toml_duration(
"evictions_low_residence_duration_metric_threshold",
item,
)?);
}
Ok(t_conf)
}
@@ -865,6 +877,10 @@ impl PageServerConf {
cached_metric_collection_interval: Duration::from_secs(60 * 60),
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
synthetic_size_calculation_interval: Duration::from_secs(60),
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.unwrap(),
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
@@ -922,18 +938,6 @@ where
})
}
fn deserialize_from_item<T>(name: &str, item: &Item) -> anyhow::Result<T>
where
T: serde::de::DeserializeOwned,
{
// ValueDeserializer::new is not public, so use the ValueDeserializer's documented way
let deserializer = match item.clone().into_value() {
Ok(value) => value.into_deserializer(),
Err(item) => anyhow::bail!("toml_edit::Item '{item}' is not a toml_edit::Value"),
};
T::deserialize(deserializer).with_context(|| format!("deserializing item for node {name}"))
}
/// Configurable semaphore permits setting.
///
/// Does not allow semaphore permits to be zero, because at runtime initially zero permits and empty
@@ -1000,10 +1004,9 @@ mod tests {
use remote_storage::{RemoteStorageKind, S3Config};
use tempfile::{tempdir, TempDir};
use utils::serde_percent::Percent;
use super::*;
use crate::{tenant::config::EvictionPolicy, DEFAULT_PG_VERSION};
use crate::DEFAULT_PG_VERSION;
const ALL_BASE_VALUES_TOML: &str = r#"
# Initial configuration file created by 'pageserver --init'
@@ -1026,6 +1029,8 @@ cached_metric_collection_interval = '22200 s'
metric_collection_endpoint = 'http://localhost:80/metrics'
synthetic_size_calculation_interval = '333 s'
evictions_low_residence_duration_metric_threshold = '444 s'
log_format = 'json'
"#;
@@ -1082,6 +1087,9 @@ log_format = 'json'
synthetic_size_calculation_interval: humantime::parse_duration(
defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL
)?,
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD
)?,
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
@@ -1136,6 +1144,7 @@ log_format = 'json'
cached_metric_collection_interval: Duration::from_secs(22200),
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
synthetic_size_calculation_interval: Duration::from_secs(333),
evictions_low_residence_duration_metric_threshold: Duration::from_secs(444),
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
@@ -1301,71 +1310,6 @@ trace_read_requests = {trace_read_requests}"#,
Ok(())
}
#[test]
fn eviction_pageserver_config_parse() -> anyhow::Result<()> {
let tempdir = tempdir()?;
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
let pageserver_conf_toml = format!(
r#"pg_distrib_dir = "{}"
metric_collection_endpoint = "http://sample.url"
metric_collection_interval = "10min"
id = 222
[disk_usage_based_eviction]
max_usage_pct = 80
min_avail_bytes = 0
period = "10s"
[tenant_config]
evictions_low_residence_duration_metric_threshold = "20m"
[tenant_config.eviction_policy]
kind = "LayerAccessThreshold"
period = "20m"
threshold = "20m"
"#,
pg_distrib_dir.display(),
);
let toml: Document = pageserver_conf_toml.parse()?;
let conf = PageServerConf::parse_and_validate(&toml, &workdir)?;
assert_eq!(conf.pg_distrib_dir, pg_distrib_dir);
assert_eq!(
conf.metric_collection_endpoint,
Some("http://sample.url".parse().unwrap())
);
assert_eq!(
conf.metric_collection_interval,
Duration::from_secs(10 * 60)
);
assert_eq!(
conf.default_tenant_conf
.evictions_low_residence_duration_metric_threshold,
Duration::from_secs(20 * 60)
);
assert_eq!(conf.id, NodeId(222));
assert_eq!(
conf.disk_usage_based_eviction,
Some(DiskUsageEvictionTaskConfig {
max_usage_pct: Percent::new(80).unwrap(),
min_avail_bytes: 0,
period: Duration::from_secs(10),
#[cfg(feature = "testing")]
mock_statvfs: None,
})
);
match &conf.default_tenant_conf.eviction_policy {
EvictionPolicy::NoEviction => panic!("Unexpected eviction opolicy tenant settings"),
EvictionPolicy::LayerAccessThreshold(eviction_thresold) => {
assert_eq!(eviction_thresold.period, Duration::from_secs(20 * 60));
assert_eq!(eviction_thresold.threshold, Duration::from_secs(20 * 60));
}
}
Ok(())
}
fn prepare_fs(tempdir: &TempDir) -> anyhow::Result<(PathBuf, PathBuf)> {
let tempdir_path = tempdir.path();

View File

@@ -781,19 +781,6 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(&evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
&evictions_low_residence_duration_metric_threshold,
))
.map_err(ApiError::BadRequest)?,
);
}
let target_tenant_id = request_data
.new_tenant_id
.map(TenantId::from)
@@ -927,19 +914,6 @@ async fn update_tenant_config_handler(
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(&evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
&evictions_low_residence_duration_metric_threshold,
))
.map_err(ApiError::BadRequest)?,
);
}
let state = get_state(&request);
mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
.instrument(info_span!("tenant_config", tenant = ?tenant_id))

View File

@@ -257,22 +257,6 @@ impl EvictionsWithLowResidenceDuration {
}
}
pub fn change_threshold(
&mut self,
tenant_id: &str,
timeline_id: &str,
new_threshold: Duration,
) {
if new_threshold == self.threshold {
return;
}
let mut with_new =
EvictionsWithLowResidenceDurationBuilder::new(self.data_source, new_threshold)
.build(tenant_id, timeline_id);
std::mem::swap(self, &mut with_new);
with_new.remove(tenant_id, timeline_id);
}
// This could be a `Drop` impl, but, we need the `tenant_id` and `timeline_id`.
fn remove(&mut self, tenant_id: &str, timeline_id: &str) {
let Some(_counter) = self.counter.take() else {
@@ -605,7 +589,7 @@ pub struct TimelineMetrics {
pub num_persistent_files_created: IntCounter,
pub persistent_bytes_written: IntCounter,
pub evictions: IntCounter,
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
pub evictions_with_low_residence_duration: EvictionsWithLowResidenceDuration,
}
impl TimelineMetrics {
@@ -672,9 +656,7 @@ impl TimelineMetrics {
num_persistent_files_created,
persistent_bytes_written,
evictions,
evictions_with_low_residence_duration: std::sync::RwLock::new(
evictions_with_low_residence_duration,
),
evictions_with_low_residence_duration,
}
}
}
@@ -693,8 +675,6 @@ impl Drop for TimelineMetrics {
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, timeline_id]);
let _ = EVICTIONS.remove_label_values(&[tenant_id, timeline_id]);
self.evictions_with_low_residence_duration
.write()
.unwrap()
.remove(tenant_id, timeline_id);
for op in STORAGE_TIME_OPERATIONS {
let _ =

View File

@@ -65,7 +65,7 @@ fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<
_ = task_mgr::shutdown_watcher() => {
// We were requested to shut down.
let msg = "pageserver is shutting down".to_string();
let msg = format!("pageserver is shutting down");
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None));
Err(QueryError::Other(anyhow::anyhow!(msg)))
}

View File

@@ -1735,13 +1735,6 @@ impl Tenant {
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
*self.tenant_conf.write().unwrap() = new_tenant_conf;
// Don't hold self.timelines.lock() during the notifies.
// There's no risk of deadlock right now, but there could be if we consolidate
// mutexes in struct Timeline in the future.
let timelines = self.list_timelines();
for timeline in timelines {
timeline.tenant_conf_updated();
}
}
fn create_timeline_data(
@@ -1894,7 +1887,7 @@ impl Tenant {
.to_string();
// Convert the config to a toml file.
conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
conf_content += &toml_edit::easy::to_string(&tenant_conf)?;
let mut target_config_file = VirtualFile::open_with_options(
target_config_path,
@@ -2822,9 +2815,6 @@ pub mod harness {
trace_read_requests: Some(tenant_conf.trace_read_requests),
eviction_policy: Some(tenant_conf.eviction_policy),
min_resident_size_override: tenant_conf.min_resident_size_override,
evictions_low_residence_duration_metric_threshold: Some(
tenant_conf.evictions_low_residence_duration_metric_threshold,
),
}
}
}

View File

@@ -39,7 +39,6 @@ pub mod defaults {
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "3 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
}
/// Per-tenant configuration options
@@ -94,9 +93,6 @@ pub struct TenantConf {
pub trace_read_requests: bool,
pub eviction_policy: EvictionPolicy,
pub min_resident_size_override: Option<u64>,
// See the corresponding metric's help string.
#[serde(with = "humantime_serde")]
pub evictions_low_residence_duration_metric_threshold: Duration,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -168,11 +164,6 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub min_resident_size_override: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
#[serde(default)]
pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -237,9 +228,6 @@ impl TenantConfOpt {
min_resident_size_override: self
.min_resident_size_override
.or(global_conf.min_resident_size_override),
evictions_low_residence_duration_metric_threshold: self
.evictions_low_residence_duration_metric_threshold
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
}
}
}
@@ -272,10 +260,6 @@ impl Default for TenantConf {
trace_read_requests: false,
eviction_policy: EvictionPolicy::NoEviction,
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
}
}
}
@@ -291,9 +275,9 @@ mod tests {
..TenantConfOpt::default()
};
let toml_form = toml_edit::ser::to_string(&small_conf).unwrap();
let toml_form = toml_edit::easy::to_string(&small_conf).unwrap();
assert_eq!(toml_form, "gc_horizon = 42\n");
assert_eq!(small_conf, toml_edit::de::from_str(&toml_form).unwrap());
assert_eq!(small_conf, toml_edit::easy::from_str(&toml_form).unwrap());
let json_form = serde_json::to_string(&small_conf).unwrap();
assert_eq!(json_form, "{\"gc_horizon\":42}");

View File

@@ -74,7 +74,7 @@ pub(super) async fn upload_timeline_layer<'a>(
})?;
storage
.upload(source_file, fs_size, &storage_path, None)
.upload(Box::new(source_file), fs_size, &storage_path, None)
.await
.with_context(|| {
format!(

View File

@@ -77,7 +77,6 @@ pub(super) use self::eviction_task::EvictionTaskTenantState;
use self::eviction_task::EvictionTaskTimelineState;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
@@ -146,7 +145,7 @@ pub struct Timeline {
// 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
// first WAL record when the node is started up. But here, we just
// keep track of it.
last_record_lsn: SeqWait<RecordLsn, Lsn>,
last_record_lsn: SeqWait<RecordLsn, Lsn, ()>,
// All WAL records have been processed and stored durably on files on
// local disk, up to this LSN. On crash and restart, we need to re-process
@@ -162,7 +161,7 @@ pub struct Timeline {
ancestor_timeline: Option<Arc<Timeline>>,
ancestor_lsn: Lsn,
pub(super) metrics: TimelineMetrics,
metrics: TimelineMetrics,
/// Ensures layers aren't frozen by checkpointer between
/// [`Timeline::get_layer_for_write`] and layer reads.
@@ -1137,8 +1136,6 @@ impl Timeline {
if let Some(delta) = local_layer_residence_duration {
self.metrics
.evictions_with_low_residence_duration
.read()
.unwrap()
.observe(delta);
info!(layer=%local_layer.short_id(), residence_millis=delta.as_millis(), "evicted layer after known residence period");
} else {
@@ -1212,35 +1209,6 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.eviction_policy)
}
fn get_evictions_low_residence_duration_metric_threshold(
tenant_conf: &TenantConfOpt,
default_tenant_conf: &TenantConf,
) -> Duration {
tenant_conf
.evictions_low_residence_duration_metric_threshold
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
}
pub(super) fn tenant_conf_updated(&self) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.
// The threshold is embedded in the metric. So, we need to update it.
{
let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
&self.tenant_conf.read().unwrap(),
&self.conf.default_tenant_conf,
);
let tenant_id_str = self.tenant_id.to_string();
let timeline_id_str = self.timeline_id.to_string();
self.metrics
.evictions_with_low_residence_duration
.write()
.unwrap()
.change_threshold(&tenant_id_str, &timeline_id_str, new_threshold);
}
}
/// Open a Timeline handle.
///
/// Loads the metadata for the timeline into memory, but not the layer map.
@@ -1272,11 +1240,6 @@ impl Timeline {
let max_lsn_wal_lag = tenant_conf_guard
.max_lsn_wal_lag
.unwrap_or(conf.default_tenant_conf.max_lsn_wal_lag);
let evictions_low_residence_duration_metric_threshold =
Self::get_evictions_low_residence_duration_metric_threshold(
&tenant_conf_guard,
&conf.default_tenant_conf,
);
drop(tenant_conf_guard);
Arc::new_cyclic(|myself| {
@@ -1307,10 +1270,13 @@ impl Timeline {
remote_client: remote_client.map(Arc::new),
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
last_record_lsn: SeqWait::new(RecordLsn {
last: disk_consistent_lsn,
prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
}),
last_record_lsn: SeqWait::new(
RecordLsn {
last: disk_consistent_lsn,
prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
},
(),
),
disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
@@ -1324,7 +1290,7 @@ impl Timeline {
&timeline_id,
crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
"mtime",
evictions_low_residence_duration_metric_threshold,
conf.evictions_low_residence_duration_metric_threshold,
),
),
@@ -2457,7 +2423,7 @@ impl Timeline {
assert!(new_lsn.is_aligned());
self.metrics.last_record_gauge.set(new_lsn.0 as i64);
self.last_record_lsn.advance(new_lsn);
self.last_record_lsn.advance(new_lsn, None);
}
fn freeze_inmem_layer(&self, write_lock_held: bool) {

View File

@@ -27,8 +27,6 @@ use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::task::spawn_blocking;
use tokio::time::Duration;
use tokio::time::Instant;
use tracing::*;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
@@ -208,10 +206,6 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
}
}
// Send keepalive messages to walproposer, to make sure it receives updates
// even when it writes a steady stream of messages.
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
/// Takes messages from msg_rx, processes and pushes replies to reply_tx.
struct WalAcceptor {
tli: Arc<Timeline>,
@@ -259,25 +253,18 @@ impl WalAcceptor {
timeline: Arc::clone(&self.tli),
};
// After this timestamp we will stop processing AppendRequests and send a response
// to the walproposer. walproposer sends at least one AppendRequest per second,
// we will send keepalives by replying to these requests once per second.
let mut next_keepalive = Instant::now();
let mut next_msg: ProposerAcceptorMessage;
loop {
let opt_msg = self.msg_rx.recv().await;
if opt_msg.is_none() {
return Ok(()); // chan closed, streaming terminated
}
let mut next_msg = opt_msg.unwrap();
next_msg = opt_msg.unwrap();
let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
// loop through AppendRequest's while it's readily available to
// write as many WAL as possible without fsyncing
//
// Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
// Otherwise, we might end up in a situation where we read a message, but don't
// process it.
while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
@@ -287,11 +274,6 @@ impl WalAcceptor {
}
}
// get out of this loop if keepalive time is reached
if Instant::now() >= next_keepalive {
break;
}
match self.msg_rx.try_recv() {
Ok(msg) => next_msg = msg,
Err(TryRecvError::Empty) => break,
@@ -300,18 +282,18 @@ impl WalAcceptor {
}
// flush all written WAL to the disk
self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)?
if let Some(reply) = self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)? {
if self.reply_tx.send(reply).await.is_err() {
return Ok(()); // chan closed, streaming terminated
}
}
} else {
// process message other than AppendRequest
self.tli.process_msg(&next_msg)?
};
if let Some(reply) = reply_msg {
if self.reply_tx.send(reply).await.is_err() {
return Ok(()); // chan closed, streaming terminated
if let Some(reply) = self.tli.process_msg(&next_msg)? {
if self.reply_tx.send(reply).await.is_err() {
return Ok(()); // chan closed, streaming terminated
}
}
// reset keepalive time
next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
}
}
}

View File

@@ -23,6 +23,7 @@ use std::convert::Infallible;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
@@ -373,7 +374,7 @@ impl BrokerService for Broker {
Ok(info) => yield info,
Err(RecvError::Lagged(skipped_msg)) => {
missed_msgs += skipped_msg;
if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
if let Poll::Ready(_) = futures::poll!(Box::pin(warn_interval.tick())) {
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
missed_msgs = 0;

View File

@@ -1913,26 +1913,15 @@ def remote_pg(
connstr = os.getenv("BENCHMARK_CONNSTR")
if connstr is None:
raise ValueError("no connstr provided, use BENCHMARK_CONNSTR environment variable")
host = parse_dsn(connstr).get("host", "")
is_neon = host.endswith(".neon.build")
start_ms = int(datetime.utcnow().timestamp() * 1000)
with RemotePostgres(pg_bin, connstr) as remote_pg:
if is_neon:
timeline_id = TimelineId(remote_pg.safe_psql("SHOW neon.timeline_id")[0][0])
yield remote_pg
end_ms = int(datetime.utcnow().timestamp() * 1000)
if is_neon:
host = parse_dsn(connstr).get("host", "")
if host.endswith(".neon.build"):
# Add 10s margin to the start and end times
allure_add_grafana_links(
host,
timeline_id,
start_ms - 10_000,
end_ms + 10_000,
)
allure_add_grafana_links(host, start_ms - 10_000, end_ms + 10_000)
class PSQL:

View File

@@ -519,13 +519,6 @@ class PageserverHttpClient(requests.Session):
assert res.status_code == 200
def download_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId):
info = self.layer_map_info(tenant_id, timeline_id)
for layer in info.historic_layers:
if not layer.remote:
continue
self.download_layer(tenant_id, timeline_id, layer.layer_file_name)
def evict_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str):
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}",

View File

@@ -13,7 +13,6 @@ import allure
from psycopg2.extensions import cursor
from fixtures.log_helper import log
from fixtures.types import TimelineId
Fn = TypeVar("Fn", bound=Callable[..., Any])
@@ -187,15 +186,11 @@ def allure_attach_from_dir(dir: Path):
allure.attach.file(source, name, attachment_type, extension)
GRAFANA_URL = "https://neonprod.grafana.net"
GRAFANA_EXPLORE_URL = f"{GRAFANA_URL}/explore"
GRAFANA_TIMELINE_INSPECTOR_DASHBOARD_URL = f"{GRAFANA_URL}/d/8G011dlnk/timeline-inspector"
LOGS_STAGING_DATASOURCE_ID = "xHHYY0dVz"
DATASOURCE_ID = "xHHYY0dVz"
def allure_add_grafana_links(host: str, timeline_id: TimelineId, start_ms: int, end_ms: int):
def allure_add_grafana_links(host: str, start_ms: int, end_ms: int):
"""Add links to server logs in Grafana to Allure report"""
links = {}
# We expect host to be in format like ep-divine-night-159320.us-east-2.aws.neon.build
endpoint_id, region_id, _ = host.split(".", 2)
@@ -207,12 +202,12 @@ def allure_add_grafana_links(host: str, timeline_id: TimelineId, start_ms: int,
}
params: Dict[str, Any] = {
"datasource": LOGS_STAGING_DATASOURCE_ID,
"datasource": DATASOURCE_ID,
"queries": [
{
"expr": "<PUT AN EXPRESSION HERE>",
"refId": "A",
"datasource": {"type": "loki", "uid": LOGS_STAGING_DATASOURCE_ID},
"datasource": {"type": "loki", "uid": DATASOURCE_ID},
"editorMode": "code",
"queryType": "range",
}
@@ -225,23 +220,8 @@ def allure_add_grafana_links(host: str, timeline_id: TimelineId, start_ms: int,
for name, expr in expressions.items():
params["queries"][0]["expr"] = expr
query_string = urlencode({"orgId": 1, "left": json.dumps(params)})
links[name] = f"{GRAFANA_EXPLORE_URL}?{query_string}"
link = f"https://neonprod.grafana.net/explore?{query_string}"
timeline_qs = urlencode(
{
"orgId": 1,
"var-environment": "victoria-metrics-aws-dev",
"var-timeline_id": timeline_id,
"var-endpoint_id": endpoint_id,
"var-log_datasource": "grafanacloud-neonstaging-logs",
"from": start_ms,
"to": end_ms,
}
)
link = f"{GRAFANA_TIMELINE_INSPECTOR_DASHBOARD_URL}?{timeline_qs}"
links["Timeline Inspector"] = link
for name, link in links.items():
allure.dynamic.link(link, name=name)
log.info(f"{name}: {link}")

View File

@@ -18,11 +18,7 @@ def test_tenant_config(neon_env_builder: NeonEnvBuilder):
neon_env_builder.pageserver_config_override = """
page_cache_size=444;
wait_lsn_timeout='111 s';
[tenant_config]
checkpoint_distance = 10000
compaction_target_size = 1048576
evictions_low_residence_duration_metric_threshold = "2 days"
"""
tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
env = neon_env_builder.init_start()
http_client = env.pageserver.http_client()
@@ -43,7 +39,6 @@ evictions_low_residence_duration_metric_threshold = "2 days"
new_conf = {
"checkpoint_distance": "20000",
"gc_period": "30sec",
"evictions_low_residence_duration_metric_threshold": "42s",
}
tenant, _ = env.neon_cli.create_tenant(conf=new_conf)
@@ -83,7 +78,6 @@ evictions_low_residence_duration_metric_threshold = "2 days"
assert effective_config["gc_period"] == "1h"
assert effective_config["image_creation_threshold"] == 3
assert effective_config["pitr_interval"] == "7days"
assert effective_config["evictions_low_residence_duration_metric_threshold"] == "2days"
# check the configuration of the new tenant
with closing(env.pageserver.connect()) as psconn:
@@ -118,9 +112,6 @@ evictions_low_residence_duration_metric_threshold = "2 days"
assert (
new_effective_config["gc_period"] == "30s"
), "Specific 'gc_period' config should override the default value"
assert (
new_effective_config["evictions_low_residence_duration_metric_threshold"] == "42s"
), "Should override default value"
assert new_effective_config["compaction_target_size"] == 1048576
assert new_effective_config["compaction_period"] == "20s"
assert new_effective_config["compaction_threshold"] == 10
@@ -134,7 +125,6 @@ evictions_low_residence_duration_metric_threshold = "2 days"
"gc_period": "80sec",
"compaction_period": "80sec",
"image_creation_threshold": "2",
"evictions_low_residence_duration_metric_threshold": "23h",
}
env.neon_cli.config_tenant(
tenant_id=tenant,
@@ -177,9 +167,6 @@ evictions_low_residence_duration_metric_threshold = "2 days"
assert (
updated_effective_config["compaction_period"] == "1m 20s"
), "Specific 'compaction_period' config should override the default value"
assert (
updated_effective_config["evictions_low_residence_duration_metric_threshold"] == "23h"
), "Should override default value"
assert updated_effective_config["compaction_target_size"] == 1048576
assert updated_effective_config["compaction_threshold"] == 10
assert updated_effective_config["gc_horizon"] == 67108864
@@ -238,7 +225,6 @@ evictions_low_residence_duration_metric_threshold = "2 days"
assert final_effective_config["gc_horizon"] == 67108864
assert final_effective_config["gc_period"] == "1h"
assert final_effective_config["image_creation_threshold"] == 3
assert final_effective_config["evictions_low_residence_duration_metric_threshold"] == "2days"
# restart the pageserver and ensure that the config is still correct
env.pageserver.stop()
@@ -299,81 +285,3 @@ def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder):
# dont test applying the setting here, we have that another test case to show it
# we just care about being able to create the file
assert len(contents_first) > len(contents_later)
def test_live_reconfig_get_evictions_low_residence_duration_metric_threshold(
neon_env_builder: NeonEnvBuilder,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
test_name="test_live_reconfig_get_evictions_low_residence_duration_metric_threshold",
)
env = neon_env_builder.init_start()
assert isinstance(env.remote_storage, LocalFsStorage)
(tenant_id, timeline_id) = env.neon_cli.create_tenant()
ps_http = env.pageserver.http_client()
def get_metric():
metrics = ps_http.get_metrics()
metric = metrics.query_one(
"pageserver_evictions_with_low_residence_duration_total",
{
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
},
)
return metric
default_value = ps_http.tenant_config(tenant_id).effective_config[
"evictions_low_residence_duration_metric_threshold"
]
metric = get_metric()
assert int(metric.value) == 0, "metric is present with default value"
assert default_value == "1day"
ps_http.download_all_layers(tenant_id, timeline_id)
ps_http.evict_all_layers(tenant_id, timeline_id)
metric = get_metric()
assert int(metric.value) > 0, "metric is updated"
env.neon_cli.config_tenant(
tenant_id, {"evictions_low_residence_duration_metric_threshold": default_value}
)
updated_metric = get_metric()
assert int(updated_metric.value) == int(
metric.value
), "metric is unchanged when setting same value"
env.neon_cli.config_tenant(
tenant_id, {"evictions_low_residence_duration_metric_threshold": "2day"}
)
metric = get_metric()
assert int(metric.labels["low_threshold_secs"]) == 2 * 24 * 60 * 60
assert int(metric.value) == 0
ps_http.download_all_layers(tenant_id, timeline_id)
ps_http.evict_all_layers(tenant_id, timeline_id)
metric = get_metric()
assert int(metric.labels["low_threshold_secs"]) == 2 * 24 * 60 * 60
assert int(metric.value) > 0
env.neon_cli.config_tenant(
tenant_id, {"evictions_low_residence_duration_metric_threshold": "2h"}
)
metric = get_metric()
assert int(metric.labels["low_threshold_secs"]) == 2 * 60 * 60
assert int(metric.value) == 0, "value resets if label changes"
ps_http.download_all_layers(tenant_id, timeline_id)
ps_http.evict_all_layers(tenant_id, timeline_id)
metric = get_metric()
assert int(metric.labels["low_threshold_secs"]) == 2 * 60 * 60
assert int(metric.value) > 0, "set a non-zero value for next step"
env.neon_cli.config_tenant(tenant_id, {})
metric = get_metric()
assert int(metric.labels["low_threshold_secs"]) == 24 * 60 * 60, "label resets to default"
assert int(metric.value) == 0, "value resets to default"

View File

@@ -4,6 +4,8 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
clap.workspace = true
anyhow.workspace = true

View File

@@ -18,7 +18,6 @@ byteorder = { version = "1" }
bytes = { version = "1", features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }
clap = { version = "4", features = ["derive", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8" }
digest = { version = "0.10", features = ["mac", "std"] }
either = { version = "1" }
@@ -30,6 +29,7 @@ futures-executor = { version = "0.3" }
futures-sink = { version = "0.3" }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
hashbrown = { version = "0.12", features = ["raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["std"] }
@@ -52,8 +52,7 @@ socket2 = { version = "0.4", default-features = false, features = ["all"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "sync", "time"] }
tokio-rustls = { version = "0.23" }
tokio-util = { version = "0.7", features = ["codec", "io"] }
toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
toml_edit = { version = "0.19", features = ["serde"] }
tonic = { version = "0.8", features = ["tls-roots"] }
tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "timeout", "util"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
@@ -65,6 +64,7 @@ anyhow = { version = "1", features = ["backtrace"] }
bytes = { version = "1", features = ["serde"] }
either = { version = "1" }
hashbrown = { version = "0.12", features = ["raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["std"] }
@@ -74,7 +74,6 @@ prost = { version = "0.11" }
regex = { version = "1" }
regex-syntax = { version = "0.6" }
serde = { version = "1", features = ["alloc", "derive"] }
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] }
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit-mut"] }
syn = { version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] }
### END HAKARI SECTION