Compare commits

..

8 Commits

Author SHA1 Message Date
Matt Nappo
87644fe036 Added black_box in layer_map benches (fix #3396) 2023-04-14 12:22:43 -04:00
Kirill Bulatov
ebea298415 Update most of the dependencies to their latest versions (#4026)
See https://github.com/neondatabase/neon/pull/3991

Brings the changes back with the right way to use new `toml_edit` to
deserialize values and a unit test for this.

All non-trivial updates extracted into separate commits, also `carho hakari` data and its manifest format were updated.

3 sets of crates remain unupdated:

* `base64` — touches proxy in a lot of places and changed its api (by 0.21 version) quite strongly since our version (0.13).
* `opentelemetry` and `opentelemetry-*` crates

```
error[E0308]: mismatched types
  --> libs/tracing-utils/src/http.rs:65:21
   |
65 |     span.set_parent(parent_ctx);
   |          ---------- ^^^^^^^^^^ expected struct `opentelemetry_api::context::Context`, found struct `opentelemetry::Context`
   |          |
   |          arguments to this method are incorrect
   |
   = note: struct `opentelemetry::Context` and struct `opentelemetry_api::context::Context` have similar names, but are actually distinct types
note: struct `opentelemetry::Context` is defined in crate `opentelemetry_api`
  --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/opentelemetry_api-0.19.0/src/context.rs:77:1
   |
77 | pub struct Context {
   | ^^^^^^^^^^^^^^^^^^
note: struct `opentelemetry_api::context::Context` is defined in crate `opentelemetry_api`
  --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/opentelemetry_api-0.18.0/src/context.rs:77:1
   |
77 | pub struct Context {
   | ^^^^^^^^^^^^^^^^^^
   = note: perhaps two different versions of crate `opentelemetry_api` are being used?
note: associated function defined here
  --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/tracing-opentelemetry-0.18.0/src/span_ext.rs:43:8
   |
43 |     fn set_parent(&self, cx: Context);
   |        ^^^^^^^^^^

For more information about this error, try `rustc --explain E0308`.
error: could not compile `tracing-utils` due to previous error
warning: build failed, waiting for other jobs to finish...
error: could not compile `tracing-utils` due to previous error
```

`tracing-opentelemetry` of version `0.19` is not yet released, that is supposed to have the update we need.

* similarly, `rustls`, `tokio-rustls`, `rustls-*` and `tls-listener` crates have similar issue:

```
error[E0308]: mismatched types
   --> libs/postgres_backend/tests/simple_select.rs:112:78
    |
112 |     let mut make_tls_connect = tokio_postgres_rustls::MakeRustlsConnect::new(client_cfg);
    |                                --------------------------------------------- ^^^^^^^^^^ expected struct `rustls::client::client_conn::ClientConfig`, found struct `ClientConfig`
    |                                |
    |                                arguments to this function are incorrect
    |
    = note: struct `ClientConfig` and struct `rustls::client::client_conn::ClientConfig` have similar names, but are actually distinct types
note: struct `ClientConfig` is defined in crate `rustls`
   --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/rustls-0.21.0/src/client/client_conn.rs:125:1
    |
125 | pub struct ClientConfig {
    | ^^^^^^^^^^^^^^^^^^^^^^^
note: struct `rustls::client::client_conn::ClientConfig` is defined in crate `rustls`
   --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/rustls-0.20.8/src/client/client_conn.rs:91:1
    |
91  | pub struct ClientConfig {
    | ^^^^^^^^^^^^^^^^^^^^^^^
    = note: perhaps two different versions of crate `rustls` are being used?
note: associated function defined here
   --> /Users/someonetoignore/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-postgres-rustls-0.9.0/src/lib.rs:23:12
    |
23  |     pub fn new(config: ClientConfig) -> Self {
    |            ^^^

For more information about this error, try `rustc --explain E0308`.
error: could not compile `postgres_backend` due to previous error
warning: build failed, waiting for other jobs to finish...
```

* aws crates: I could not make new API to work with bucket endpoint overload, and console e2e tests failed.
Other our tests passed, further investigation is worth to be done in https://github.com/neondatabase/neon/issues/4008
2023-04-14 18:28:54 +03:00
Vadim Kharitonov
5ffa20dd82 [proxy] adjust proxy sleep timeout 2023-04-14 15:08:07 +03:00
Vadim Kharitonov
75ea8106ec Add procps into compute containers 2023-04-14 15:02:26 +03:00
Vadim Kharitonov
017d3a390d Compile postgres with lz4 and zstd support 2023-04-14 15:02:26 +03:00
Alexey Kondratov
589cf1ed21 [compute_ctl] Do not create availability checker data on each start (#4019)
Initially, idea was to ensure that when we come and check data
availability, special service table already contains one row. So if we
loose it for some reason, we will error out.

Yet, to do availability check we anyway start compute first! So it
doesn't really add some value, but we affect each compute start as we
update at least one row in the database. Also this writes some WAL, so
if timeline is close to `neon.max_cluster_size` it could prevent compute
from starting up.

That said, do CREATE TABLE IF NOT EXISTS + UPSERT right in the
`/check_writability` handler.
2023-04-14 13:05:07 +02:00
Alexander Bayandin
0c82ff3d98 test_runner: add Timeline Inspector to Grafana links (#4021) 2023-04-14 11:46:47 +01:00
Christian Schwarz
8895f28dae make evictions_low_residence_duration_metric_threshold per-tenant (#3949)
Before this patch, if a tenant would override its eviction_policy
setting to use a lower LayerAccessThreshold::threshold than the
`evictions_low_residence_duration_metric_threshold`, the evictions done
for that tenant would count towards the
`evictions_with_low_residence_duration` metric.

That metric is used to identify pre-mature evictions, commonly triggered
by disk-usage-based eviction under disk pressure.

We don't want that to happen for the legitimate evictions of the tenant
that overrides its eviction_policy.

So, this patch
- moves the setting into TenantConf
- adds test coverage
- updates the staging & prod yamls

Forward Compatibility:
Software before this patch will ignore the new tenant conf field and use
the global one instead.
So we can roll back safely.

Backward Compatibility:
Parsing old configs with software as of this patch will fail in
`PageServerConf::parse_and_validate` with error 
`unrecognized pageserver option 'evictions_low_residence_duration_metric_threshold'`
if the option is still present in the global section.
We deal with this by updating the configs in Ansible.

fixes https://github.com/neondatabase/neon/issues/3940
2023-04-14 13:25:45 +03:00
48 changed files with 1429 additions and 1524 deletions

View File

@@ -4,7 +4,7 @@
hakari-package = "workspace_hack"
# Format for `workspace-hack = ...` lines in other Cargo.tomls. Requires cargo-hakari 0.9.8 or above.
dep-format-version = "3"
dep-format-version = "4"
# Setting workspace.resolver = "2" in the root Cargo.toml is HIGHLY recommended.
# Hakari works much better with the new feature resolver.

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
@@ -34,7 +34,7 @@ storage:
pageservers:
hosts:
pageserver-0.us-west-2.aws.neon.tech:
ansible_host: i-0d9f6dfae0e1c780d
ansible_host: i-0d9f6dfae0e1c780d
pageserver-1.us-west-2.aws.neon.tech:
ansible_host: i-0c834be1dddba8b3f
pageserver-2.us-west-2.aws.neon.tech:
@@ -49,5 +49,5 @@ storage:
safekeeper-1.us-west-2.aws.neon.tech:
ansible_host: i-074682f9d3c712e7c
safekeeper-2.us-west-2.aws.neon.tech:
ansible_host: i-042b7efb1729d7966
ansible_host: i-042b7efb1729d7966

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "20m"
threshold: &default_eviction_threshold "20m"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -17,7 +17,7 @@ storage:
kind: "LayerAccessThreshold"
period: "20m"
threshold: &default_eviction_threshold "20m"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:

View File

@@ -1,6 +1,22 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -7,15 +7,16 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800

View File

@@ -7,13 +7,13 @@ deploymentStrategy:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# Delay the kill signal by 5 minutes (5 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
command: ["/bin/sh", "-c", "sleep 300"]
terminationGracePeriodSeconds: 604800

1417
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -24,10 +24,10 @@ atty = "0.2.14"
aws-config = { version = "0.51.0", default-features = false, features=["rustls"] }
aws-sdk-s3 = "0.21.0"
aws-smithy-http = "0.51.0"
aws-types = "0.51.0"
aws-types = "0.55"
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.61"
bindgen = "0.65"
bstr = "1.0"
byteorder = "1.4"
bytes = "1.0"
@@ -50,7 +50,7 @@ git-version = "0.3"
hashbrown = "0.13"
hashlink = "0.8.1"
hex = "0.4"
hex-literal = "0.3"
hex-literal = "0.4"
hmac = "0.12.1"
hostname = "0.3.1"
humantime = "2.1"
@@ -80,18 +80,18 @@ reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_18"] }
reqwest-middleware = "0.2.0"
routerify = "3"
rpds = "0.12.0"
rpds = "0.13"
rustls = "0.20"
rustls-pemfile = "1"
rustls-split = "0.3"
scopeguard = "1.1"
sentry = { version = "0.29", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
sentry = { version = "0.30", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "2.0"
sha2 = "0.10.2"
signal-hook = "0.3"
socket2 = "0.4.4"
socket2 = "0.5"
strum = "0.24"
strum_macros = "0.24"
svg_fmt = "0.4.1"
@@ -106,17 +106,17 @@ tokio-postgres-rustls = "0.9.0"
tokio-rustls = "0.23"
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io"] }
toml = "0.5"
toml_edit = { version = "0.17", features = ["easy"] }
tonic = {version = "0.8", features = ["tls", "tls-roots"]}
toml = "0.7"
toml_edit = "0.19"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
tracing = "0.1"
tracing-opentelemetry = "0.18.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.2"
uuid = { version = "1.2", features = ["v4", "serde"] }
walkdir = "2.3.2"
webpki-roots = "0.22.5"
x509-parser = "0.14"
webpki-roots = "0.23"
x509-parser = "0.15"
## TODO replace this with tracing
env_logger = "0.10"
@@ -154,9 +154,9 @@ workspace_hack = { version = "0.1", path = "./workspace_hack/" }
## Build dependencies
criterion = "0.4"
rcgen = "0.10"
rstest = "0.16"
rstest = "0.17"
tempfile = "3.4"
tonic-build = "0.8"
tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.

View File

@@ -12,7 +12,7 @@ FROM debian:bullseye-slim AS build-deps
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libssl-dev \
libicu-dev libxslt1-dev
libicu-dev libxslt1-dev liblz4-dev libzstd-dev
#########################################################################################
#
@@ -24,8 +24,13 @@ FROM build-deps AS pg-build
ARG PG_VERSION
COPY vendor/postgres-${PG_VERSION} postgres
RUN cd postgres && \
./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp --with-icu \
--with-libxml --with-libxslt && \
export CONFIGURE_CMD="./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp \
--with-icu --with-libxml --with-libxslt --with-lz4" && \
if [ "${PG_VERSION}" != "v14" ]; then \
# zstd is available only from PG15
export CONFIGURE_CMD="${CONFIGURE_CMD} --with-zstd"; \
fi && \
eval $CONFIGURE_CMD && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C contrib/ install && \
# Install headers
@@ -565,13 +570,17 @@ COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-deb
# Install:
# libreadline8 for psql
# libicu67, locales for collations (including ICU and plpgsql_check)
# liblz4-1 for lz4
# libossp-uuid16 for extension ossp-uuid
# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS
# libxml2, libxslt1.1 for xml2
# libzstd1 for zstd
RUN apt update && \
apt install --no-install-recommends -y \
gdb \
locales \
libicu67 \
liblz4-1 \
libreadline8 \
libossp-uuid16 \
libgeos-c1v5 \
@@ -581,7 +590,8 @@ RUN apt update && \
libsfcgal1 \
libxml2 \
libxslt1.1 \
gdb && \
libzstd1 \
procps && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8

View File

@@ -1,12 +1,28 @@
use anyhow::{anyhow, Result};
use postgres::Client;
use tokio_postgres::NoTls;
use tracing::{error, instrument};
use crate::compute::ComputeNode;
/// Update timestamp in a row in a special service table to check
/// that we can actually write some data in this particular timeline.
/// Create table if it's missing.
#[instrument(skip_all)]
pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
// Connect to the database.
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
if client.is_closed() {
return Err(anyhow!("connection to postgres closed"));
}
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("connection error: {}", e);
}
});
let query = "
CREATE TABLE IF NOT EXISTS health_check (
id serial primary key,
@@ -15,31 +31,15 @@ pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
INSERT INTO health_check VALUES (1, now())
ON CONFLICT (id) DO UPDATE
SET updated_at = now();";
let result = client.simple_query(query)?;
if result.len() < 2 {
return Err(anyhow::format_err!("executed {} queries", result.len()));
}
Ok(())
}
#[instrument(skip_all)]
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
if client.is_closed() {
return Err(anyhow!("connection to postgres closed"));
}
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("connection error: {}", e);
}
});
let result = client
.simple_query("UPDATE health_check SET updated_at = now() WHERE id = 1;")
.await?;
if result.len() != 1 {
return Err(anyhow!("statement can't be executed"));
let result = client.simple_query(query).await?;
if result.len() != 2 {
return Err(anyhow::format_err!(
"expected 2 query results, but got {}",
result.len()
));
}
Ok(())
}

View File

@@ -32,7 +32,6 @@ use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::ComputeSpec;
use crate::checker::create_writability_check_data;
use crate::config;
use crate::pg_helpers::*;
use crate::spec::*;
@@ -342,7 +341,6 @@ impl ComputeNode {
handle_databases(spec, &mut client)?;
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
handle_grants(spec, self.connstr.as_str(), &mut client)?;
create_writability_check_data(&mut client)?;
handle_extensions(spec, &mut client)?;
// 'Close' connection

View File

@@ -85,7 +85,10 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
let res = crate::checker::check_writability(compute).await;
match res {
Ok(_) => Response::new(Body::from("true")),
Err(e) => Response::new(Body::from(e.to_string())),
Err(e) => {
error!("check_writability failed: {}", e);
Response::new(Body::from(e.to_string()))
}
}
}

View File

@@ -368,6 +368,9 @@ impl PageServerNode {
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'min_resident_size_override' as integer")?,
evictions_low_residence_duration_metric_threshold: settings
.remove("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
@@ -445,6 +448,9 @@ impl PageServerNode {
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'min_resident_size_override' as an integer")?,
evictions_low_residence_duration_metric_threshold: settings
.get("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
})
.send()?
.error_from_body()?;

View File

@@ -4,13 +4,12 @@ version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.68"
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }
rand = "0.8.3"
serde = "1.0.152"
serde_with = "2.1.0"
utils = { version = "0.1.0", path = "../utils" }
workspace_hack = { version = "0.1.0", path = "../../workspace_hack" }
anyhow.workspace = true
chrono.workspace = true
rand.workspace = true
serde.workspace = true
serde_with.workspace = true
utils.workspace = true
workspace_hack.workspace = true

View File

@@ -135,6 +135,7 @@ pub struct TenantCreateRequest {
// For now, this field is not even documented in the openapi_spec.yml.
pub eviction_policy: Option<serde_json::Value>,
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
}
#[serde_as]
@@ -181,6 +182,7 @@ pub struct TenantConfigRequest {
// For now, this field is not even documented in the openapi_spec.yml.
pub eviction_policy: Option<serde_json::Value>,
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
}
impl TenantConfigRequest {
@@ -202,6 +204,7 @@ impl TenantConfigRequest {
trace_read_requests: None,
eviction_policy: None,
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: None,
}
}
}

View File

@@ -5,7 +5,7 @@ use std::path::PathBuf;
use std::process::Command;
use anyhow::{anyhow, Context};
use bindgen::callbacks::ParseCallbacks;
use bindgen::callbacks::{DeriveInfo, ParseCallbacks};
#[derive(Debug)]
struct PostgresFfiCallbacks;
@@ -20,7 +20,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
// Add any custom #[derive] attributes to the data structures that bindgen
// creates.
fn add_derives(&self, name: &str) -> Vec<String> {
fn add_derives(&self, derive_info: &DeriveInfo) -> Vec<String> {
// This is the list of data structures that we want to serialize/deserialize.
let serde_list = [
"XLogRecord",
@@ -31,7 +31,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
"ControlFileData",
];
if serde_list.contains(&name) {
if serde_list.contains(&derive_info.name) {
vec![
"Default".into(), // Default allows us to easily fill the padding fields with 0.
"Serialize".into(),

View File

@@ -204,12 +204,7 @@ async fn upload_s3_data(
let data = format!("remote blob data {i}").into_bytes();
let data_len = data.len();
task_client
.upload(
Box::new(std::io::Cursor::new(data)),
data_len,
&blob_path,
None,
)
.upload(std::io::Cursor::new(data), data_len, &blob_path, None)
.await?;
Ok::<_, anyhow::Error>((blob_prefix, blob_path))

View File

@@ -1,13 +0,0 @@
[package]
name = "timeline_data_path"
version = "0.1.0"
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
utils.workspace = true
workspace_hack.workspace = true
tokio.workspace = true
thiserror.workspace = true

View File

@@ -1,396 +0,0 @@
//! The Timeline's core data path.
//!
//! # Overview
//!
//! This crate implements the core data path of a Timeline inside Pageserver:
//!
//! 1. WAL records from `walreceiver`, via in-memory layers, into persistent L0 layers.
//! 1. `GetPage@LSN`: retrieval of WAL records and page images for feeding into WAL redo.
//! 1. Data re-shuffeling through compaction (TODO).
//! 1. Page image creation & garbage collection through GC (TODO).
//!
//! This crate assumes the following concepts, but is fully generic over their implementation:
//!
//! - **Delta Records**: data is written into the system in the form of self-descriptive deltas.
//! For the Pageserver use case, these deltas are derived from Postgres WAL records.
//! - **Page Numbers**: Delta Records always affect a single key.
//! That key is called page number, because, in the Pageserver use case, the Postgres table page numbers are the keys.
//! - **LSN**: When writing Delta Records into the system, they are associated with a monotonically increasing LSN.
//! Subsequently written Delta Records must have increasing LSNs.
//! - **Page Images**: Delta Records for a given page can be used to reconstruct the page. Think of it like squashing diffs.
//! - When sorting the Delta Records for a given key by their LSN, any prefix of that sorting can be squashed into a page image.
//! - Delta Records following such a squash can be squashed into that page image.
//! - In Pageserver, WAL redo implements the (pure) function of squashing.
//! - **In-Memory Layer**: an object that represents an "unfinished" L0 layer file, holding Delta Records in insertion order.
//! "Unfinished" means that we're still writing Delta Records to that file.
//! - **Historic Layer**: an object that represents a "finished" layer file, at any compaction level.
//! Such objects reside on disk and/or in remote storage.
//! They may contain Delta Records, Page Images, or a mixture thereof. It doesn't matter.
//! - **HistoricStuff**: an efficient lookup data structure to find the list of Historic Layer objects
//! that hold the Delta Records / PageImages required to reconstruct a Page Image at a given LSN.
//!
//! # API
//!
//! The core idea is that of a specialized single-producer multi-consumer structure,
//! embodied by a Read-end and a Write-end.
//!
//! The Write-end is used to push new `DeltaRecord @ LSN`s into the system.
//! In Pageserver, this is used by the `WalReceiver`.
//!
//! The Read-end provides the `GetPage@LSN` API.
//! In the current iteration, we actually return something called `ReconstructWork`.
//! I.e., we leave the work of reading the values from the layers, and the WAL redo invocation to the caller.
//! Find rationale for this design in the *Scope* section.
//!
//! ## Immutability
//!
//! The traits defined by this crate assume immutable data structures that are multi-versioned.
//!
//! As an example for what "immutable" means, take the case where we add a new Historic Layer to HistoricStuff.
//! Traditionally, one would use shared mutable state, i.e. `Arc<RwLock<...>>`.
//! To insert the new Historic Layer, we would acquire the RwLock in write mode and modify a lookup data structure to accomodate the new layer.
//! The Read-ends would use RwLock in read mode to read from the data structure.
//!
//! Conversely, with *immutable data structures*, writers create new version (aka *snapshots*) of the lookup data structure.
//! New reads on the Read-ends will use the new snapshot, but old ongoing reads would use the old version(s).
//! An efficient implementation would likely share the Historic Layer objects, e.g., using `Arc`.
//! And maybe there's internally mutable state inside the layer objects, e.g., to track residence (i.e., *on-demand downloaded* vs *evicted*).
//! But the important point is that there's no synchronization / lock-holding at any higher level, except when grabbing a reference to the snapshot (Read-end), or when publishing a new snapshot (Write-end).
//!
//! ## Scope
//!
//! The following concerns are considered implementation details from the perspective of this crate:
//!
//! - **Layer File Persistence**: `HistoricStuff::make_historic` is responsible for this.
//! - **Reading Layer Files**: the `ReconstructWork` that the Read-end returns from `GetPage@LSN` requests contains the list of layers to consult.
//! The crate consumer is responsible for reading the layers & doing WAL redo.
//! Likely the implementation of `HistoricStuff` plays a role here, because it is responsible for persisting the layer files.
//! - **Layer Eviction & On-Demand Download**: this is just an aspect of the above.
//! The crate consumer can choose to implement eviction & on-demand download however they wish.
//! The only requirement is that the Historic Layers don't change their contents, i.e., they always returnt he same reconstruct values for the same lookup.
//! - For example, a `LayerCache` modoule or service could take care of layer uploads, eviction, and on-demand downloads.
//! Initially, the `layer cache` can be local-only.
//! But in the future, it can be multi-machine / clustered pagesevers / aka "sharding".
//!
//! # Example
//!
//! The [`new`] function is the entrypoint to this crate.
//!
//! See the test cases for how it is used.
use std::{marker::PhantomData, time::Duration};
use utils::seqwait::{self, Advance, SeqWait, Wait};
#[cfg(test)]
mod tests;
/// Collection of types / type bounds used by Read-end and Write-end.
///
/// See the [`crate`]-level docs's *Concepts* section to learn about
/// the meaning of each associated `type`.
///
/// # Usage
///
/// Define a zero-sized-type and impl this Trait for it.
/// Then use that zero-sized-type as the single generic argument to [`new`]
/// and almost all types declared in this crate.
///
/// It might feel a bit weird, but, the alternative is to have umpteen generic
/// types per `impl` with repetitive trait bounds.
///
/// Search the test cases for an example of how this can be used to improve testability.
pub trait Types {
type Key: Copy;
type Lsn: Ord + Copy;
type LsnCounter: seqwait::MonotonicCounter<Self::Lsn> + Copy;
type DeltaRecord;
type HistoricLayer;
type InMemoryLayer: InMemoryLayer<Types = Self> + Clone;
type HistoricStuff: HistoricStuff<Types = Self> + Clone;
type GetReconstructPathError: std::error::Error;
}
/// Error returned by [`InMemoryLayer::put`].
#[derive(thiserror::Error)]
pub struct InMemoryLayerPutError<DeltaRecord> {
delta: DeltaRecord,
kind: InMemoryLayerPutErrorKind,
}
/// Part of [`InMemoryLayerPutError`].
#[derive(Debug)]
pub enum InMemoryLayerPutErrorKind {
LayerFull,
AlreadyHaveRecordForKeyAndLsn,
}
impl<DeltaRecord> std::fmt::Debug for InMemoryLayerPutError<DeltaRecord> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InMemoryLayerPutError")
// would require DeltaRecord to impl Debug
// .field("delta", &self.delta)
.field("kind", &self.kind)
.finish()
}
}
/// An in-memory layer. See [`crate`] docs for details on this concept.
pub trait InMemoryLayer: std::fmt::Debug + Default + Clone {
type Types: Types;
fn put(
&mut self,
key: <Self::Types as Types>::Key,
lsn: <Self::Types as Types>::Lsn,
delta: <Self::Types as Types>::DeltaRecord,
) -> Result<Self, InMemoryLayerPutError<<Self::Types as Types>::DeltaRecord>>;
fn get(
&self,
key: <Self::Types as Types>::Key,
lsn: <Self::Types as Types>::Lsn,
) -> Vec<<Self::Types as Types>::DeltaRecord>;
}
/// The manager of [`Types::HistoricLayer`]s.
pub trait HistoricStuff {
type Types: Types;
fn get_reconstruct_path(
&self,
key: <Self::Types as Types>::Key,
lsn: <Self::Types as Types>::Lsn,
) -> Result<
Vec<<Self::Types as Types>::HistoricLayer>,
<Self::Types as Types>::GetReconstructPathError,
>;
/// Produce a new version of `self` that includes the given inmem layer.
fn make_historic(&self, inmem: <Self::Types as Types>::InMemoryLayer) -> Self;
}
/// A snapshot of the data. See [`crate`]-level docs section on *immutability* for details.
struct Snapshot<T: Types> {
_types: PhantomData<T>,
inmem: Option<T::InMemoryLayer>,
historic: T::HistoricStuff,
}
impl<T: Types> Clone for Snapshot<T> {
fn clone(&self) -> Self {
Self {
_types: self._types.clone(),
inmem: self.inmem.clone(),
historic: self.historic.clone(),
}
}
}
/// The Read-end. See [`crate`]-level docs for details.
pub struct Reader<T: Types> {
wait: Wait<T::LsnCounter, T::Lsn, Snapshot<T>>,
}
/// The Write-end. See [`crate`]-level docs for details.
pub struct Writer<T: Types> {
advance: Advance<T::LsnCounter, T::Lsn, Snapshot<T>>,
}
/// Setup a pair of Read-end and Write-End. This is the entrypoint to this crate.
///
/// The idea is that the caller loads the arguments from persistent state that `HistoricStuff` wrote at an earlier point in time.
pub fn new<T: Types>(lsn: T::LsnCounter, historic: T::HistoricStuff) -> (Reader<T>, Writer<T>) {
let state = Snapshot {
_types: PhantomData::<T>::default(),
inmem: None,
historic: historic,
};
let (wait, advance) = SeqWait::new(lsn, state).split_spmc();
let reader = Reader { wait };
let read_writer = Writer { advance };
(reader, read_writer)
}
/// Error returned by the get-page operations.
#[derive(Debug, thiserror::Error)]
pub enum GetError<T: Types> {
#[error(transparent)]
SeqWait(seqwait::SeqWaitError),
#[error(transparent)]
GetReconstructPath(T::GetReconstructPathError),
}
/// Self-contained set of objects required to reconstruct a page image for the given `key` @ `lsn`.
///
/// This is returned by the `get` methods of [`Reader`] and [`Writer`].
///
/// To reconstruct the page image, stack up (top to bottom) `inmem_records` plus all records found for `key` and `lsn` along the `historic_path` until an initial page image is found.
/// Then feed that stack to WAL-redo to get the page image.
///
/// See [`crate`]-level docs on *scope* for why we don't return page images from these functions.
pub struct ReconstructWork<T: Types> {
pub key: T::Key,
pub lsn: T::Lsn,
pub inmem_records: Vec<T::DeltaRecord>,
pub historic_path: Vec<T::HistoricLayer>,
}
impl<T: Types> Reader<T> {
/// This is the `GetPage@LSN` operation.
///
/// See the [`crate`]-level docs for why we return [`ReconstructWork`] instead of a Page Image here.
pub async fn get(&self, key: T::Key, lsn: T::Lsn) -> Result<ReconstructWork<T>, GetError<T>> {
// XXX dedup with Writer::get_nowait
let state = self.wait.wait_for(lsn).await.map_err(GetError::SeqWait)?;
let inmem_records = state
.inmem
.as_ref()
.map(|iml| iml.get(key, lsn))
.unwrap_or_default();
let historic_path = state
.historic
.get_reconstruct_path(key, lsn)
.map_err(GetError::GetReconstructPath)?;
Ok(ReconstructWork {
key,
lsn,
inmem_records,
historic_path,
})
}
}
/// Error returned by the `put` operation.
#[derive(thiserror::Error)]
pub struct PutError<T: Types> {
/// The `delta` record which we failed to `put`.
pub delta: T::DeltaRecord,
/// Description of what went wrong.
pub kind: PutErrorKind,
}
/// Part of [`PutError`].
#[derive(Debug)]
pub enum PutErrorKind {
AlreadyHaveInMemoryRecordForKeyAndLsn,
}
impl<T: Types> std::fmt::Debug for PutError<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PutError")
// would need to require Debug for DeltaRecord
// .field("delta", &self.delta)
.field("kind", &self.kind)
.finish()
}
}
impl<T: Types> Writer<T> {
/// Insert data into the system.
pub async fn put(
&mut self,
key: T::Key,
lsn: T::Lsn,
delta: T::DeltaRecord,
) -> Result<(), PutError<T>> {
let (_snapshot_lsn, snapshot) = self.advance.get_current_data();
// TODO ensure snapshot_lsn <= lsn?
let mut inmem = snapshot
.inmem
.unwrap_or_else(|| T::InMemoryLayer::default());
// XXX: use the Advance as witness and only allow witness to access inmem in write mode
match inmem.put(key, lsn, delta) {
Ok(new_inmem) => {
let new_snapshot = Snapshot {
_types: PhantomData,
inmem: Some(new_inmem),
historic: snapshot.historic,
};
self.advance.advance(lsn, Some(new_snapshot));
}
Err(InMemoryLayerPutError {
delta,
kind: InMemoryLayerPutErrorKind::AlreadyHaveRecordForKeyAndLsn,
}) => {
return Err(PutError {
delta,
kind: PutErrorKind::AlreadyHaveInMemoryRecordForKeyAndLsn,
});
}
Err(InMemoryLayerPutError {
delta,
kind: InMemoryLayerPutErrorKind::LayerFull,
}) => {
let new_historic = snapshot.historic.make_historic(inmem);
let mut new_inmem = T::InMemoryLayer::default();
let new_inmem = new_inmem
.put(key, lsn, delta)
.expect("put into default inmem layer must not fail");
let new_state = Snapshot {
_types: PhantomData::<T>::default(),
inmem: Some(new_inmem),
historic: new_historic,
};
self.advance.advance(lsn, Some(new_state));
}
}
Ok(())
}
/// Force flushing of the current in-memory layer.
///
/// Usually, flushing happens only if the in-memory layer is full.
/// Use this API to make it happen in other circumstances (shutdown, periodic ticker, etc.).
pub async fn force_flush(&mut self) -> tokio::io::Result<()> {
let (snapshot_lsn, snapshot) = self.advance.get_current_data();
let Snapshot {
_types,
inmem,
historic,
} = snapshot;
// XXX: use the Advance as witness and only allow witness to access inmem in "write" mode
let Some(inmem) = inmem else {
// nothing to do
return Ok(());
};
let new_historic = historic.make_historic(inmem);
let new_snapshot = Snapshot {
_types: PhantomData::<T>::default(),
inmem: None,
historic: new_historic,
};
self.advance.advance(snapshot_lsn, Some(new_snapshot)); // TODO: should fail if we're past snapshot_lsn
Ok(())
}
/// `get` at the given LSN, without blocking.
///
/// Fails with a timeout error if the `lsn` isn't there yet.
/// That makes sense because the only way we'd stop waiting is by a `self.put()`.
/// But concurrent `put()` is forbidden.
pub async fn get_nowait(
&self,
key: T::Key,
lsn: T::Lsn,
) -> Result<ReconstructWork<T>, GetError<T>> {
// XXX dedup with Reader::get
let state = self
.advance
.wait_for_timeout(lsn, Duration::from_secs(0))
// The await is never going to block because we pass from_secs(0).
.await
.map_err(GetError::SeqWait)?;
let inmem_records = state
.inmem
.as_ref()
.map(|iml| iml.get(key, lsn))
.unwrap_or_default();
let historic_path = state
.historic
.get_reconstruct_path(key, lsn)
.map_err(GetError::GetReconstructPath)?;
Ok(ReconstructWork {
key,
lsn,
inmem_records,
historic_path,
})
}
}

View File

@@ -1,170 +0,0 @@
use std::collections::{btree_map::Entry, BTreeMap};
use std::sync::Arc;
use utils::seqwait;
/// The ZST for which we impl the `super::Types` type collection trait.
struct TestTypes;
impl super::Types for TestTypes {
type Key = usize;
type Lsn = usize;
type LsnCounter = UsizeCounter;
type DeltaRecord = &'static str;
type HistoricLayer = Arc<TestHistoricLayer>;
type InMemoryLayer = TestInMemoryLayer;
type HistoricStuff = TestHistoricStuff;
}
/// For testing, our in-memory layer is a simple hashmap.
#[derive(Clone, Default, Debug)]
struct TestInMemoryLayer {
by_key: BTreeMap<usize, BTreeMap<usize, &'static str>>,
}
/// For testing, our historic layers are just in-memory layer objects with `frozen==true`.
struct TestHistoricLayer(TestInMemoryLayer);
/// This is the data structure that impls the `HistoricStuff` trait.
#[derive(Default, Clone)]
struct TestHistoricStuff {
by_key: BTreeMap<usize, BTreeMap<usize, Arc<TestHistoricLayer>>>,
}
/// `seqwait::MonotonicCounter` impl
#[derive(Copy, Clone)]
pub struct UsizeCounter(usize);
// Our testing impl of HistoricStuff references the frozen InMemoryLayer objects
// from all the (key,lsn) entries that it covers.
// This mimics the (much more efficient) search tree in the real impl.
impl super::HistoricStuff for TestHistoricStuff {
type Types = TestTypes;
fn get_reconstruct_path(
&self,
key: usize,
lsn: usize,
) -> Result<Vec<Arc<TestHistoricLayer>>, super::GetReconstructPathError> {
let Some(bk) = self.by_key.get(&key) else {
return Ok(vec![]);
};
Ok(bk.range(..=lsn).rev().map(|(_, l)| Arc::clone(l)).collect())
}
fn make_historic(&self, inmem: TestInMemoryLayer) -> Self {
// For the purposes of testing, just turn the inmemory layer historic through the type system
let historic = Arc::new(TestHistoricLayer(inmem));
// Deep-copy
let mut copy = self.by_key.clone();
// Add the references to `inmem` to the deep-copied struct
for (k, v) in historic.0.by_key.iter() {
for (lsn, _deltas) in v.into_iter() {
let by_key = copy.entry(*k).or_default();
let overwritten = by_key.insert(*lsn, historic.clone());
assert!(matches!(overwritten, None), "layers must not overlap");
}
}
Self { by_key: copy }
}
}
impl super::InMemoryLayer for TestInMemoryLayer {
type Types = TestTypes;
fn put(
&mut self,
key: usize,
lsn: usize,
delta: &'static str,
) -> Result<Self, super::InMemoryLayerPutError<&'static str>> {
let mut clone = self.clone();
drop(self);
let by_key = clone.by_key.entry(key).or_default();
match by_key.entry(lsn) {
Entry::Occupied(_record) => {
return Err(super::InMemoryLayerPutError {
delta,
kind: super::InMemoryLayerPutErrorKind::AlreadyHaveRecordForKeyAndLsn,
});
}
Entry::Vacant(vacant) => vacant.insert(delta),
};
Ok(clone)
}
fn get(&self, key: usize, lsn: usize) -> Vec<&'static str> {
let by_key = match self.by_key.get(&key) {
Some(by_key) => by_key,
None => return vec![],
};
by_key
.range(..=lsn)
.map(|(_, v)| v)
.rev()
.cloned()
.collect()
}
}
impl UsizeCounter {
pub fn new(inital: usize) -> Self {
UsizeCounter(inital)
}
}
impl seqwait::MonotonicCounter<usize> for UsizeCounter {
fn cnt_advance(&mut self, new_val: usize) {
assert!(self.0 < new_val);
self.0 = new_val;
}
fn cnt_value(&self) -> usize {
self.0
}
}
#[test]
fn basic() {
let lm = TestHistoricStuff::default();
let (r, mut rw) = super::new::<TestTypes>(UsizeCounter::new(0), lm);
let r = Arc::new(r);
let r2 = Arc::clone(&r);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let read_jh = rt.spawn(async move { r.get(0, 10).await });
let mut rw = rt.block_on(async move {
rw.put(0, 1, "foo").await.unwrap();
rw.put(1, 1, "bar").await.unwrap();
rw.put(0, 10, "baz").await.unwrap();
rw
});
let read_res = rt.block_on(read_jh).unwrap().unwrap();
assert!(
read_res.historic_path.is_empty(),
"we have pushed less than needed for flush"
);
assert_eq!(read_res.inmem_records, vec!["baz", "foo"]);
let rw = rt.block_on(async move {
rw.put(0, 11, "blup").await.unwrap();
rw
});
let read_res = rt.block_on(async move { r2.get(0, 11).await.unwrap() });
assert_eq!(read_res.historic_path.len(), 0);
assert_eq!(read_res.inmem_records, vec!["blup", "baz", "foo"]);
drop(rw);
}

View File

@@ -14,4 +14,5 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tracing.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
workspace_hack.workspace = true

View File

@@ -33,11 +33,10 @@ serde_with.workspace = true
strum.workspace = true
strum_macros.workspace = true
url.workspace = true
uuid = { version = "1.2", features = ["v4", "serde"] }
uuid.workspace = true
metrics.workspace = true
workspace_hack.workspace = true
either.workspace = true
[dev-dependencies]
byteorder.workspace = true

View File

@@ -1,13 +1,12 @@
#![warn(missing_docs)]
use either::Either;
use std::cmp::{Eq, Ordering, PartialOrd};
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::mem;
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use std::time::Duration;
use tokio::sync::oneshot::{channel, Receiver, Sender};
use tokio::sync::watch::{channel, Receiver, Sender};
use tokio::time::timeout;
/// An error happened while waiting for a number
@@ -37,48 +36,45 @@ pub trait MonotonicCounter<V> {
}
/// Internal components of a `SeqWait`
struct SeqWaitInt<S, V, T>
struct SeqWaitInt<S, V>
where
S: MonotonicCounter<V>,
V: Ord,
T: Clone,
{
waiters: BinaryHeap<Waiter<V, T>>,
waiters: BinaryHeap<Waiter<V>>,
current: S,
shutdown: bool,
data: T,
}
struct Waiter<V, T>
struct Waiter<T>
where
V: Ord,
T: Clone,
T: Ord,
{
wake_num: V, // wake me when this number arrives ...
wake_channel: Sender<T>, // ... by sending a message to this channel
wake_num: T, // wake me when this number arrives ...
wake_channel: Sender<()>, // ... by sending a message to this channel
}
// BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
// to get that.
impl<V: Ord, T: Clone> PartialOrd for Waiter<V, T> {
impl<T: Ord> PartialOrd for Waiter<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
other.wake_num.partial_cmp(&self.wake_num)
}
}
impl<V: Ord, T: Clone> Ord for Waiter<V, T> {
impl<T: Ord> Ord for Waiter<T> {
fn cmp(&self, other: &Self) -> Ordering {
other.wake_num.cmp(&self.wake_num)
}
}
impl<V: Ord, T: Clone> PartialEq for Waiter<V, T> {
impl<T: Ord> PartialEq for Waiter<T> {
fn eq(&self, other: &Self) -> bool {
other.wake_num == self.wake_num
}
}
impl<V: Ord, T: Clone> Eq for Waiter<V, T> {}
impl<T: Ord> Eq for Waiter<T> {}
/// A tool for waiting on a sequence number
///
@@ -96,28 +92,25 @@ impl<V: Ord, T: Clone> Eq for Waiter<V, T> {}
///
/// <S> means Storage, <V> is type of counter that this storage exposes.
///
pub struct SeqWait<S, V, T>
pub struct SeqWait<S, V>
where
S: MonotonicCounter<V>,
V: Ord,
T: Clone,
{
internal: Mutex<SeqWaitInt<S, V, T>>,
internal: Mutex<SeqWaitInt<S, V>>,
}
impl<S, V, T> SeqWait<S, V, T>
impl<S, V> SeqWait<S, V>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
/// Create a new `SeqWait`, initialized to a particular number
pub fn new(starting_num: S, data: T) -> Self {
pub fn new(starting_num: S) -> Self {
let internal = SeqWaitInt {
waiters: BinaryHeap::new(),
current: starting_num,
shutdown: false,
data,
};
SeqWait {
internal: Mutex::new(internal),
@@ -151,13 +144,10 @@ where
///
/// This call won't complete until someone has called `advance`
/// with a number greater than or equal to the one we're waiting for.
pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
match self.queue_for_wait(num, false) {
Ok(Either::Left(data)) => Ok(data),
Ok(Either::Right(rx)) => match rx.await {
Err(_) => Err(SeqWaitError::Shutdown),
Ok(data) => Ok(data),
},
pub async fn wait_for(&self, num: V) -> Result<(), SeqWaitError> {
match self.queue_for_wait(num) {
Ok(None) => Ok(()),
Ok(Some(mut rx)) => rx.changed().await.map_err(|_| SeqWaitError::Shutdown),
Err(e) => Err(e),
}
}
@@ -169,18 +159,15 @@ where
///
/// If that hasn't happened after the specified timeout duration,
/// [`SeqWaitError::Timeout`] will be returned.
///
/// Pass `timeout_duration.is_zero() == true` to guarantee that the
/// future that is this function will never await.
pub async fn wait_for_timeout(
&self,
num: V,
timeout_duration: Duration,
) -> Result<T, SeqWaitError> {
match self.queue_for_wait(num, timeout_duration.is_zero()) {
Ok(Either::Left(data)) => Ok(data),
Ok(Either::Right(rx)) => match timeout(timeout_duration, rx).await {
Ok(Ok(data)) => Ok(data),
) -> Result<(), SeqWaitError> {
match self.queue_for_wait(num) {
Ok(None) => Ok(()),
Ok(Some(mut rx)) => match timeout(timeout_duration, rx.changed()).await {
Ok(Ok(())) => Ok(()),
Ok(Err(_)) => Err(SeqWaitError::Shutdown),
Err(_) => Err(SeqWaitError::Timeout),
},
@@ -190,50 +177,41 @@ where
/// Register and return a channel that will be notified when a number arrives,
/// or None, if it has already arrived.
fn queue_for_wait(&self, num: V, nowait: bool) -> Result<Either<T, Receiver<T>>, SeqWaitError> {
fn queue_for_wait(&self, num: V) -> Result<Option<Receiver<()>>, SeqWaitError> {
let mut internal = self.internal.lock().unwrap();
if internal.current.cnt_value() >= num {
return Ok(Either::Left(internal.data.clone()));
return Ok(None);
}
if internal.shutdown {
return Err(SeqWaitError::Shutdown);
}
if nowait {
return Err(SeqWaitError::Timeout);
}
// Create a new channel.
let (tx, rx) = channel();
let (tx, rx) = channel(());
internal.waiters.push(Waiter {
wake_num: num,
wake_channel: tx,
});
// Drop the lock as we exit this scope.
Ok(Either::Right(rx))
Ok(Some(rx))
}
/// Announce a new number has arrived
///
/// All waiters at this value or below will be woken.
///
/// If `new_data` is Some(), it will update the internal data,
/// even if `num` is smaller than the internal counter.
/// It will not cause a wake-up though, in this case.
///
/// Returns the old number.
pub fn advance(&self, num: V, new_data: Option<T>) -> V {
pub fn advance(&self, num: V) -> V {
let old_value;
let (wake_these, with_data) = {
let wake_these = {
let mut internal = self.internal.lock().unwrap();
if let Some(new_data) = new_data {
internal.data = new_data;
}
old_value = internal.current.cnt_value();
if old_value >= num {
return old_value;
}
internal.current.cnt_advance(num);
// Pop all waiters <= num from the heap. Collect them in a vector, and
// wake them up after releasing the lock.
let mut wake_these = Vec::new();
@@ -243,13 +221,13 @@ where
}
wake_these.push(internal.waiters.pop().unwrap().wake_channel);
}
(wake_these, internal.data.clone())
wake_these
};
for tx in wake_these {
// This can fail if there are no receivers.
// We don't care; discard the error.
let _ = tx.send(with_data.clone());
let _ = tx.send(());
}
old_value
}
@@ -258,106 +236,6 @@ where
pub fn load(&self) -> S {
self.internal.lock().unwrap().current
}
/// Split the seqwait into a part than can only do wait,
/// and another part that can do advance + wait.
///
/// The wait-only part can be cloned, the advance part cannot be cloned.
/// This provides a single-producer multi-consumer scheme.
pub fn split_spmc(self) -> (Wait<S, V, T>, Advance<S, V, T>) {
let inner = Arc::new(self);
let w = Wait {
inner: inner.clone(),
};
let a = Advance { inner };
(w, a)
}
}
/// See [`SeqWait::split_spmc`].
pub struct Wait<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
inner: Arc<SeqWait<S, V, T>>,
}
/// See [`SeqWait::split_spmc`].
pub struct Advance<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
inner: Arc<SeqWait<S, V, T>>,
}
impl<S, V, T> Wait<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
/// See [`SeqWait::wait_for`].
pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
self.inner.wait_for(num).await
}
/// See [`SeqWait::wait_for_timeout`].
pub async fn wait_for_timeout(
&self,
num: V,
timeout_duration: Duration,
) -> Result<T, SeqWaitError> {
self.inner.wait_for_timeout(num, timeout_duration).await
}
}
impl<S, V, T> Advance<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
/// See [`SeqWait::advance`].
pub fn advance(&self, num: V, new_data: Option<T>) -> V {
self.inner.advance(num, new_data)
}
/// See [`SeqWait::wait_for`].
pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
self.inner.wait_for(num).await
}
/// See [`SeqWait::wait_for_timeout`].
pub async fn wait_for_timeout(
&self,
num: V,
timeout_duration: Duration,
) -> Result<T, SeqWaitError> {
self.inner.wait_for_timeout(num, timeout_duration).await
}
/// Get a `Clone::clone` of the current data inside the seqwait.
pub fn get_current_data(&self) -> (V, T) {
let inner = self.inner.internal.lock().unwrap();
(inner.current.cnt_value(), inner.data.clone())
}
}
impl<S, V, T> Clone for Wait<S, V, T>
where
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
T: Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
#[cfg(test)]
@@ -378,12 +256,12 @@ mod tests {
#[tokio::test]
async fn seqwait() {
let seq = Arc::new(SeqWait::new(0, ()));
let seq = Arc::new(SeqWait::new(0));
let seq2 = Arc::clone(&seq);
let seq3 = Arc::clone(&seq);
let jh1 = tokio::task::spawn(async move {
seq2.wait_for(42).await.expect("wait_for 42");
let old = seq2.advance(100, None);
let old = seq2.advance(100);
assert_eq!(old, 99);
seq2.wait_for_timeout(999, Duration::from_millis(100))
.await
@@ -394,12 +272,12 @@ mod tests {
seq3.wait_for(0).await.expect("wait_for 0");
});
tokio::time::sleep(Duration::from_millis(200)).await;
let old = seq.advance(99, None);
let old = seq.advance(99);
assert_eq!(old, 0);
seq.wait_for(100).await.expect("wait_for 100");
// Calling advance with a smaller value is a no-op
assert_eq!(seq.advance(98, None), 100);
assert_eq!(seq.advance(98), 100);
assert_eq!(seq.load(), 100);
jh1.await.unwrap();
@@ -410,7 +288,7 @@ mod tests {
#[tokio::test]
async fn seqwait_timeout() {
let seq = Arc::new(SeqWait::new(0, ()));
let seq = Arc::new(SeqWait::new(0));
let seq2 = Arc::clone(&seq);
let jh = tokio::task::spawn(async move {
let timeout = Duration::from_millis(1);
@@ -420,104 +298,10 @@ mod tests {
tokio::time::sleep(Duration::from_millis(200)).await;
// This will attempt to wake, but nothing will happen
// because the waiter already dropped its Receiver.
let old = seq.advance(99, None);
let old = seq.advance(99);
assert_eq!(old, 0);
jh.await.unwrap();
seq.shutdown();
}
#[tokio::test]
async fn data_basic() {
let seq = Arc::new(SeqWait::new(0, "a"));
let seq2 = Arc::clone(&seq);
let jh = tokio::task::spawn(async move {
let data = seq.wait_for(2).await.unwrap();
assert_eq!(data, "b");
});
seq2.advance(1, Some("x"));
seq2.advance(2, Some("b"));
jh.await.unwrap();
}
#[test]
fn data_always_most_recent() {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
let seq = Arc::new(SeqWait::new(0, "a"));
let seq2 = Arc::clone(&seq);
let jh = rt.spawn(async move {
let data = seq.wait_for(2).await.unwrap();
assert_eq!(data, "d");
});
// jh is not running until we poll it, thanks to current thread runtime
rt.block_on(async move {
seq2.advance(2, Some("b"));
seq2.advance(3, Some("c"));
seq2.advance(4, Some("d"));
});
rt.block_on(jh).unwrap();
}
#[tokio::test]
async fn split_spmc_api_surface() {
let seq = SeqWait::new(0, 1);
let (w, a) = seq.split_spmc();
let _ = w.wait_for(1);
let _ = w.wait_for_timeout(0, Duration::from_secs(10));
let _ = w.clone();
let _ = a.advance(1, None);
let _ = a.wait_for(1);
let _ = a.wait_for_timeout(0, Duration::from_secs(10));
// TODO would be nice to have must-not-compile tests for Advance not being clonable.
}
#[tokio::test]
async fn new_data_same_lsn() {
let seq = Arc::new(SeqWait::new(0, "a"));
seq.advance(1, Some("b"));
let data = seq.wait_for(1).await.unwrap();
assert_eq!(data, "b", "the regular case where lsn and data advance");
seq.advance(1, Some("c"));
let data = seq.wait_for(1).await.unwrap();
assert_eq!(
data, "c",
"no lsn advance still gives new data for old lsn wait_for's"
);
let (start_wait_for_sender, start_wait_for_receiver) = tokio::sync::oneshot::channel();
// ensure we don't wake waiters for data-only change
let jh = tokio::spawn({
let seq = seq.clone();
async move {
start_wait_for_receiver.await.unwrap();
match tokio::time::timeout(Duration::from_secs(2), seq.wait_for(2)).await {
Ok(_) => {
assert!(
false,
"advance should not wake waiters if data changes but LSN doesn't"
);
}
Err(_) => {
// Good, we weren't woken up.
}
}
}
});
seq.advance(1, Some("d"));
start_wait_for_sender.send(()).unwrap();
jh.await.unwrap();
}
}

View File

@@ -13,7 +13,7 @@ use std::time::Instant;
use utils::lsn::Lsn;
use criterion::{criterion_group, criterion_main, Criterion};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
let mut layer_map = LayerMap::<LayerDescriptor>::default();
@@ -114,7 +114,7 @@ fn bench_from_captest_env(c: &mut Criterion) {
c.bench_function("captest_uniform_queries", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
layer_map.search(q.0, q.1);
black_box(layer_map.search(q.0, q.1));
}
});
});
@@ -122,11 +122,11 @@ fn bench_from_captest_env(c: &mut Criterion) {
// test with a key that corresponds to the RelDir entry. See pgdatadir_mapping.rs.
c.bench_function("captest_rel_dir_query", |b| {
b.iter(|| {
let result = layer_map.search(
let result = black_box(layer_map.search(
Key::from_hex("000000067F00008000000000000000000001").unwrap(),
// This LSN is higher than any of the LSNs in the tree
Lsn::from_str("D0/80208AE1").unwrap(),
);
));
result.unwrap();
});
});
@@ -183,7 +183,7 @@ fn bench_from_real_project(c: &mut Criterion) {
group.bench_function("uniform_queries", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
layer_map.search(q.0, q.1);
black_box(layer_map.search(q.0, q.1));
}
});
});
@@ -232,7 +232,7 @@ fn bench_sequential(c: &mut Criterion) {
group.bench_function("uniform_queries", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
layer_map.search(q.0, q.1);
black_box(layer_map.search(q.0, q.1));
}
});
});

View File

@@ -6,6 +6,7 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde::de::IntoDeserializer;
use std::env;
use storage_broker::Uri;
use utils::crashsafe::path_with_suffix_extension;
@@ -62,7 +63,6 @@ pub mod defaults {
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour";
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
///
/// Default built-in configuration file.
@@ -91,7 +91,6 @@ pub mod defaults {
#cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
#disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}}
@@ -108,6 +107,7 @@ pub mod defaults {
#pitr_interval = '{DEFAULT_PITR_INTERVAL}'
#min_resident_size_override = .. # in bytes
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
# [remote_storage]
@@ -182,9 +182,6 @@ pub struct PageServerConf {
pub metric_collection_endpoint: Option<Url>,
pub synthetic_size_calculation_interval: Duration,
// See the corresponding metric's help string.
pub evictions_low_residence_duration_metric_threshold: Duration,
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
pub test_remote_failures: u64,
@@ -257,8 +254,6 @@ struct PageServerConfigBuilder {
metric_collection_endpoint: BuilderValue<Option<Url>>,
synthetic_size_calculation_interval: BuilderValue<Duration>,
evictions_low_residence_duration_metric_threshold: BuilderValue<Duration>,
disk_usage_based_eviction: BuilderValue<Option<DiskUsageEvictionTaskConfig>>,
test_remote_failures: BuilderValue<u64>,
@@ -316,11 +311,6 @@ impl Default for PageServerConfigBuilder {
.expect("cannot parse default synthetic size calculation interval")),
metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT),
evictions_low_residence_duration_metric_threshold: Set(humantime::parse_duration(
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.expect("cannot parse DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD")),
disk_usage_based_eviction: Set(None),
test_remote_failures: Set(0),
@@ -438,10 +428,6 @@ impl PageServerConfigBuilder {
self.test_remote_failures = BuilderValue::Set(fail_first);
}
pub fn evictions_low_residence_duration_metric_threshold(&mut self, value: Duration) {
self.evictions_low_residence_duration_metric_threshold = BuilderValue::Set(value);
}
pub fn disk_usage_based_eviction(&mut self, value: Option<DiskUsageEvictionTaskConfig>) {
self.disk_usage_based_eviction = BuilderValue::Set(value);
}
@@ -525,11 +511,6 @@ impl PageServerConfigBuilder {
synthetic_size_calculation_interval: self
.synthetic_size_calculation_interval
.ok_or(anyhow!("missing synthetic_size_calculation_interval"))?,
evictions_low_residence_duration_metric_threshold: self
.evictions_low_residence_duration_metric_threshold
.ok_or(anyhow!(
"missing evictions_low_residence_duration_metric_threshold"
))?,
disk_usage_based_eviction: self
.disk_usage_based_eviction
.ok_or(anyhow!("missing disk_usage_based_eviction"))?,
@@ -721,12 +702,12 @@ impl PageServerConf {
"synthetic_size_calculation_interval" =>
builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
"test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
"evictions_low_residence_duration_metric_threshold" => builder.evictions_low_residence_duration_metric_threshold(parse_toml_duration(key, item)?),
"disk_usage_based_eviction" => {
tracing::info!("disk_usage_based_eviction: {:#?}", &item);
builder.disk_usage_based_eviction(
toml_edit::de::from_item(item.clone())
.context("parse disk_usage_based_eviction")?)
deserialize_from_item("disk_usage_based_eviction", item)
.context("parse disk_usage_based_eviction")?
)
},
"ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
_ => bail!("unrecognized pageserver option '{key}'"),
@@ -827,18 +808,25 @@ impl PageServerConf {
if let Some(eviction_policy) = item.get("eviction_policy") {
t_conf.eviction_policy = Some(
toml_edit::de::from_item(eviction_policy.clone())
deserialize_from_item("eviction_policy", eviction_policy)
.context("parse eviction_policy")?,
);
}
if let Some(item) = item.get("min_resident_size_override") {
t_conf.min_resident_size_override = Some(
toml_edit::de::from_item(item.clone())
deserialize_from_item("min_resident_size_override", item)
.context("parse min_resident_size_override")?,
);
}
if let Some(item) = item.get("evictions_low_residence_duration_metric_threshold") {
t_conf.evictions_low_residence_duration_metric_threshold = Some(parse_toml_duration(
"evictions_low_residence_duration_metric_threshold",
item,
)?);
}
Ok(t_conf)
}
@@ -877,10 +865,6 @@ impl PageServerConf {
cached_metric_collection_interval: Duration::from_secs(60 * 60),
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
synthetic_size_calculation_interval: Duration::from_secs(60),
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.unwrap(),
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
@@ -938,6 +922,18 @@ where
})
}
fn deserialize_from_item<T>(name: &str, item: &Item) -> anyhow::Result<T>
where
T: serde::de::DeserializeOwned,
{
// ValueDeserializer::new is not public, so use the ValueDeserializer's documented way
let deserializer = match item.clone().into_value() {
Ok(value) => value.into_deserializer(),
Err(item) => anyhow::bail!("toml_edit::Item '{item}' is not a toml_edit::Value"),
};
T::deserialize(deserializer).with_context(|| format!("deserializing item for node {name}"))
}
/// Configurable semaphore permits setting.
///
/// Does not allow semaphore permits to be zero, because at runtime initially zero permits and empty
@@ -1004,9 +1000,10 @@ mod tests {
use remote_storage::{RemoteStorageKind, S3Config};
use tempfile::{tempdir, TempDir};
use utils::serde_percent::Percent;
use super::*;
use crate::DEFAULT_PG_VERSION;
use crate::{tenant::config::EvictionPolicy, DEFAULT_PG_VERSION};
const ALL_BASE_VALUES_TOML: &str = r#"
# Initial configuration file created by 'pageserver --init'
@@ -1029,8 +1026,6 @@ cached_metric_collection_interval = '22200 s'
metric_collection_endpoint = 'http://localhost:80/metrics'
synthetic_size_calculation_interval = '333 s'
evictions_low_residence_duration_metric_threshold = '444 s'
log_format = 'json'
"#;
@@ -1087,9 +1082,6 @@ log_format = 'json'
synthetic_size_calculation_interval: humantime::parse_duration(
defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL
)?,
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD
)?,
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
@@ -1144,7 +1136,6 @@ log_format = 'json'
cached_metric_collection_interval: Duration::from_secs(22200),
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
synthetic_size_calculation_interval: Duration::from_secs(333),
evictions_low_residence_duration_metric_threshold: Duration::from_secs(444),
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
@@ -1310,6 +1301,71 @@ trace_read_requests = {trace_read_requests}"#,
Ok(())
}
#[test]
fn eviction_pageserver_config_parse() -> anyhow::Result<()> {
let tempdir = tempdir()?;
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
let pageserver_conf_toml = format!(
r#"pg_distrib_dir = "{}"
metric_collection_endpoint = "http://sample.url"
metric_collection_interval = "10min"
id = 222
[disk_usage_based_eviction]
max_usage_pct = 80
min_avail_bytes = 0
period = "10s"
[tenant_config]
evictions_low_residence_duration_metric_threshold = "20m"
[tenant_config.eviction_policy]
kind = "LayerAccessThreshold"
period = "20m"
threshold = "20m"
"#,
pg_distrib_dir.display(),
);
let toml: Document = pageserver_conf_toml.parse()?;
let conf = PageServerConf::parse_and_validate(&toml, &workdir)?;
assert_eq!(conf.pg_distrib_dir, pg_distrib_dir);
assert_eq!(
conf.metric_collection_endpoint,
Some("http://sample.url".parse().unwrap())
);
assert_eq!(
conf.metric_collection_interval,
Duration::from_secs(10 * 60)
);
assert_eq!(
conf.default_tenant_conf
.evictions_low_residence_duration_metric_threshold,
Duration::from_secs(20 * 60)
);
assert_eq!(conf.id, NodeId(222));
assert_eq!(
conf.disk_usage_based_eviction,
Some(DiskUsageEvictionTaskConfig {
max_usage_pct: Percent::new(80).unwrap(),
min_avail_bytes: 0,
period: Duration::from_secs(10),
#[cfg(feature = "testing")]
mock_statvfs: None,
})
);
match &conf.default_tenant_conf.eviction_policy {
EvictionPolicy::NoEviction => panic!("Unexpected eviction opolicy tenant settings"),
EvictionPolicy::LayerAccessThreshold(eviction_thresold) => {
assert_eq!(eviction_thresold.period, Duration::from_secs(20 * 60));
assert_eq!(eviction_thresold.threshold, Duration::from_secs(20 * 60));
}
}
Ok(())
}
fn prepare_fs(tempdir: &TempDir) -> anyhow::Result<(PathBuf, PathBuf)> {
let tempdir_path = tempdir.path();

View File

@@ -781,6 +781,19 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(&evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
&evictions_low_residence_duration_metric_threshold,
))
.map_err(ApiError::BadRequest)?,
);
}
let target_tenant_id = request_data
.new_tenant_id
.map(TenantId::from)
@@ -914,6 +927,19 @@ async fn update_tenant_config_handler(
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(&evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
&evictions_low_residence_duration_metric_threshold,
))
.map_err(ApiError::BadRequest)?,
);
}
let state = get_state(&request);
mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
.instrument(info_span!("tenant_config", tenant = ?tenant_id))

View File

@@ -257,6 +257,22 @@ impl EvictionsWithLowResidenceDuration {
}
}
pub fn change_threshold(
&mut self,
tenant_id: &str,
timeline_id: &str,
new_threshold: Duration,
) {
if new_threshold == self.threshold {
return;
}
let mut with_new =
EvictionsWithLowResidenceDurationBuilder::new(self.data_source, new_threshold)
.build(tenant_id, timeline_id);
std::mem::swap(self, &mut with_new);
with_new.remove(tenant_id, timeline_id);
}
// This could be a `Drop` impl, but, we need the `tenant_id` and `timeline_id`.
fn remove(&mut self, tenant_id: &str, timeline_id: &str) {
let Some(_counter) = self.counter.take() else {
@@ -589,7 +605,7 @@ pub struct TimelineMetrics {
pub num_persistent_files_created: IntCounter,
pub persistent_bytes_written: IntCounter,
pub evictions: IntCounter,
pub evictions_with_low_residence_duration: EvictionsWithLowResidenceDuration,
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
}
impl TimelineMetrics {
@@ -656,7 +672,9 @@ impl TimelineMetrics {
num_persistent_files_created,
persistent_bytes_written,
evictions,
evictions_with_low_residence_duration,
evictions_with_low_residence_duration: std::sync::RwLock::new(
evictions_with_low_residence_duration,
),
}
}
}
@@ -675,6 +693,8 @@ impl Drop for TimelineMetrics {
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, timeline_id]);
let _ = EVICTIONS.remove_label_values(&[tenant_id, timeline_id]);
self.evictions_with_low_residence_duration
.write()
.unwrap()
.remove(tenant_id, timeline_id);
for op in STORAGE_TIME_OPERATIONS {
let _ =

View File

@@ -65,7 +65,7 @@ fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<
_ = task_mgr::shutdown_watcher() => {
// We were requested to shut down.
let msg = format!("pageserver is shutting down");
let msg = "pageserver is shutting down".to_string();
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None));
Err(QueryError::Other(anyhow::anyhow!(msg)))
}

View File

@@ -1735,6 +1735,13 @@ impl Tenant {
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
*self.tenant_conf.write().unwrap() = new_tenant_conf;
// Don't hold self.timelines.lock() during the notifies.
// There's no risk of deadlock right now, but there could be if we consolidate
// mutexes in struct Timeline in the future.
let timelines = self.list_timelines();
for timeline in timelines {
timeline.tenant_conf_updated();
}
}
fn create_timeline_data(
@@ -1887,7 +1894,7 @@ impl Tenant {
.to_string();
// Convert the config to a toml file.
conf_content += &toml_edit::easy::to_string(&tenant_conf)?;
conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
let mut target_config_file = VirtualFile::open_with_options(
target_config_path,
@@ -2815,6 +2822,9 @@ pub mod harness {
trace_read_requests: Some(tenant_conf.trace_read_requests),
eviction_policy: Some(tenant_conf.eviction_policy),
min_resident_size_override: tenant_conf.min_resident_size_override,
evictions_low_residence_duration_metric_threshold: Some(
tenant_conf.evictions_low_residence_duration_metric_threshold,
),
}
}
}

View File

@@ -39,6 +39,7 @@ pub mod defaults {
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "3 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
}
/// Per-tenant configuration options
@@ -93,6 +94,9 @@ pub struct TenantConf {
pub trace_read_requests: bool,
pub eviction_policy: EvictionPolicy,
pub min_resident_size_override: Option<u64>,
// See the corresponding metric's help string.
#[serde(with = "humantime_serde")]
pub evictions_low_residence_duration_metric_threshold: Duration,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -164,6 +168,11 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub min_resident_size_override: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
#[serde(default)]
pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -228,6 +237,9 @@ impl TenantConfOpt {
min_resident_size_override: self
.min_resident_size_override
.or(global_conf.min_resident_size_override),
evictions_low_residence_duration_metric_threshold: self
.evictions_low_residence_duration_metric_threshold
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
}
}
}
@@ -260,6 +272,10 @@ impl Default for TenantConf {
trace_read_requests: false,
eviction_policy: EvictionPolicy::NoEviction,
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
}
}
}
@@ -275,9 +291,9 @@ mod tests {
..TenantConfOpt::default()
};
let toml_form = toml_edit::easy::to_string(&small_conf).unwrap();
let toml_form = toml_edit::ser::to_string(&small_conf).unwrap();
assert_eq!(toml_form, "gc_horizon = 42\n");
assert_eq!(small_conf, toml_edit::easy::from_str(&toml_form).unwrap());
assert_eq!(small_conf, toml_edit::de::from_str(&toml_form).unwrap());
let json_form = serde_json::to_string(&small_conf).unwrap();
assert_eq!(json_form, "{\"gc_horizon\":42}");

View File

@@ -74,7 +74,7 @@ pub(super) async fn upload_timeline_layer<'a>(
})?;
storage
.upload(Box::new(source_file), fs_size, &storage_path, None)
.upload(source_file, fs_size, &storage_path, None)
.await
.with_context(|| {
format!(

View File

@@ -77,6 +77,7 @@ pub(super) use self::eviction_task::EvictionTaskTenantState;
use self::eviction_task::EvictionTaskTimelineState;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
@@ -145,7 +146,7 @@ pub struct Timeline {
// 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
// first WAL record when the node is started up. But here, we just
// keep track of it.
last_record_lsn: SeqWait<RecordLsn, Lsn, ()>,
last_record_lsn: SeqWait<RecordLsn, Lsn>,
// All WAL records have been processed and stored durably on files on
// local disk, up to this LSN. On crash and restart, we need to re-process
@@ -161,7 +162,7 @@ pub struct Timeline {
ancestor_timeline: Option<Arc<Timeline>>,
ancestor_lsn: Lsn,
metrics: TimelineMetrics,
pub(super) metrics: TimelineMetrics,
/// Ensures layers aren't frozen by checkpointer between
/// [`Timeline::get_layer_for_write`] and layer reads.
@@ -1136,6 +1137,8 @@ impl Timeline {
if let Some(delta) = local_layer_residence_duration {
self.metrics
.evictions_with_low_residence_duration
.read()
.unwrap()
.observe(delta);
info!(layer=%local_layer.short_id(), residence_millis=delta.as_millis(), "evicted layer after known residence period");
} else {
@@ -1209,6 +1212,35 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.eviction_policy)
}
fn get_evictions_low_residence_duration_metric_threshold(
tenant_conf: &TenantConfOpt,
default_tenant_conf: &TenantConf,
) -> Duration {
tenant_conf
.evictions_low_residence_duration_metric_threshold
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
}
pub(super) fn tenant_conf_updated(&self) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.
// The threshold is embedded in the metric. So, we need to update it.
{
let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
&self.tenant_conf.read().unwrap(),
&self.conf.default_tenant_conf,
);
let tenant_id_str = self.tenant_id.to_string();
let timeline_id_str = self.timeline_id.to_string();
self.metrics
.evictions_with_low_residence_duration
.write()
.unwrap()
.change_threshold(&tenant_id_str, &timeline_id_str, new_threshold);
}
}
/// Open a Timeline handle.
///
/// Loads the metadata for the timeline into memory, but not the layer map.
@@ -1240,6 +1272,11 @@ impl Timeline {
let max_lsn_wal_lag = tenant_conf_guard
.max_lsn_wal_lag
.unwrap_or(conf.default_tenant_conf.max_lsn_wal_lag);
let evictions_low_residence_duration_metric_threshold =
Self::get_evictions_low_residence_duration_metric_threshold(
&tenant_conf_guard,
&conf.default_tenant_conf,
);
drop(tenant_conf_guard);
Arc::new_cyclic(|myself| {
@@ -1270,13 +1307,10 @@ impl Timeline {
remote_client: remote_client.map(Arc::new),
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
last_record_lsn: SeqWait::new(
RecordLsn {
last: disk_consistent_lsn,
prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
},
(),
),
last_record_lsn: SeqWait::new(RecordLsn {
last: disk_consistent_lsn,
prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
}),
disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
@@ -1290,7 +1324,7 @@ impl Timeline {
&timeline_id,
crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
"mtime",
conf.evictions_low_residence_duration_metric_threshold,
evictions_low_residence_duration_metric_threshold,
),
),
@@ -2423,7 +2457,7 @@ impl Timeline {
assert!(new_lsn.is_aligned());
self.metrics.last_record_gauge.set(new_lsn.0 as i64);
self.last_record_lsn.advance(new_lsn, None);
self.last_record_lsn.advance(new_lsn);
}
fn freeze_inmem_layer(&self, write_lock_held: bool) {

View File

@@ -23,7 +23,6 @@ use std::convert::Infallible;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
@@ -374,7 +373,7 @@ impl BrokerService for Broker {
Ok(info) => yield info,
Err(RecvError::Lagged(skipped_msg)) => {
missed_msgs += skipped_msg;
if let Poll::Ready(_) = futures::poll!(Box::pin(warn_interval.tick())) {
if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
missed_msgs = 0;

View File

@@ -1913,15 +1913,26 @@ def remote_pg(
connstr = os.getenv("BENCHMARK_CONNSTR")
if connstr is None:
raise ValueError("no connstr provided, use BENCHMARK_CONNSTR environment variable")
host = parse_dsn(connstr).get("host", "")
is_neon = host.endswith(".neon.build")
start_ms = int(datetime.utcnow().timestamp() * 1000)
with RemotePostgres(pg_bin, connstr) as remote_pg:
if is_neon:
timeline_id = TimelineId(remote_pg.safe_psql("SHOW neon.timeline_id")[0][0])
yield remote_pg
end_ms = int(datetime.utcnow().timestamp() * 1000)
host = parse_dsn(connstr).get("host", "")
if host.endswith(".neon.build"):
if is_neon:
# Add 10s margin to the start and end times
allure_add_grafana_links(host, start_ms - 10_000, end_ms + 10_000)
allure_add_grafana_links(
host,
timeline_id,
start_ms - 10_000,
end_ms + 10_000,
)
class PSQL:

View File

@@ -519,6 +519,13 @@ class PageserverHttpClient(requests.Session):
assert res.status_code == 200
def download_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId):
info = self.layer_map_info(tenant_id, timeline_id)
for layer in info.historic_layers:
if not layer.remote:
continue
self.download_layer(tenant_id, timeline_id, layer.layer_file_name)
def evict_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str):
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}",

View File

@@ -13,6 +13,7 @@ import allure
from psycopg2.extensions import cursor
from fixtures.log_helper import log
from fixtures.types import TimelineId
Fn = TypeVar("Fn", bound=Callable[..., Any])
@@ -186,11 +187,15 @@ def allure_attach_from_dir(dir: Path):
allure.attach.file(source, name, attachment_type, extension)
DATASOURCE_ID = "xHHYY0dVz"
GRAFANA_URL = "https://neonprod.grafana.net"
GRAFANA_EXPLORE_URL = f"{GRAFANA_URL}/explore"
GRAFANA_TIMELINE_INSPECTOR_DASHBOARD_URL = f"{GRAFANA_URL}/d/8G011dlnk/timeline-inspector"
LOGS_STAGING_DATASOURCE_ID = "xHHYY0dVz"
def allure_add_grafana_links(host: str, start_ms: int, end_ms: int):
def allure_add_grafana_links(host: str, timeline_id: TimelineId, start_ms: int, end_ms: int):
"""Add links to server logs in Grafana to Allure report"""
links = {}
# We expect host to be in format like ep-divine-night-159320.us-east-2.aws.neon.build
endpoint_id, region_id, _ = host.split(".", 2)
@@ -202,12 +207,12 @@ def allure_add_grafana_links(host: str, start_ms: int, end_ms: int):
}
params: Dict[str, Any] = {
"datasource": DATASOURCE_ID,
"datasource": LOGS_STAGING_DATASOURCE_ID,
"queries": [
{
"expr": "<PUT AN EXPRESSION HERE>",
"refId": "A",
"datasource": {"type": "loki", "uid": DATASOURCE_ID},
"datasource": {"type": "loki", "uid": LOGS_STAGING_DATASOURCE_ID},
"editorMode": "code",
"queryType": "range",
}
@@ -220,8 +225,23 @@ def allure_add_grafana_links(host: str, start_ms: int, end_ms: int):
for name, expr in expressions.items():
params["queries"][0]["expr"] = expr
query_string = urlencode({"orgId": 1, "left": json.dumps(params)})
link = f"https://neonprod.grafana.net/explore?{query_string}"
links[name] = f"{GRAFANA_EXPLORE_URL}?{query_string}"
timeline_qs = urlencode(
{
"orgId": 1,
"var-environment": "victoria-metrics-aws-dev",
"var-timeline_id": timeline_id,
"var-endpoint_id": endpoint_id,
"var-log_datasource": "grafanacloud-neonstaging-logs",
"from": start_ms,
"to": end_ms,
}
)
link = f"{GRAFANA_TIMELINE_INSPECTOR_DASHBOARD_URL}?{timeline_qs}"
links["Timeline Inspector"] = link
for name, link in links.items():
allure.dynamic.link(link, name=name)
log.info(f"{name}: {link}")

View File

@@ -18,7 +18,11 @@ def test_tenant_config(neon_env_builder: NeonEnvBuilder):
neon_env_builder.pageserver_config_override = """
page_cache_size=444;
wait_lsn_timeout='111 s';
tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
[tenant_config]
checkpoint_distance = 10000
compaction_target_size = 1048576
evictions_low_residence_duration_metric_threshold = "2 days"
"""
env = neon_env_builder.init_start()
http_client = env.pageserver.http_client()
@@ -39,6 +43,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
new_conf = {
"checkpoint_distance": "20000",
"gc_period": "30sec",
"evictions_low_residence_duration_metric_threshold": "42s",
}
tenant, _ = env.neon_cli.create_tenant(conf=new_conf)
@@ -78,6 +83,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
assert effective_config["gc_period"] == "1h"
assert effective_config["image_creation_threshold"] == 3
assert effective_config["pitr_interval"] == "7days"
assert effective_config["evictions_low_residence_duration_metric_threshold"] == "2days"
# check the configuration of the new tenant
with closing(env.pageserver.connect()) as psconn:
@@ -112,6 +118,9 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
assert (
new_effective_config["gc_period"] == "30s"
), "Specific 'gc_period' config should override the default value"
assert (
new_effective_config["evictions_low_residence_duration_metric_threshold"] == "42s"
), "Should override default value"
assert new_effective_config["compaction_target_size"] == 1048576
assert new_effective_config["compaction_period"] == "20s"
assert new_effective_config["compaction_threshold"] == 10
@@ -125,6 +134,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
"gc_period": "80sec",
"compaction_period": "80sec",
"image_creation_threshold": "2",
"evictions_low_residence_duration_metric_threshold": "23h",
}
env.neon_cli.config_tenant(
tenant_id=tenant,
@@ -167,6 +177,9 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
assert (
updated_effective_config["compaction_period"] == "1m 20s"
), "Specific 'compaction_period' config should override the default value"
assert (
updated_effective_config["evictions_low_residence_duration_metric_threshold"] == "23h"
), "Should override default value"
assert updated_effective_config["compaction_target_size"] == 1048576
assert updated_effective_config["compaction_threshold"] == 10
assert updated_effective_config["gc_horizon"] == 67108864
@@ -225,6 +238,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
assert final_effective_config["gc_horizon"] == 67108864
assert final_effective_config["gc_period"] == "1h"
assert final_effective_config["image_creation_threshold"] == 3
assert final_effective_config["evictions_low_residence_duration_metric_threshold"] == "2days"
# restart the pageserver and ensure that the config is still correct
env.pageserver.stop()
@@ -285,3 +299,81 @@ def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder):
# dont test applying the setting here, we have that another test case to show it
# we just care about being able to create the file
assert len(contents_first) > len(contents_later)
def test_live_reconfig_get_evictions_low_residence_duration_metric_threshold(
neon_env_builder: NeonEnvBuilder,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
test_name="test_live_reconfig_get_evictions_low_residence_duration_metric_threshold",
)
env = neon_env_builder.init_start()
assert isinstance(env.remote_storage, LocalFsStorage)
(tenant_id, timeline_id) = env.neon_cli.create_tenant()
ps_http = env.pageserver.http_client()
def get_metric():
metrics = ps_http.get_metrics()
metric = metrics.query_one(
"pageserver_evictions_with_low_residence_duration_total",
{
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
},
)
return metric
default_value = ps_http.tenant_config(tenant_id).effective_config[
"evictions_low_residence_duration_metric_threshold"
]
metric = get_metric()
assert int(metric.value) == 0, "metric is present with default value"
assert default_value == "1day"
ps_http.download_all_layers(tenant_id, timeline_id)
ps_http.evict_all_layers(tenant_id, timeline_id)
metric = get_metric()
assert int(metric.value) > 0, "metric is updated"
env.neon_cli.config_tenant(
tenant_id, {"evictions_low_residence_duration_metric_threshold": default_value}
)
updated_metric = get_metric()
assert int(updated_metric.value) == int(
metric.value
), "metric is unchanged when setting same value"
env.neon_cli.config_tenant(
tenant_id, {"evictions_low_residence_duration_metric_threshold": "2day"}
)
metric = get_metric()
assert int(metric.labels["low_threshold_secs"]) == 2 * 24 * 60 * 60
assert int(metric.value) == 0
ps_http.download_all_layers(tenant_id, timeline_id)
ps_http.evict_all_layers(tenant_id, timeline_id)
metric = get_metric()
assert int(metric.labels["low_threshold_secs"]) == 2 * 24 * 60 * 60
assert int(metric.value) > 0
env.neon_cli.config_tenant(
tenant_id, {"evictions_low_residence_duration_metric_threshold": "2h"}
)
metric = get_metric()
assert int(metric.labels["low_threshold_secs"]) == 2 * 60 * 60
assert int(metric.value) == 0, "value resets if label changes"
ps_http.download_all_layers(tenant_id, timeline_id)
ps_http.evict_all_layers(tenant_id, timeline_id)
metric = get_metric()
assert int(metric.labels["low_threshold_secs"]) == 2 * 60 * 60
assert int(metric.value) > 0, "set a non-zero value for next step"
env.neon_cli.config_tenant(tenant_id, {})
metric = get_metric()
assert int(metric.labels["low_threshold_secs"]) == 24 * 60 * 60, "label resets to default"
assert int(metric.value) == 0, "value resets to default"

View File

@@ -4,8 +4,6 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
clap.workspace = true
anyhow.workspace = true

View File

@@ -18,6 +18,7 @@ byteorder = { version = "1" }
bytes = { version = "1", features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }
clap = { version = "4", features = ["derive", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8" }
digest = { version = "0.10", features = ["mac", "std"] }
either = { version = "1" }
@@ -29,7 +30,6 @@ futures-executor = { version = "0.3" }
futures-sink = { version = "0.3" }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
hashbrown = { version = "0.12", features = ["raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["std"] }
@@ -52,7 +52,8 @@ socket2 = { version = "0.4", default-features = false, features = ["all"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "sync", "time"] }
tokio-rustls = { version = "0.23" }
tokio-util = { version = "0.7", features = ["codec", "io"] }
tonic = { version = "0.8", features = ["tls-roots"] }
toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
toml_edit = { version = "0.19", features = ["serde"] }
tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "timeout", "util"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
@@ -64,7 +65,6 @@ anyhow = { version = "1", features = ["backtrace"] }
bytes = { version = "1", features = ["serde"] }
either = { version = "1" }
hashbrown = { version = "0.12", features = ["raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["std"] }
@@ -74,6 +74,7 @@ prost = { version = "0.11" }
regex = { version = "1" }
regex-syntax = { version = "0.6" }
serde = { version = "1", features = ["alloc", "derive"] }
syn = { version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] }
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] }
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit-mut"] }
### END HAKARI SECTION