Mirror of https://github.com/neondatabase/neon.git
Synced 2026-02-01 09:40:38 +00:00

Compare commits: approximat... ... heap_bitma... (12 commits)
Commits in this comparison (author and date were not captured in this view):
81517aeda6, 814abd9f84, 75ffe34b17, d2aa31f0ce, 22f9ea5fe2, d0711d0896, 271f6a6e99, a64dd3ecb5, bf46237fc2, 41d364a8f1, fa54a57ca2, 1c1bb904ed
.github/ansible/staging.eu-west-1.hosts.yaml (vendored, 8 lines changed)

@@ -8,6 +8,14 @@ storage:
    pg_distrib_dir: /usr/local
    metric_collection_endpoint: http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events
    metric_collection_interval: 10min
    disk_usage_based_eviction:
      max_usage_pct: 80
      # TODO: learn typical resident-size growth rate [GiB/minute] and configure
      # min_avail_bytes such that we have X minutes of headroom.
      min_avail_bytes: 0
      # We assume that the worst-case growth rate is small enough that we can
      # catch above-threshold conditions by checking every 10s.
      period: "10s"
    tenant_config:
      eviction_policy:
        kind: "LayerAccessThreshold"

.github/ansible/staging.us-east-2.hosts.yaml (vendored, 8 lines changed)

The same hunk (@@ -8,6 +8,14 @@ storage:) with identical content is applied to this file.
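For orientation: these host settings end up in the pageserver's own configuration. Going by the commented defaults added to pageserver/src/config.rs later in this diff, the corresponding pageserver.toml entries would look roughly like the sketch below (the TOML syntax and the min_resident_size_override value are illustrative, not taken from a real deployment):

```toml
# Sketch only; key names follow the defaults comment in pageserver/src/config.rs.
disk_usage_based_eviction = { max_usage_pct = 80, min_avail_bytes = 0, period = "10s" }

[tenant_config]
min_resident_size_override = 1073741824  # in bytes (hypothetical value)
```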
Proxy chart values changes (10 hunks; the individual file names are not preserved in this mirror view). Each hunk has the shape shown below; judging by the -10/+9 line counts, the four zenith_* pod labels are dropped and three neon_* labels take their place, with the surrounding keys kept as context:

@@ -<line>,10 +<line>,9 @@ settings:

  # -- Additional labels for <chart> pods
  podLabels:
    zenith_service: <service>
    zenith_env: <env>
    zenith_region: <region>
    zenith_region_slug: <region>
    neon_service: <service>
    neon_env: <env>
    neon_region: <region>

  <following keys>

| Hunk header | Pods comment | service | env | region | Following keys |
|---|---|---|---|---|---|
| @@ -30,10 +30,9 @@ | neon-proxy | proxy-scram | dev | eu-west-1 | exposedService: / annotations: |
| @@ -15,10 +15,9 @@ | neon-proxy-link | proxy | dev | us-east-2 | service: / type: LoadBalancer |
| @@ -15,10 +15,9 @@ | neon-proxy | proxy-scram-legacy | dev | us-east-2 | exposedService: / annotations: |
| @@ -30,10 +30,9 @@ | neon-proxy | proxy-scram | dev | us-east-2 | exposedService: / annotations: |
| @@ -31,10 +31,9 @@ | neon-proxy | proxy-scram | prod | ap-southeast-1 | exposedService: / annotations: |
| @@ -31,10 +31,9 @@ | neon-proxy | proxy-scram | prod | eu-central-1 | exposedService: / annotations: |
| @@ -13,10 +13,9 @@ | zenith-proxy | proxy | production | us-east-2 | service: / type: LoadBalancer |
| @@ -31,10 +31,9 @@ | neon-proxy | proxy-scram | prod | us-east-2 | exposedService: / annotations: |
| @@ -31,10 +31,9 @@ | neon-proxy | proxy-scram | prod | us-west-2 | exposedService: / annotations: |
| @@ -31,10 +31,9 @@ | neon-proxy | proxy-scram | prod | us-west-2 | exposedService: / annotations: |
.github/pull_request_template.md (vendored, 4 lines changed)

@@ -3,8 +3,12 @@

## Issue ticket number and link

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above checklist
.github/workflows/build_and_test.yml (vendored, 10 lines changed)

@@ -898,6 +898,16 @@ jobs:
    needs: [ push-docker-hub, tag, regress-tests ]
    if: ( github.ref_name == 'main' || github.ref_name == 'release' ) && github.event_name != 'workflow_dispatch'
    steps:
      - name: Fix git ownership
        run: |
          # Workaround for `fatal: detected dubious ownership in repository at ...`
          #
          # Use both ${{ github.workspace }} and ${GITHUB_WORKSPACE} because they're different on host and in containers
          # Ref https://github.com/actions/checkout/issues/785
          #
          git config --global --add safe.directory ${{ github.workspace }}
          git config --global --add safe.directory ${GITHUB_WORKSPACE}

      - name: Checkout
        uses: actions/checkout@v3
        with:
.github/workflows/neon_extra_builds.yml (vendored, 4 lines changed)

@@ -53,14 +53,14 @@ jobs:
        uses: actions/cache@v3
        with:
          path: pg_install/v14
-         key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
+         key: v1-${{ runner.os }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}

      - name: Cache postgres v15 build
        id: cache_pg_15
        uses: actions/cache@v3
        with:
          path: pg_install/v15
-         key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
+         key: v1-${{ runner.os }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}

      - name: Set extra env for macOS
        run: |
Cargo.lock (generated, 2 lines changed)

@@ -2474,6 +2474,7 @@ dependencies = [
 "strum",
 "strum_macros",
 "svg_fmt",
 "sync_wrapper",
 "tempfile",
 "tenant_size_model",
 "thiserror",

@@ -4556,6 +4557,7 @@ dependencies = [
 "once_cell",
 "pin-project-lite",
 "rand",
 "regex",
 "routerify",
 "sentry",
 "serde",
@@ -40,6 +40,8 @@ pacman -S base-devel readline zlib libseccomp openssl clang \
   postgresql-libs cmake postgresql protobuf
```

Building Neon requires 3.15+ version of `protoc` (protobuf-compiler). If your distribution provides an older version, you can install a newer version from [here](https://github.com/protocolbuffers/protobuf/releases).

2. [Install Rust](https://www.rust-lang.org/tools/install)
```
# recommended approach from https://www.rust-lang.org/tools/install
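If it is unclear whether the distribution's protoc is new enough, the installed version can be checked before building (this is the standard protoc flag, not something specific to this repository):

```sh
protoc --version   # should report 3.15 or newer
```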
@@ -203,13 +203,14 @@ fn main() -> Result<()> {
    if delay_exit {
        info!("giving control plane 30s to collect the error before shutdown");
        thread::sleep(Duration::from_secs(30));
        info!("shutting down");
    }

    info!("shutting down tracing");
    // Shutdown trace pipeline gracefully, so that it has a chance to send any
    // pending traces before we exit.
    tracing_utils::shutdown_tracing();

    info!("shutting down");
    exit(exit_code.unwrap_or(1))
}
@@ -90,7 +90,6 @@ impl ComputeControlPlane {
            timeline_id,
            lsn,
            tenant_id,
            uses_wal_proposer: false,
            pg_version,
        });

@@ -115,7 +114,6 @@ pub struct PostgresNode {
    pub timeline_id: TimelineId,
    pub lsn: Option<Lsn>, // if it's a read-only node. None for primary
    pub tenant_id: TenantId,
    uses_wal_proposer: bool,
    pg_version: u32,
}

@@ -149,7 +147,6 @@ impl PostgresNode {
        let port: u16 = conf.parse_field("port", &context)?;
        let timeline_id: TimelineId = conf.parse_field("neon.timeline_id", &context)?;
        let tenant_id: TenantId = conf.parse_field("neon.tenant_id", &context)?;
        let uses_wal_proposer = conf.get("neon.safekeepers").is_some();

        // Read postgres version from PG_VERSION file to determine which postgres version binary to use.
        // If it doesn't exist, assume broken data directory and use default pg version.

@@ -172,7 +169,6 @@ impl PostgresNode {
            timeline_id,
            lsn: recovery_target_lsn,
            tenant_id,
            uses_wal_proposer,
            pg_version,
        })
    }

@@ -364,7 +360,7 @@ impl PostgresNode {
    fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
        let backup_lsn = if let Some(lsn) = self.lsn {
            Some(lsn)
-       } else if self.uses_wal_proposer {
+       } else if !self.env.safekeepers.is_empty() {
            // LSN 0 means that it is bootstrap and we need to download just
            // latest data from the pageserver. That is a bit clumsy but whole bootstrap
            // procedure evolves quite actively right now, so let's think about it again
@@ -363,6 +363,11 @@ impl PageServerNode {
            .map(|x| serde_json::from_str(x))
            .transpose()
            .context("Failed to parse 'eviction_policy' json")?,
        min_resident_size_override: settings
            .remove("min_resident_size_override")
            .map(|x| x.parse::<u64>())
            .transpose()
            .context("Failed to parse 'min_resident_size_override' as integer")?,
    };
    if !settings.is_empty() {
        bail!("Unrecognized tenant settings: {settings:?}")

@@ -435,6 +440,11 @@ impl PageServerNode {
            .map(|x| serde_json::from_str(x))
            .transpose()
            .context("Failed to parse 'eviction_policy' json")?,
        min_resident_size_override: settings
            .get("min_resident_size_override")
            .map(|x| x.parse::<u64>())
            .transpose()
            .context("Failed to parse 'min_resident_size_override' as an integer")?,
    })
    .send()?
    .error_from_body()?;
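Both settings arrive as strings and are parsed here: `eviction_policy` as raw JSON and `min_resident_size_override` as an integer. Based on the ansible staging configs earlier in this diff, a minimal `eviction_policy` value would be the following (any additional policy fields are not shown in this diff):

```json
{ "kind": "LayerAccessThreshold" }
```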
@@ -120,6 +120,7 @@ pub struct TenantCreateRequest {
    // We might do that once the eviction feature has stabilized.
    // For now, this field is not even documented in the openapi_spec.yml.
    pub eviction_policy: Option<serde_json::Value>,
    pub min_resident_size_override: Option<u64>,
}

#[serde_as]

@@ -165,6 +166,7 @@ pub struct TenantConfigRequest {
    // We might do that once the eviction feature has stabilized.
    // For now, this field is not even documented in the openapi_spec.yml.
    pub eviction_policy: Option<serde_json::Value>,
    pub min_resident_size_override: Option<u64>,
}

impl TenantConfigRequest {

@@ -185,6 +187,7 @@ impl TenantConfigRequest {
        max_lsn_wal_lag: None,
        trace_read_requests: None,
        eviction_policy: None,
        min_resident_size_override: None,
        }
    }
}
@@ -19,6 +19,7 @@ jsonwebtoken.workspace = true
nix.workspace = true
once_cell.workspace = true
pin-project-lite.workspace = true
regex.workspace = true
routerify.workspace = true
serde.workspace = true
serde_json.workspace = true

@@ -51,6 +51,9 @@ pub mod history_buffer;

pub mod measured_stream;

pub mod serde_percent;
pub mod serde_regex;

/// use with fail::cfg("$name", "return(2000)")
#[macro_export]
macro_rules! failpoint_sleep_millis_async {
libs/utils/src/serde_percent.rs (new file, 83 lines)

@@ -0,0 +1,83 @@
//! A serde::Deserialize type for percentages.
//!
//! See [`Percent`] for details.

use serde::{Deserialize, Serialize};

/// If the value is not an integer between 0 and 100,
/// deserialization fails with a descriptive error.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct Percent(#[serde(deserialize_with = "deserialize_pct_0_to_100")] u8);

impl Percent {
    pub fn get(&self) -> u8 {
        self.0
    }
}

fn deserialize_pct_0_to_100<'de, D>(deserializer: D) -> Result<u8, D::Error>
where
    D: serde::de::Deserializer<'de>,
{
    let v: u8 = serde::de::Deserialize::deserialize(deserializer)?;
    if v > 100 {
        return Err(serde::de::Error::custom(
            "must be an integer between 0 and 100",
        ));
    }
    Ok(v)
}

#[cfg(test)]
mod tests {
    use super::Percent;

    #[derive(serde::Deserialize, serde::Serialize, Debug, PartialEq, Eq)]
    struct Foo {
        bar: Percent,
    }

    #[test]
    fn basics() {
        let input = r#"{ "bar": 50 }"#;
        let foo: Foo = serde_json::from_str(input).unwrap();
        assert_eq!(foo.bar.get(), 50);
    }
    #[test]
    fn null_handling() {
        let input = r#"{ "bar": null }"#;
        let res: Result<Foo, _> = serde_json::from_str(input);
        assert!(res.is_err());
    }
    #[test]
    fn zero() {
        let input = r#"{ "bar": 0 }"#;
        let foo: Foo = serde_json::from_str(input).unwrap();
        assert_eq!(foo.bar.get(), 0);
    }
    #[test]
    fn out_of_range_above() {
        let input = r#"{ "bar": 101 }"#;
        let res: Result<Foo, _> = serde_json::from_str(input);
        assert!(res.is_err());
    }
    #[test]
    fn out_of_range_below() {
        let input = r#"{ "bar": -1 }"#;
        let res: Result<Foo, _> = serde_json::from_str(input);
        assert!(res.is_err());
    }
    #[test]
    fn float() {
        let input = r#"{ "bar": 50.5 }"#;
        let res: Result<Foo, _> = serde_json::from_str(input);
        assert!(res.is_err());
    }
    #[test]
    fn string() {
        let input = r#"{ "bar": "50 %" }"#;
        let res: Result<Foo, _> = serde_json::from_str(input);
        assert!(res.is_err());
    }
}
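A minimal usage sketch for the new type, assuming the workspace's `utils`, `serde`, and `serde_json` crates are available. The `Thresholds` struct is hypothetical; within this diff the real consumer is `DiskUsageEvictionTaskConfig::max_usage_pct`:

```rust
use utils::serde_percent::Percent;

// Hypothetical config struct, for illustration only.
#[derive(serde::Deserialize, Debug)]
struct Thresholds {
    max_usage_pct: Percent,
}

fn main() {
    // Values in 0..=100 deserialize normally.
    let ok: Thresholds = serde_json::from_str(r#"{ "max_usage_pct": 80 }"#).unwrap();
    assert_eq!(ok.max_usage_pct.get(), 80);

    // Anything else (101, -1, 50.5, "50 %") is rejected at deserialization time,
    // as exercised by the unit tests above.
    assert!(serde_json::from_str::<Thresholds>(r#"{ "max_usage_pct": 101 }"#).is_err());
}
```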
libs/utils/src/serde_regex.rs (new file, 60 lines)

@@ -0,0 +1,60 @@
//! A `serde::{Deserialize,Serialize}` type for regexes.

use std::ops::Deref;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct Regex(
    #[serde(
        deserialize_with = "deserialize_regex",
        serialize_with = "serialize_regex"
    )]
    regex::Regex,
);

fn deserialize_regex<'de, D>(deserializer: D) -> Result<regex::Regex, D::Error>
where
    D: serde::de::Deserializer<'de>,
{
    let s: String = serde::de::Deserialize::deserialize(deserializer)?;
    let re = regex::Regex::new(&s).map_err(serde::de::Error::custom)?;
    Ok(re)
}

fn serialize_regex<S>(re: &regex::Regex, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::ser::Serializer,
{
    serializer.collect_str(re.as_str())
}

impl Deref for Regex {
    type Target = regex::Regex;

    fn deref(&self) -> &regex::Regex {
        &self.0
    }
}

impl PartialEq for Regex {
    fn eq(&self, other: &Regex) -> bool {
        // comparing the automatons would be quite complicated
        self.as_str() == other.as_str()
    }
}

impl Eq for Regex {}

#[cfg(test)]
mod tests {

    #[test]
    fn roundtrip() {
        let input = r#""foo.*bar""#;
        let re: super::Regex = serde_json::from_str(input).unwrap();
        assert!(re.is_match("foo123bar"));
        assert!(!re.is_match("foo"));
        let output = serde_json::to_string(&re).unwrap();
        assert_eq!(output, input);
    }
}
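Likewise, a small sketch of how the wrapper could appear in a deserializable struct; the `Filter` struct and its field are hypothetical, and the `Deref` impl is what gives direct access to the full `regex::Regex` API:

```rust
// Hypothetical struct, for illustration only.
#[derive(serde::Deserialize, Debug)]
struct Filter {
    tenant_pattern: utils::serde_regex::Regex,
}

fn main() {
    let f: Filter = serde_json::from_str(r#"{ "tenant_pattern": "^prod-.*" }"#).unwrap();
    // Deref lets us call regex::Regex methods directly on the wrapper.
    assert!(f.tenant_pattern.is_match("prod-eu-west-1"));
    assert!(!f.tenant_pattern.is_match("staging-1"));
}
```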
@@ -48,6 +48,7 @@ serde_json = { workspace = true, features = ["raw_value"] }
serde_with.workspace = true
signal-hook.workspace = true
svg_fmt.workspace = true
sync_wrapper.workspace = true
tokio-tar.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }

@@ -8,6 +8,7 @@ use anyhow::{anyhow, Context};
use clap::{Arg, ArgAction, Command};
use fail::FailScenario;
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use remote_storage::GenericRemoteStorage;
use tracing::*;
@@ -314,14 +315,34 @@ fn start_pageserver(
    // Scan the local 'tenants/' directory and start loading the tenants
    BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(conf, remote_storage.clone()))?;

    // shared state between the disk-usage backed eviction background task and the http endpoint
    // that allows triggering disk-usage based eviction manually. note that the http endpoint
    // is still accessible even if background task is not configured as long as remote storage has
    // been configured.
    let disk_usage_eviction_state: Arc<disk_usage_eviction_task::State> = Arc::default();

    if let Some(remote_storage) = &remote_storage {
        launch_disk_usage_global_eviction_task(
            conf,
            remote_storage.clone(),
            disk_usage_eviction_state.clone(),
        )?;
    }

    // Start up the service to handle HTTP mgmt API request. We created the
    // listener earlier already.
    {
        let _rt_guard = MGMT_REQUEST_RUNTIME.enter();

-       let router = http::make_router(conf, launch_ts, http_auth, remote_storage)?
-           .build()
-           .map_err(|err| anyhow!(err))?;
+       let router = http::make_router(
+           conf,
+           launch_ts,
+           http_auth,
+           remote_storage,
+           disk_usage_eviction_state,
+       )?
+       .build()
+       .map_err(|err| anyhow!(err))?;
        let service = utils::http::RouterService::new(router).unwrap();
        let server = hyper::Server::from_tcp(http_listener)?
            .serve(service)
@@ -27,6 +27,7 @@ use utils::{
    logging::LogFormat,
};

use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
use crate::tenant::config::TenantConf;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{TENANT_ATTACHING_MARKER_FILENAME, TIMELINES_SEGMENT_NAME};

@@ -92,6 +93,8 @@ pub mod defaults {

#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'

#disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}}

# [tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}

@@ -104,6 +107,8 @@ pub mod defaults {
#image_creation_threshold = {DEFAULT_IMAGE_CREATION_THRESHOLD}
#pitr_interval = '{DEFAULT_PITR_INTERVAL}'

#min_resident_size_override = .. # in bytes

# [remote_storage]

"###

@@ -180,6 +185,8 @@ pub struct PageServerConf {
    // See the corresponding metric's help string.
    pub evictions_low_residence_duration_metric_threshold: Duration,

    pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,

    pub test_remote_failures: u64,

    pub ondemand_download_behavior_treat_error_as_warn: bool,

@@ -252,6 +259,8 @@ struct PageServerConfigBuilder {

    evictions_low_residence_duration_metric_threshold: BuilderValue<Duration>,

    disk_usage_based_eviction: BuilderValue<Option<DiskUsageEvictionTaskConfig>>,

    test_remote_failures: BuilderValue<u64>,

    ondemand_download_behavior_treat_error_as_warn: BuilderValue<bool>,

@@ -312,6 +321,8 @@ impl Default for PageServerConfigBuilder {
            )
            .expect("cannot parse DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD")),

            disk_usage_based_eviction: Set(None),

            test_remote_failures: Set(0),

            ondemand_download_behavior_treat_error_as_warn: Set(false),

@@ -431,6 +442,10 @@ impl PageServerConfigBuilder {
        self.evictions_low_residence_duration_metric_threshold = BuilderValue::Set(value);
    }

    pub fn disk_usage_based_eviction(&mut self, value: Option<DiskUsageEvictionTaskConfig>) {
        self.disk_usage_based_eviction = BuilderValue::Set(value);
    }

    pub fn ondemand_download_behavior_treat_error_as_warn(
        &mut self,
        ondemand_download_behavior_treat_error_as_warn: bool,

@@ -515,6 +530,9 @@ impl PageServerConfigBuilder {
            .ok_or(anyhow!(
                "missing evictions_low_residence_duration_metric_threshold"
            ))?,
            disk_usage_based_eviction: self
                .disk_usage_based_eviction
                .ok_or(anyhow!("missing disk_usage_based_eviction"))?,
            test_remote_failures: self
                .test_remote_failures
                .ok_or(anyhow!("missing test_remote_failures"))?,

@@ -704,6 +722,12 @@ impl PageServerConf {
                builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
            "test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
            "evictions_low_residence_duration_metric_threshold" => builder.evictions_low_residence_duration_metric_threshold(parse_toml_duration(key, item)?),
            "disk_usage_based_eviction" => {
                tracing::info!("disk_usage_based_eviction: {:#?}", &item);
                builder.disk_usage_based_eviction(
                    toml_edit::de::from_item(item.clone())
                        .context("parse disk_usage_based_eviction")?)
            },
            "ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
            _ => bail!("unrecognized pageserver option '{key}'"),
        }

@@ -808,6 +832,13 @@ impl PageServerConf {
            );
        }

        if let Some(item) = item.get("min_resident_size_override") {
            t_conf.min_resident_size_override = Some(
                toml_edit::de::from_item(item.clone())
                    .context("parse min_resident_size_override")?,
            );
        }

        Ok(t_conf)
    }

@@ -850,6 +881,7 @@ impl PageServerConf {
                defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
            )
            .unwrap(),
            disk_usage_based_eviction: None,
            test_remote_failures: 0,
            ondemand_download_behavior_treat_error_as_warn: false,
        }

@@ -1058,6 +1090,7 @@ log_format = 'json'
            evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
                defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD
            )?,
            disk_usage_based_eviction: None,
            test_remote_failures: 0,
            ondemand_download_behavior_treat_error_as_warn: false,
        },

@@ -1112,6 +1145,7 @@ log_format = 'json'
            metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
            synthetic_size_calculation_interval: Duration::from_secs(333),
            evictions_low_residence_duration_metric_threshold: Duration::from_secs(444),
            disk_usage_based_eviction: None,
            test_remote_failures: 0,
            ondemand_download_behavior_treat_error_as_warn: false,
        },
pageserver/src/disk_usage_eviction_task.rs (new file, 689 lines)

@@ -0,0 +1,689 @@
//! This module implements the pageserver-global disk-usage-based layer eviction task.
//!
//! # Mechanics
//!
//! Function `launch_disk_usage_global_eviction_task` starts a pageserver-global background
//! loop that evicts layers in response to a shortage of available bytes
//! in the $repo/tenants directory's filesystem.
//!
//! The loop runs periodically at a configurable `period`.
//!
//! Each loop iteration uses `statvfs` to determine filesystem-level space usage.
//! It compares the returned usage data against two different types of thresholds.
//! The iteration tries to evict layers until app-internal accounting says we should be below the thresholds.
//! We cross-check this internal accounting with the real world by making another `statvfs` at the end of the iteration.
//! We're good if that second statvfs shows that we're _actually_ below the configured thresholds.
//! If we're still above one or more thresholds, we emit a warning log message, leaving it to the operator to investigate further.
//!
//! # Eviction Policy
//!
//! There are two thresholds:
//! `max_usage_pct` is the relative available space, expressed in percent of the total filesystem space.
//! If the actual usage is higher, the threshold is exceeded.
//! `min_avail_bytes` is the absolute available space in bytes.
//! If the actual usage is lower, the threshold is exceeded.
//! If either of these thresholds is exceeded, the system is considered to have "disk pressure", and eviction
//! is performed on the next iteration, to release disk space and bring the usage below the thresholds again.
//! The iteration evicts layers in LRU fashion, but, with a weak reservation per tenant.
//! The reservation is to keep the most recently accessed X bytes per tenant resident.
//! If we cannot relieve pressure by evicting layers outside of the reservation, we
//! start evicting layers that are part of the reservation, LRU first.
//!
//! The value for the per-tenant reservation is referred to as `tenant_min_resident_size`
//! throughout the code, but, no actual variable carries that name.
//! The per-tenant default value is the `max(tenant's layer file sizes, regardless of local or remote)`.
//! The idea is to allow at least one layer to be resident per tenant, to ensure it can make forward progress
//! during page reconstruction.
//! An alternative default for all tenants can be specified in the `tenant_config` section of the config.
//! Lastly, each tenant can have an override in their respective tenant config (`min_resident_size_override`).

// Implementation notes:
// - The `#[allow(dead_code)]` above various structs are to suppress warnings about only the Debug impl
//   reading these fields. We use the Debug impl for semi-structured logging, though.
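To make the two thresholds concrete, here is a small standalone sketch of the pressure check described above. It mirrors the arithmetic of `filesystem_level_usage::Usage::has_pressure` further down in this file; the numbers are made up:

```rust
fn has_pressure(total_bytes: u64, avail_bytes: u64, max_usage_pct: u64, min_avail_bytes: u64) -> bool {
    // Same arithmetic as filesystem_level_usage::Usage::has_pressure below.
    let usage_pct = (100.0 * (1.0 - (avail_bytes as f64 / total_bytes as f64))) as u64;
    avail_bytes < min_avail_bytes || usage_pct > max_usage_pct
}

fn main() {
    let gib = 1024 * 1024 * 1024u64;
    // 100 GiB filesystem with 15 GiB free: 85% used > 80% threshold, so there is pressure.
    assert!(has_pressure(100 * gib, 15 * gib, 80, 0));
    // 25 GiB free: 75% used, and min_avail_bytes = 0 never triggers, so no pressure.
    assert!(!has_pressure(100 * gib, 25 * gib, 80, 0));
}
```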
use std::{
    collections::HashMap,
    path::Path,
    sync::Arc,
    time::{Duration, SystemTime},
};

use anyhow::Context;
use remote_storage::GenericRemoteStorage;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn, Instrument};
use utils::serde_percent::Percent;

use crate::{
    config::PageServerConf,
    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
    tenant::{self, storage_layer::PersistentLayer, Timeline},
};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DiskUsageEvictionTaskConfig {
    pub max_usage_pct: Percent,
    pub min_avail_bytes: u64,
    #[serde(with = "humantime_serde")]
    pub period: Duration,
    #[cfg(feature = "testing")]
    pub mock_statvfs: Option<crate::statvfs::mock::Behavior>,
}

#[derive(Default)]
pub struct State {
    /// Exclude http requests and background task from running at the same time.
    mutex: tokio::sync::Mutex<()>,
}

pub fn launch_disk_usage_global_eviction_task(
    conf: &'static PageServerConf,
    storage: GenericRemoteStorage,
    state: Arc<State>,
) -> anyhow::Result<()> {
    let Some(task_config) = &conf.disk_usage_based_eviction else {
        info!("disk usage based eviction task not configured");
        return Ok(());
    };

    info!("launching disk usage based eviction task");

    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::DiskUsageEviction,
        None,
        None,
        "disk usage based eviction",
        false,
        async move {
            disk_usage_eviction_task(
                &state,
                task_config,
                storage,
                &conf.tenants_path(),
                task_mgr::shutdown_token(),
            )
            .await;
            info!("disk usage based eviction task finishing");
            Ok(())
        },
    );

    Ok(())
}

#[instrument(skip_all)]
async fn disk_usage_eviction_task(
    state: &State,
    task_config: &DiskUsageEvictionTaskConfig,
    storage: GenericRemoteStorage,
    tenants_dir: &Path,
    cancel: CancellationToken,
) {
    use crate::tenant::tasks::random_init_delay;
    {
        if random_init_delay(task_config.period, &cancel)
            .await
            .is_err()
        {
            info!("shutting down");
            return;
        }
    }

    let mut iteration_no = 0;
    loop {
        iteration_no += 1;
        let start = Instant::now();

        async {
            let res = disk_usage_eviction_task_iteration(
                state,
                task_config,
                &storage,
                tenants_dir,
                &cancel,
            )
            .await;

            match res {
                Ok(()) => {}
                Err(e) => {
                    // these stat failures are expected to be very rare
                    warn!("iteration failed, unexpected error: {e:#}");
                }
            }
        }
        .instrument(tracing::info_span!("iteration", iteration_no))
        .await;

        let sleep_until = start + task_config.period;
        tokio::select! {
            _ = tokio::time::sleep_until(sleep_until) => {},
            _ = cancel.cancelled() => {
                info!("shutting down");
                break
            }
        }
    }
}

pub trait Usage: Clone + Copy + std::fmt::Debug {
    fn has_pressure(&self) -> bool;
    fn add_available_bytes(&mut self, bytes: u64);
}
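The `Usage` trait is the only thing callers have to supply. As a sketch, a byte-budget implementation would look like the following; it is illustrative only, and the HTTP handler added in http/routes.rs later in this diff defines essentially the same struct:

```rust
// Illustrative only; mirrors the `Usage` struct inside disk_usage_eviction_run().
#[derive(Debug, Clone, Copy)]
struct ByteBudget {
    want_freed: u64,
    freed: u64,
}

impl Usage for ByteBudget {
    fn has_pressure(&self) -> bool {
        // Still under pressure until the requested number of bytes has been planned/freed.
        self.freed < self.want_freed
    }

    fn add_available_bytes(&mut self, bytes: u64) {
        self.freed += bytes;
    }
}
```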
async fn disk_usage_eviction_task_iteration(
    state: &State,
    task_config: &DiskUsageEvictionTaskConfig,
    storage: &GenericRemoteStorage,
    tenants_dir: &Path,
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    let usage_pre = filesystem_level_usage::get(tenants_dir, task_config)
        .context("get filesystem-level disk usage before evictions")?;
    let res = disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, cancel).await;
    match res {
        Ok(outcome) => {
            debug!(?outcome, "disk_usage_eviction_iteration finished");
            match outcome {
                IterationOutcome::NoPressure | IterationOutcome::Cancelled => {
                    // nothing to do, select statement below will handle things
                }
                IterationOutcome::Finished(outcome) => {
                    // Verify with statvfs whether we made any real progress
                    let after = filesystem_level_usage::get(tenants_dir, task_config)
                        // It's quite unlikely to hit the error here. Keep the code simple and bail out.
                        .context("get filesystem-level disk usage after evictions")?;

                    debug!(?after, "disk usage");

                    if after.has_pressure() {
                        // Don't bother doing an out-of-order iteration here now.
                        // In practice, the task period is set to a value in the tens-of-seconds range,
                        // which will cause another iteration to happen soon enough.
                        // TODO: deltas between the three different usages would be helpful,
                        // consider MiB, GiB, TiB
                        warn!(?outcome, ?after, "disk usage still high");
                    } else {
                        info!(?outcome, ?after, "disk usage pressure relieved");
                    }
                }
            }
        }
        Err(e) => {
            error!("disk_usage_eviction_iteration failed: {:#}", e);
        }
    }

    Ok(())
}

#[derive(Debug, Serialize)]
#[allow(clippy::large_enum_variant)]
pub enum IterationOutcome<U> {
    NoPressure,
    Cancelled,
    Finished(IterationOutcomeFinished<U>),
}

#[allow(dead_code)]
#[derive(Debug, Serialize)]
pub struct IterationOutcomeFinished<U> {
    /// The actual usage observed before we started the iteration.
    before: U,
    /// The expected value for `after`, according to internal accounting, after phase 1.
    planned: PlannedUsage<U>,
    /// The outcome of phase 2, where we actually do the evictions.
    ///
    /// If all layers that phase 1 planned to evict _can_ actually get evicted, this will
    /// be the same as `planned`.
    assumed: AssumedUsage<U>,
}

#[derive(Debug, Serialize)]
#[allow(dead_code)]
struct AssumedUsage<U> {
    /// The expected value for `after`, after phase 2.
    projected_after: U,
    /// The layers we failed to evict during phase 2.
    failed: LayerCount,
}

#[allow(dead_code)]
#[derive(Debug, Serialize)]
struct PlannedUsage<U> {
    respecting_tenant_min_resident_size: U,
    fallback_to_global_lru: Option<U>,
}

#[allow(dead_code)]
#[derive(Debug, Default, Serialize)]
struct LayerCount {
    file_sizes: u64,
    count: usize,
}

pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
    state: &State,
    storage: &GenericRemoteStorage,
    usage_pre: U,
    cancel: &CancellationToken,
) -> anyhow::Result<IterationOutcome<U>> {
    // use tokio's mutex to get a Sync guard (instead of std::sync::Mutex)
    let _g = state
        .mutex
        .try_lock()
        .map_err(|_| anyhow::anyhow!("iteration is already executing"))?;

    debug!(?usage_pre, "disk usage");

    if !usage_pre.has_pressure() {
        return Ok(IterationOutcome::NoPressure);
    }

    warn!(
        ?usage_pre,
        "running disk usage based eviction due to pressure"
    );

    let candidates = match collect_eviction_candidates(cancel).await? {
        EvictionCandidates::Cancelled => {
            return Ok(IterationOutcome::Cancelled);
        }
        EvictionCandidates::Finished(partitioned) => partitioned,
    };

    // Debug-log the list of candidates
    let now = SystemTime::now();
    for (i, (partition, candidate)) in candidates.iter().enumerate() {
        debug!(
            "cand {}/{}: size={}, no_access_for={}us, partition={:?}, tenant={} timeline={} layer={}",
            i + 1,
            candidates.len(),
            candidate.layer.file_size(),
            now.duration_since(candidate.last_activity_ts)
                .unwrap()
                .as_micros(),
            partition,
            candidate.layer.get_tenant_id(),
            candidate.layer.get_timeline_id(),
            candidate.layer.filename().file_name(),
        );
    }

    // phase1: select victims to relieve pressure
    //
    // Walk through the list of candidates, until we have accumulated enough layers to get
    // us back under the pressure threshold. 'usage_planned' is updated so that it tracks
    // how much disk space would be used after evicting all the layers up to the current
    // point in the list. The layers are collected in 'batched', grouped per timeline.
    //
    // If we get far enough in the list that we start to evict layers that are below
    // the tenant's min-resident-size threshold, print a warning, and memorize the disk
    // usage at that point, in 'usage_planned_min_resident_size_respecting'.
    let mut batched: HashMap<_, Vec<Arc<dyn PersistentLayer>>> = HashMap::new();
    let mut warned = None;
    let mut usage_planned = usage_pre;
    for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
        if !usage_planned.has_pressure() {
            debug!(
                no_candidates_evicted = i,
                "took enough candidates for pressure to be relieved"
            );
            break;
        }

        if partition == MinResidentSizePartition::Below && warned.is_none() {
            warn!(?usage_pre, ?usage_planned, candidate_no=i, "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy");
            warned = Some(usage_planned);
        }

        usage_planned.add_available_bytes(candidate.layer.file_size());

        batched
            .entry(TimelineKey(candidate.timeline))
            .or_default()
            .push(candidate.layer);
    }

    let usage_planned = match warned {
        Some(respecting_tenant_min_resident_size) => PlannedUsage {
            respecting_tenant_min_resident_size,
            fallback_to_global_lru: Some(usage_planned),
        },
        None => PlannedUsage {
            respecting_tenant_min_resident_size: usage_planned,
            fallback_to_global_lru: None,
        },
    };
    debug!(?usage_planned, "usage planned");

    // phase2: evict victims batched by timeline

    // After the loop, `usage_assumed` is the post-eviction usage,
    // according to internal accounting.
    let mut usage_assumed = usage_pre;
    let mut evictions_failed = LayerCount::default();
    for (timeline, batch) in batched {
        let tenant_id = timeline.tenant_id;
        let timeline_id = timeline.timeline_id;
        let batch_size = batch.len();

        debug!(%timeline_id, "evicting batch for timeline");

        async {
            let results = timeline.evict_layers(storage, &batch, cancel.clone()).await;

            match results {
                Err(e) => {
                    warn!("failed to evict batch: {:#}", e);
                }
                Ok(results) => {
                    assert_eq!(results.len(), batch.len());
                    for (result, layer) in results.into_iter().zip(batch.iter()) {
                        match result {
                            Some(Ok(true)) => {
                                usage_assumed.add_available_bytes(layer.file_size());
                            }
                            Some(Ok(false)) => {
                                // this is:
                                // - Replacement::{NotFound, Unexpected}
                                // - it cannot be is_remote_layer, filtered already
                                evictions_failed.file_sizes += layer.file_size();
                                evictions_failed.count += 1;
                            }
                            None => {
                                assert!(cancel.is_cancelled());
                                return;
                            }
                            Some(Err(e)) => {
                                // we really shouldn't be getting this, precondition failure
                                error!("failed to evict layer: {:#}", e);
                            }
                        }
                    }
                }
            }
        }
        .instrument(tracing::info_span!("evict_batch", %tenant_id, %timeline_id, batch_size))
        .await;

        if cancel.is_cancelled() {
            return Ok(IterationOutcome::Cancelled);
        }
    }

    Ok(IterationOutcome::Finished(IterationOutcomeFinished {
        before: usage_pre,
        planned: usage_planned,
        assumed: AssumedUsage {
            projected_after: usage_assumed,
            failed: evictions_failed,
        },
    }))
}

#[derive(Clone)]
struct EvictionCandidate {
    timeline: Arc<Timeline>,
    layer: Arc<dyn PersistentLayer>,
    last_activity_ts: SystemTime,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum MinResidentSizePartition {
    Above,
    Below,
}

enum EvictionCandidates {
    Cancelled,
    Finished(Vec<(MinResidentSizePartition, EvictionCandidate)>),
}

/// Gather the eviction candidates.
///
/// The returned `Ok(EvictionCandidates::Finished(candidates))` is sorted in eviction
/// order. A caller that evicts in that order, until pressure is relieved, implements
/// the eviction policy outlined in the module comment.
///
/// # Example
///
/// Imagine that there are two tenants, A and B, with five layers each, a-e.
/// Each layer has size 100, and both tenant's min_resident_size is 150.
/// The eviction order would be
///
/// ```text
/// partition last_activity_ts tenant/layer
/// Above     18:30            A/c
/// Above     19:00            A/b
/// Above     18:29            B/c
/// Above     19:05            B/b
/// Above     20:00            B/a
/// Above     20:03            A/a
/// Below     20:30            A/d
/// Below     20:40            B/d
/// Below     20:45            B/e
/// Below     20:58            A/e
/// ```
///
/// Now, if we need to evict 300 bytes to relieve pressure, we'd evict `A/c, A/b, B/c`.
/// They are all in the `Above` partition, so, we respected each tenant's min_resident_size.
///
/// But, if we need to evict 900 bytes to relieve pressure, we'd evict
/// `A/c, A/b, B/c, B/b, B/a, A/a, A/d, B/d, B/e`, reaching into the `Below` partition
/// after exhausting the `Above` partition.
/// So, we did not respect each tenant's min_resident_size.
async fn collect_eviction_candidates(
    cancel: &CancellationToken,
) -> anyhow::Result<EvictionCandidates> {
    // get a snapshot of the list of tenants
    let tenants = tenant::mgr::list_tenants()
        .await
        .context("get list of tenants")?;

    let mut candidates = Vec::new();

    for (tenant_id, _state) in &tenants {
        if cancel.is_cancelled() {
            return Ok(EvictionCandidates::Cancelled);
        }
        let tenant = match tenant::mgr::get_tenant(*tenant_id, true).await {
            Ok(tenant) => tenant,
            Err(e) => {
                // this can happen if tenant has lifecycle transition after we fetched it
                debug!("failed to get tenant: {e:#}");
                continue;
            }
        };

        // collect layers from all timelines in this tenant
        //
        // If one of the timelines becomes `!is_active()` during the iteration,
        // for example because we're shutting down, then `max_layer_size` can be too small.
        // That's OK. This code only runs under a disk pressure situation, and being
        // a little unfair to tenants during shutdown in such a situation is tolerable.
        let mut tenant_candidates = Vec::new();
        let mut max_layer_size = 0;
        for tl in tenant.list_timelines() {
            if !tl.is_active() {
                continue;
            }
            let info = tl.get_local_layers_for_disk_usage_eviction();
            debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
            tenant_candidates.extend(
                info.resident_layers
                    .into_iter()
                    .map(|layer_infos| (tl.clone(), layer_infos)),
            );
            max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));

            if cancel.is_cancelled() {
                return Ok(EvictionCandidates::Cancelled);
            }
        }

        // `min_resident_size` defaults to maximum layer file size of the tenant.
        // This ensures that each tenant can have at least one layer resident at a given time,
        // ensuring forward progress for a single Timeline::get in that tenant.
        // It's a questionable heuristic since, usually, there are many Timeline::get
        // requests going on for a tenant, and, at least in Neon prod, the median
        // layer file size is much smaller than the compaction target size.
        // We could be better here, e.g., sum of all L0 layers + most recent L1 layer.
        // That's what's typically used by the various background loops.
        //
        // The default can be overridden with a fixed value in the tenant conf.
        // A default override can be put in the default tenant conf in the pageserver.toml.
        let min_resident_size = if let Some(s) = tenant.get_min_resident_size_override() {
            debug!(
                tenant_id=%tenant.tenant_id(),
                overriden_size=s,
                "using overridden min resident size for tenant"
            );
            s
        } else {
            debug!(
                tenant_id=%tenant.tenant_id(),
                max_layer_size,
                "using max layer size as min_resident_size for tenant",
            );
            max_layer_size
        };

        // Sort layers most-recently-used first, then partition by
        // cumsum above/below min_resident_size.
        tenant_candidates
            .sort_unstable_by_key(|(_, layer_info)| std::cmp::Reverse(layer_info.last_activity_ts));
        let mut cumsum: i128 = 0;
        for (timeline, layer_info) in tenant_candidates.into_iter() {
            let file_size = layer_info.file_size();
            let candidate = EvictionCandidate {
                timeline,
                last_activity_ts: layer_info.last_activity_ts,
                layer: layer_info.layer,
            };
            let partition = if cumsum > min_resident_size as i128 {
                MinResidentSizePartition::Above
            } else {
                MinResidentSizePartition::Below
            };
            candidates.push((partition, candidate));
            cumsum += i128::from(file_size);
        }
    }

    debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
        "as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first");
    candidates
        .sort_unstable_by_key(|(partition, candidate)| (*partition, candidate.last_activity_ts));

    Ok(EvictionCandidates::Finished(candidates))
}

struct TimelineKey(Arc<Timeline>);

impl PartialEq for TimelineKey {
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.0, &other.0)
    }
}

impl Eq for TimelineKey {}

impl std::hash::Hash for TimelineKey {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        Arc::as_ptr(&self.0).hash(state);
    }
}

impl std::ops::Deref for TimelineKey {
    type Target = Timeline;

    fn deref(&self) -> &Self::Target {
        self.0.as_ref()
    }
}

mod filesystem_level_usage {
    use std::path::Path;

    use anyhow::Context;

    use crate::statvfs::Statvfs;

    use super::DiskUsageEvictionTaskConfig;

    #[derive(Debug, Clone, Copy)]
    #[allow(dead_code)]
    pub struct Usage<'a> {
        config: &'a DiskUsageEvictionTaskConfig,

        /// Filesystem capacity
        total_bytes: u64,
        /// Free filesystem space
        avail_bytes: u64,
    }

    impl super::Usage for Usage<'_> {
        fn has_pressure(&self) -> bool {
            let usage_pct =
                (100.0 * (1.0 - ((self.avail_bytes as f64) / (self.total_bytes as f64)))) as u64;

            let pressures = [
                (
                    "min_avail_bytes",
                    self.avail_bytes < self.config.min_avail_bytes,
                ),
                (
                    "max_usage_pct",
                    usage_pct > self.config.max_usage_pct.get() as u64,
                ),
            ];

            pressures.into_iter().any(|(_, has_pressure)| has_pressure)
        }

        fn add_available_bytes(&mut self, bytes: u64) {
            self.avail_bytes += bytes;
        }
    }

    pub fn get<'a>(
        tenants_dir: &Path,
        config: &'a DiskUsageEvictionTaskConfig,
    ) -> anyhow::Result<Usage<'a>> {
        let mock_config = {
            #[cfg(feature = "testing")]
            {
                config.mock_statvfs.as_ref()
            }
            #[cfg(not(feature = "testing"))]
            {
                None
            }
        };

        let stat = Statvfs::get(tenants_dir, mock_config)
            .context("statvfs failed, presumably directory got unlinked")?;

        // https://unix.stackexchange.com/a/703650
        let blocksize = if stat.fragment_size() > 0 {
            stat.fragment_size()
        } else {
            stat.block_size()
        };

        // use blocks_available (b_avail) since, pageserver runs as unprivileged user
        let avail_bytes = stat.blocks_available() * blocksize;
        let total_bytes = stat.blocks() * blocksize;

        Ok(Usage {
            config,
            total_bytes,
            avail_bytes,
        })
    }
}
@@ -27,6 +27,31 @@ paths:
          id:
            type: integer

  /v1/disk_usage_eviction/run:
    put:
      description: Do an iteration of disk-usage-based eviction to evict a given amount of disk space.
      security: []
      requestBody:
        content:
          application/json:
            schema:
              type: object
              required:
                - evict_bytes
              properties:
                evict_bytes:
                  type: integer
      responses:
        "200":
          description: |
            The run completed.
            This does not necessarily mean that we actually evicted `evict_bytes`.
            Examine the returned object for detail, or, just watch the actual effect of the call using `du` or `df`.
          content:
            application/json:
              schema:
                type: object

  /v1/tenant/{tenant_id}:
    parameters:
      - name: tenant_id
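A hypothetical invocation of the new endpoint, assuming a pageserver management API listening on localhost:9898 (adjust host and port to the actual deployment):

```sh
curl -sS -X PUT http://localhost:9898/v1/disk_usage_eviction/run \
  -H 'Content-Type: application/json' \
  -d '{ "evict_bytes": 1073741824 }'   # ask the iteration to free about 1 GiB
```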
@@ -18,6 +18,7 @@ use super::models::{
|
||||
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
|
||||
};
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::disk_usage_eviction_task;
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
@@ -48,6 +49,7 @@ struct State {
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
allowlist_routes: Vec<Uri>,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
|
||||
}
|
||||
|
||||
impl State {
|
||||
@@ -55,6 +57,7 @@ impl State {
|
||||
conf: &'static PageServerConf,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
|
||||
.iter()
|
||||
@@ -65,6 +68,7 @@ impl State {
|
||||
auth,
|
||||
allowlist_routes,
|
||||
remote_storage,
|
||||
disk_usage_eviction_state,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -775,6 +779,8 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
);
|
||||
}
|
||||
|
||||
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
|
||||
|
||||
let target_tenant_id = request_data
|
||||
.new_tenant_id
|
||||
.map(TenantId::from)
|
||||
@@ -906,6 +912,8 @@ async fn update_tenant_config_handler(
|
||||
);
|
||||
}
|
||||
|
||||
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
|
||||
|
||||
let state = get_state(&request);
|
||||
mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
|
||||
.instrument(info_span!("tenant_config", tenant = ?tenant_id))
|
||||
@@ -914,6 +922,20 @@ async fn update_tenant_config_handler(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
/// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`].
|
||||
#[cfg(feature = "testing")]
|
||||
async fn handle_tenant_break(r: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&r, "tenant_id")?;
|
||||
|
||||
let tenant = crate::tenant::mgr::get_tenant(tenant_id, true)
|
||||
.await
|
||||
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
|
||||
|
||||
tenant.set_broken("broken from test");
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
async fn failpoints_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
if !fail::has_failpoints() {
|
||||
@@ -1063,6 +1085,89 @@ async fn always_panic_handler(req: Request<Body>) -> Result<Response<Body>, ApiE
|
||||
json_response(StatusCode::NO_CONTENT, ())
|
||||
}
|
||||
|
||||
async fn disk_usage_eviction_run(mut r: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&r, None)?;
|
||||
|
||||
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
|
||||
struct Config {
|
||||
/// How many bytes to evict before reporting that pressure is relieved.
|
||||
evict_bytes: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, serde::Serialize)]
|
||||
struct Usage {
|
||||
// remains unchanged after instantiation of the struct
|
||||
config: Config,
|
||||
// updated by `add_available_bytes`
|
||||
freed_bytes: u64,
|
||||
}
|
||||
|
||||
impl crate::disk_usage_eviction_task::Usage for Usage {
|
||||
fn has_pressure(&self) -> bool {
|
||||
self.config.evict_bytes > self.freed_bytes
|
||||
}
|
||||
|
||||
fn add_available_bytes(&mut self, bytes: u64) {
|
||||
self.freed_bytes += bytes;
|
||||
}
|
||||
}
|
||||
|
||||
let config = json_request::<Config>(&mut r)
|
||||
.await
|
||||
.map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
|
||||
|
||||
let usage = Usage {
|
||||
config,
|
||||
freed_bytes: 0,
|
||||
};
|
||||
|
||||
use crate::task_mgr::MGMT_REQUEST_RUNTIME;
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
|
||||
let state = get_state(&r);
|
||||
|
||||
let Some(storage) = state.remote_storage.clone() else {
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"remote storage not configured, cannot run eviction iteration"
|
||||
)))
|
||||
};
|
||||
|
||||
let state = state.disk_usage_eviction_state.clone();
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let child_cancel = cancel.clone();
|
||||
let _g = cancel.drop_guard();
|
||||
|
||||
crate::task_mgr::spawn(
|
||||
MGMT_REQUEST_RUNTIME.handle(),
|
||||
TaskKind::DiskUsageEviction,
|
||||
None,
|
||||
None,
|
||||
"ondemand disk usage eviction",
|
||||
false,
|
||||
async move {
|
||||
let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
|
||||
&state,
|
||||
&storage,
|
||||
usage,
|
||||
&child_cancel,
|
||||
)
|
||||
.await;
|
||||
|
||||
info!(?res, "disk_usage_eviction_task_iteration_impl finished");
|
||||
|
||||
let _ = tx.send(res);
|
||||
Ok(())
|
||||
}
|
||||
.in_current_span(),
|
||||
);
|
||||
|
||||
let response = rx.await.unwrap().map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, response)
|
||||
}
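As an aside (not part of the diff): a minimal sketch of how the new endpoint can be driven over HTTP. The route path and the `evict_bytes` body field come from this hunk; the host/port and the use of `requests` are assumptions, and the Python test helper added later in this diff (`PageserverHttpClient.disk_usage_eviction_run`) wraps the same call.

# Illustrative only; host/port are placeholders for the pageserver mgmt API.
import requests

PAGESERVER_HTTP = "http://localhost:9898"  # assumption, not taken from this diff

def run_disk_usage_eviction(evict_bytes: int) -> dict:
    # Request body mirrors the handler's `Config { evict_bytes }` struct above.
    res = requests.put(
        f"{PAGESERVER_HTTP}/v1/disk_usage_eviction/run",
        json={"evict_bytes": evict_bytes},
    )
    res.raise_for_status()
    # The tests below read e.g. response["Finished"]["assumed"]["projected_after"]["freed_bytes"].
    return res.json()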
|
||||
|
||||
async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
json_response(
|
||||
StatusCode::NOT_FOUND,
|
||||
@@ -1075,6 +1180,7 @@ pub fn make_router(
|
||||
launch_ts: &'static LaunchTimestamp,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
|
||||
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
|
||||
let spec = include_bytes!("openapi_spec.yml");
|
||||
let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
|
||||
@@ -1119,7 +1225,8 @@ pub fn make_router(
|
||||
|
||||
Ok(router
|
||||
.data(Arc::new(
|
||||
State::new(conf, auth, remote_storage).context("Failed to initialize router state")?,
|
||||
State::new(conf, auth, remote_storage, disk_usage_eviction_state)
|
||||
.context("Failed to initialize router state")?,
|
||||
))
|
||||
.get("/v1/status", |r| RequestSpan(status_handler).handle(r))
|
||||
.put(
|
||||
@@ -1200,6 +1307,13 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
|
||||
|r| RequestSpan(evict_timeline_layer_handler).handle(r),
|
||||
)
|
||||
.put("/v1/disk_usage_eviction/run", |r| {
|
||||
RequestSpan(disk_usage_eviction_run).handle(r)
|
||||
})
|
||||
.put(
|
||||
"/v1/tenant/:tenant_id/break",
|
||||
testing_api!("set tenant state to broken", handle_tenant_break),
|
||||
)
|
||||
.get("/v1/panic", |r| RequestSpan(always_panic_handler).handle(r))
|
||||
.any(handler_404))
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ pub mod broker_client;
|
||||
pub mod config;
|
||||
pub mod consumption_metrics;
|
||||
pub mod context;
|
||||
pub mod disk_usage_eviction_task;
|
||||
pub mod http;
|
||||
pub mod import_datadir;
|
||||
pub mod keyspace;
|
||||
@@ -12,6 +13,7 @@ pub mod page_cache;
|
||||
pub mod page_service;
|
||||
pub mod pgdatadir_mapping;
|
||||
pub mod repository;
|
||||
pub(crate) mod statvfs;
|
||||
pub mod task_mgr;
|
||||
pub mod tenant;
|
||||
pub mod trace;
|
||||
|
||||
@@ -257,7 +257,7 @@ impl EvictionsWithLowResidenceDuration {
|
||||
}
|
||||
|
||||
pub fn observe(&self, observed_value: Duration) {
|
||||
if self.threshold < observed_value {
|
||||
if observed_value < self.threshold {
|
||||
self.counter
|
||||
.as_ref()
|
||||
.expect("nobody calls this function after `remove_from_vec`")
|
||||
|
||||
pageserver/src/statvfs.rs | 150 | Normal file
@@ -0,0 +1,150 @@
|
||||
//! Wrapper around nix::sys::statvfs::Statvfs that allows for mocking.
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
pub enum Statvfs {
|
||||
Real(nix::sys::statvfs::Statvfs),
|
||||
Mock(mock::Statvfs),
|
||||
}
|
||||
|
||||
// NB: on macOS, the block count type of struct statvfs is u32.
|
||||
// The workaround seems to be to use the non-standard statfs64 call.
|
||||
// Since it should only be a problem on > 2TiB disks, let's ignore
|
||||
// the problem for now and upcast to u64.
|
||||
impl Statvfs {
|
||||
pub fn get(tenants_dir: &Path, mocked: Option<&mock::Behavior>) -> nix::Result<Self> {
|
||||
if let Some(mocked) = mocked {
|
||||
Ok(Statvfs::Mock(mock::get(tenants_dir, mocked)?))
|
||||
} else {
|
||||
Ok(Statvfs::Real(nix::sys::statvfs::statvfs(tenants_dir)?))
|
||||
}
|
||||
}
|
||||
|
||||
// NB: allow() because the block count type is u32 on macOS.
|
||||
#[allow(clippy::useless_conversion)]
|
||||
pub fn blocks(&self) -> u64 {
|
||||
match self {
|
||||
Statvfs::Real(stat) => u64::try_from(stat.blocks()).unwrap(),
|
||||
Statvfs::Mock(stat) => stat.blocks,
|
||||
}
|
||||
}
|
||||
|
||||
// NB: allow() because the block count type is u32 on macOS.
|
||||
#[allow(clippy::useless_conversion)]
|
||||
pub fn blocks_available(&self) -> u64 {
|
||||
match self {
|
||||
Statvfs::Real(stat) => u64::try_from(stat.blocks_available()).unwrap(),
|
||||
Statvfs::Mock(stat) => stat.blocks_available,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fragment_size(&self) -> u64 {
|
||||
match self {
|
||||
Statvfs::Real(stat) => stat.fragment_size(),
|
||||
Statvfs::Mock(stat) => stat.fragment_size,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn block_size(&self) -> u64 {
|
||||
match self {
|
||||
Statvfs::Real(stat) => stat.block_size(),
|
||||
Statvfs::Mock(stat) => stat.block_size,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub mod mock {
|
||||
use anyhow::Context;
|
||||
use regex::Regex;
|
||||
use std::path::Path;
|
||||
use tracing::log::info;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum Behavior {
|
||||
Success {
|
||||
blocksize: u64,
|
||||
total_blocks: u64,
|
||||
name_filter: Option<utils::serde_regex::Regex>,
|
||||
},
|
||||
Failure {
|
||||
mocked_error: MockedError,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[allow(clippy::upper_case_acronyms)]
|
||||
pub enum MockedError {
|
||||
EIO,
|
||||
}
|
||||
|
||||
impl From<MockedError> for nix::Error {
|
||||
fn from(e: MockedError) -> Self {
|
||||
match e {
|
||||
MockedError::EIO => nix::Error::EIO,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(tenants_dir: &Path, behavior: &Behavior) -> nix::Result<Statvfs> {
|
||||
info!("running mocked statvfs");
|
||||
|
||||
match behavior {
|
||||
Behavior::Success {
|
||||
blocksize,
|
||||
total_blocks,
|
||||
ref name_filter,
|
||||
} => {
|
||||
let used_bytes = walk_dir_disk_usage(tenants_dir, name_filter.as_deref()).unwrap();
|
||||
|
||||
// round it up to the nearest block multiple
|
||||
let used_blocks = (used_bytes + (blocksize - 1)) / blocksize;
|
||||
|
||||
if used_blocks > *total_blocks {
|
||||
panic!(
|
||||
"mocking error: used_blocks > total_blocks: {used_blocks} > {total_blocks}"
|
||||
);
|
||||
}
|
||||
|
||||
let avail_blocks = total_blocks - used_blocks;
|
||||
|
||||
Ok(Statvfs {
|
||||
blocks: *total_blocks,
|
||||
blocks_available: avail_blocks,
|
||||
fragment_size: *blocksize,
|
||||
block_size: *blocksize,
|
||||
})
|
||||
}
|
||||
Behavior::Failure { mocked_error } => Err((*mocked_error).into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn walk_dir_disk_usage(path: &Path, name_filter: Option<&Regex>) -> anyhow::Result<u64> {
|
||||
let mut total = 0;
|
||||
for entry in walkdir::WalkDir::new(path) {
|
||||
let entry = entry?;
|
||||
if !entry.file_type().is_file() {
|
||||
continue;
|
||||
}
|
||||
if !name_filter
|
||||
.as_ref()
|
||||
.map(|filter| filter.is_match(entry.file_name().to_str().unwrap()))
|
||||
.unwrap_or(true)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
total += entry
|
||||
.metadata()
|
||||
.with_context(|| format!("get metadata of {:?}", entry.path()))?
|
||||
.len();
|
||||
}
|
||||
Ok(total)
|
||||
}
|
||||
|
||||
pub struct Statvfs {
|
||||
pub blocks: u64,
|
||||
pub blocks_available: u64,
|
||||
pub fragment_size: u64,
|
||||
pub block_size: u64,
|
||||
}
|
||||
}
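As a reading aid (not part of the diff): the values exposed by this wrapper combine in the usual statvfs way, with the block counts expressed in units of the fragment size. The following is only an illustrative sketch of that arithmetic, under the assumption that callers derive byte figures exactly like this.

# Standard statvfs arithmetic (assumption: block counts are in fragment_size units).
def statvfs_bytes(blocks: int, blocks_available: int, fragment_size: int):
    total_bytes = blocks * fragment_size
    avail_bytes = blocks_available * fragment_size
    used_bytes = total_bytes - avail_bytes
    return total_bytes, used_bytes, avail_bytes

# Example: the mock in the tests below uses a 512-byte block size.
total, used, avail = statvfs_bytes(blocks=1000, blocks_available=250, fragment_size=512)
assert (total, used, avail) == (512_000, 384_000, 128_000)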
|
||||
@@ -234,6 +234,9 @@ pub enum TaskKind {
|
||||
// Eviction. One per timeline.
|
||||
Eviction,
|
||||
|
||||
/// See [`crate::disk_usage_eviction_task`].
|
||||
DiskUsageEviction,
|
||||
|
||||
// Initial logical size calculation
|
||||
InitialLogicalSizeCalculation,
|
||||
|
||||
|
||||
@@ -95,7 +95,7 @@ mod timeline;
|
||||
|
||||
pub mod size;
|
||||
|
||||
pub use timeline::{PageReconstructError, Timeline};
|
||||
pub use timeline::{LocalLayerInfoForDiskUsageEviction, PageReconstructError, Timeline};
|
||||
|
||||
// re-export this function so that page_cache.rs can use it.
|
||||
pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file;
|
||||
@@ -1706,6 +1706,13 @@ impl Tenant {
|
||||
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
|
||||
}
|
||||
|
||||
pub fn get_min_resident_size_override(&self) -> Option<u64> {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
.min_resident_size_override
|
||||
.or(self.conf.default_tenant_conf.min_resident_size_override)
|
||||
}
|
||||
|
||||
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
*self.tenant_conf.write().unwrap() = new_tenant_conf;
|
||||
}
|
||||
@@ -2783,6 +2790,7 @@ pub mod harness {
|
||||
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
|
||||
trace_read_requests: Some(tenant_conf.trace_read_requests),
|
||||
eviction_policy: Some(tenant_conf.eviction_policy),
|
||||
min_resident_size_override: tenant_conf.min_resident_size_override,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,6 +92,7 @@ pub struct TenantConf {
|
||||
pub max_lsn_wal_lag: NonZeroU64,
|
||||
pub trace_read_requests: bool,
|
||||
pub eviction_policy: EvictionPolicy,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -159,6 +160,10 @@ pub struct TenantConfOpt {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub eviction_policy: Option<EvictionPolicy>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -220,6 +225,9 @@ impl TenantConfOpt {
|
||||
.trace_read_requests
|
||||
.unwrap_or(global_conf.trace_read_requests),
|
||||
eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy),
|
||||
min_resident_size_override: self
|
||||
.min_resident_size_override
|
||||
.or(global_conf.min_resident_size_override),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -251,6 +259,7 @@ impl Default for TenantConf {
|
||||
.expect("cannot parse default max walreceiver Lsn wal lag"),
|
||||
trace_read_requests: false,
|
||||
eviction_policy: EvictionPolicy::NoEviction,
|
||||
min_resident_size_override: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,10 +121,10 @@ struct LayerAccessStatsInner {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub(super) struct LayerAccessStatFullDetails {
|
||||
pub(super) when: SystemTime,
|
||||
pub(super) task_kind: TaskKind,
|
||||
pub(super) access_kind: LayerAccessKind,
|
||||
pub(crate) struct LayerAccessStatFullDetails {
|
||||
pub(crate) when: SystemTime,
|
||||
pub(crate) task_kind: TaskKind,
|
||||
pub(crate) access_kind: LayerAccessKind,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, strum_macros::EnumString)]
|
||||
@@ -255,7 +255,7 @@ impl LayerAccessStats {
|
||||
ret
|
||||
}
|
||||
|
||||
pub(super) fn most_recent_access_or_residence_event(
|
||||
fn most_recent_access_or_residence_event(
|
||||
&self,
|
||||
) -> Either<LayerAccessStatFullDetails, LayerResidenceEvent> {
|
||||
let locked = self.0.lock().unwrap();
|
||||
@@ -268,6 +268,13 @@ impl LayerAccessStats {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn latest_activity(&self) -> SystemTime {
|
||||
match self.most_recent_access_or_residence_event() {
|
||||
Either::Left(mra) => mra.when,
|
||||
Either::Right(re) => re.timestamp,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Supertrait of the [`Layer`] trait that captures the bare minimum interface
|
||||
|
||||
@@ -244,14 +244,12 @@ pub(crate) async fn random_init_delay(
|
||||
) -> Result<(), Cancelled> {
|
||||
use rand::Rng;
|
||||
|
||||
if period == Duration::ZERO {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let d = {
|
||||
let mut rng = rand::thread_rng();
|
||||
|
||||
// gen_range asserts that the range cannot be empty, which it could be because period can
|
||||
// be set to zero to disable gc or compaction, so let's set it to be at least 10s.
|
||||
let period = std::cmp::max(period, Duration::from_secs(10));
|
||||
|
||||
// semi-ok default as the source of jitter
|
||||
rng.gen_range(Duration::ZERO..=period)
|
||||
};
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ use pageserver_api::models::{
|
||||
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
|
||||
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceStatus, TimelineState,
|
||||
};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
@@ -957,6 +958,25 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Evict a batch of layers.
|
||||
///
|
||||
/// GenericRemoteStorage reference is required as a witness[^witness_article] for "remote storage is configured."
|
||||
///
|
||||
/// [^witness_article]: https://willcrichton.net/rust-api-type-patterns/witnesses.html
|
||||
pub async fn evict_layers(
|
||||
&self,
|
||||
_: &GenericRemoteStorage,
|
||||
layers_to_evict: &[Arc<dyn PersistentLayer>],
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
|
||||
let remote_client = self.remote_client.clone().expect(
|
||||
"GenericRemoteStorage is configured, so timeline must have RemoteTimelineClient",
|
||||
);
|
||||
|
||||
self.evict_layer_batch(&remote_client, layers_to_evict, cancel)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Evict multiple layers at once, continuing through errors.
|
||||
///
|
||||
/// Try to evict the given `layers_to_evict` by
|
||||
@@ -994,6 +1014,15 @@ impl Timeline {
|
||||
// now lock out layer removal (compaction, gc, timeline deletion)
|
||||
let layer_removal_guard = self.layer_removal_cs.lock().await;
|
||||
|
||||
{
|
||||
// to avoid racing with detach and delete_timeline
|
||||
let state = self.current_state();
|
||||
anyhow::ensure!(
|
||||
state == TimelineState::Active,
|
||||
"timeline is not active but {state:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// start the batch update
|
||||
let mut layer_map = self.layers.write().unwrap();
|
||||
let mut batch_updates = layer_map.batch_update();
|
||||
@@ -1027,6 +1056,8 @@ impl Timeline {
|
||||
use super::layer_map::Replacement;
|
||||
|
||||
if local_layer.is_remote_layer() {
|
||||
// TODO(issue #3851): consider returning an err here instead of false,
|
||||
// which is the same out the match later
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
@@ -4012,6 +4043,67 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DiskUsageEvictionInfo {
|
||||
/// Timeline's largest layer (remote or resident)
|
||||
pub max_layer_size: Option<u64>,
|
||||
/// Timeline's resident layers
|
||||
pub resident_layers: Vec<LocalLayerInfoForDiskUsageEviction>,
|
||||
}
|
||||
|
||||
pub struct LocalLayerInfoForDiskUsageEviction {
|
||||
pub layer: Arc<dyn PersistentLayer>,
|
||||
pub last_activity_ts: SystemTime,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
// format the tv_sec, tv_nsec into rfc3339 in case someone is looking at it
|
||||
// having to allocate a string for this is bad, but it will rarely be formatted
|
||||
let ts = chrono::DateTime::<chrono::Utc>::from(self.last_activity_ts);
|
||||
let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
|
||||
f.debug_struct("LocalLayerInfoForDiskUsageEviction")
|
||||
.field("layer", &self.layer)
|
||||
.field("last_activity", &ts)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl LocalLayerInfoForDiskUsageEviction {
|
||||
pub fn file_size(&self) -> u64 {
|
||||
self.layer.file_size()
|
||||
}
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
|
||||
let layers = self.layers.read().unwrap();
|
||||
|
||||
let mut max_layer_size: Option<u64> = None;
|
||||
let mut resident_layers = Vec::new();
|
||||
|
||||
for l in layers.iter_historic_layers() {
|
||||
let file_size = l.file_size();
|
||||
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
|
||||
|
||||
if l.is_remote_layer() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let last_activity_ts = l.access_stats().latest_activity();
|
||||
|
||||
resident_layers.push(LocalLayerInfoForDiskUsageEviction {
|
||||
layer: l,
|
||||
last_activity_ts,
|
||||
});
|
||||
}
|
||||
|
||||
DiskUsageEvictionInfo {
|
||||
max_layer_size,
|
||||
resident_layers,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type TraversalPathItem = (
|
||||
ValueReconstructResult,
|
||||
Lsn,
|
||||
|
||||
@@ -20,7 +20,6 @@ use std::{
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
use either::Either;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
@@ -185,13 +184,7 @@ impl Timeline {
|
||||
if hist_layer.is_remote_layer() {
|
||||
continue;
|
||||
}
|
||||
let last_activity_ts = match hist_layer
|
||||
.access_stats()
|
||||
.most_recent_access_or_residence_event()
|
||||
{
|
||||
Either::Left(mra) => mra.when,
|
||||
Either::Right(re) => re.timestamp,
|
||||
};
|
||||
let last_activity_ts = hist_layer.access_stats().latest_activity();
|
||||
let no_activity_for = match now.duration_since(last_activity_ts) {
|
||||
Ok(d) => d,
|
||||
Err(_e) => {
|
||||
|
||||
@@ -237,11 +237,7 @@ async fn connection_manager_loop_step(
|
||||
if let Some(new_candidate) = walreceiver_state.next_connection_candidate() {
|
||||
info!("Switching to new connection candidate: {new_candidate:?}");
|
||||
walreceiver_state
|
||||
.change_connection(
|
||||
new_candidate.safekeeper_id,
|
||||
new_candidate.wal_source_connconf,
|
||||
ctx,
|
||||
)
|
||||
.change_connection(new_candidate, ctx)
|
||||
.await
|
||||
}
|
||||
}
|
||||
@@ -346,6 +342,8 @@ struct WalConnection {
|
||||
started_at: NaiveDateTime,
|
||||
/// Current safekeeper pageserver is connected to for WAL streaming.
|
||||
sk_id: NodeId,
|
||||
/// Availability zone of the safekeeper.
|
||||
availability_zone: Option<String>,
|
||||
/// Status of the connection.
|
||||
status: WalConnectionStatus,
|
||||
/// WAL streaming task handle.
|
||||
@@ -405,12 +403,7 @@ impl WalreceiverState {
|
||||
}
|
||||
|
||||
/// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
|
||||
async fn change_connection(
|
||||
&mut self,
|
||||
new_sk_id: NodeId,
|
||||
new_wal_source_connconf: PgConnectionConfig,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
|
||||
self.drop_old_connection(true).await;
|
||||
|
||||
let id = self.id;
|
||||
@@ -424,7 +417,7 @@ impl WalreceiverState {
|
||||
async move {
|
||||
super::walreceiver_connection::handle_walreceiver_connection(
|
||||
timeline,
|
||||
new_wal_source_connconf,
|
||||
new_sk.wal_source_connconf,
|
||||
events_sender,
|
||||
cancellation,
|
||||
connect_timeout,
|
||||
@@ -433,13 +426,16 @@ impl WalreceiverState {
|
||||
.await
|
||||
.context("walreceiver connection handling failure")
|
||||
}
|
||||
.instrument(info_span!("walreceiver_connection", id = %id, node_id = %new_sk_id))
|
||||
.instrument(
|
||||
info_span!("walreceiver_connection", id = %id, node_id = %new_sk.safekeeper_id),
|
||||
)
|
||||
});
|
||||
|
||||
let now = Utc::now().naive_utc();
|
||||
self.wal_connection = Some(WalConnection {
|
||||
started_at: now,
|
||||
sk_id: new_sk_id,
|
||||
sk_id: new_sk.safekeeper_id,
|
||||
availability_zone: new_sk.availability_zone,
|
||||
status: WalConnectionStatus {
|
||||
is_connected: false,
|
||||
has_processed_wal: false,
|
||||
@@ -546,6 +542,7 @@ impl WalreceiverState {
|
||||
/// * if connected safekeeper is not present, pick the candidate
|
||||
/// * if we haven't received any updates for some time, pick the candidate
|
||||
/// * if the candidate commit_lsn is much higher than the current one, pick the candidate
|
||||
/// * if the candidate commit_lsn is same, but candidate is located in the same AZ as the pageserver, pick the candidate
|
||||
/// * if connected safekeeper stopped sending us new WAL which is available on other safekeeper, pick the candidate
|
||||
///
|
||||
/// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently.
|
||||
@@ -559,6 +556,7 @@ impl WalreceiverState {
|
||||
|
||||
let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
|
||||
self.select_connection_candidate(Some(connected_sk_node))?;
|
||||
let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone();
|
||||
|
||||
let now = Utc::now().naive_utc();
|
||||
if let Ok(latest_interaciton) =
|
||||
@@ -569,6 +567,7 @@ impl WalreceiverState {
|
||||
return Some(NewWalConnectionCandidate {
|
||||
safekeeper_id: new_sk_id,
|
||||
wal_source_connconf: new_wal_source_connconf,
|
||||
availability_zone: new_availability_zone,
|
||||
reason: ReconnectReason::NoKeepAlives {
|
||||
last_keep_alive: Some(
|
||||
existing_wal_connection.status.latest_connection_update,
|
||||
@@ -594,6 +593,7 @@ impl WalreceiverState {
|
||||
return Some(NewWalConnectionCandidate {
|
||||
safekeeper_id: new_sk_id,
|
||||
wal_source_connconf: new_wal_source_connconf,
|
||||
availability_zone: new_availability_zone,
|
||||
reason: ReconnectReason::LaggingWal {
|
||||
current_commit_lsn,
|
||||
new_commit_lsn,
|
||||
@@ -601,6 +601,20 @@ impl WalreceiverState {
|
||||
},
|
||||
});
|
||||
}
|
||||
// If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver,
|
||||
// and the current one is not, switch to the new one.
|
||||
if self.availability_zone.is_some()
|
||||
&& existing_wal_connection.availability_zone
|
||||
!= self.availability_zone
|
||||
&& self.availability_zone == new_availability_zone
|
||||
{
|
||||
return Some(NewWalConnectionCandidate {
|
||||
safekeeper_id: new_sk_id,
|
||||
availability_zone: new_availability_zone,
|
||||
wal_source_connconf: new_wal_source_connconf,
|
||||
reason: ReconnectReason::SwitchAvailabilityZone,
|
||||
});
|
||||
}
|
||||
}
|
||||
None => debug!(
|
||||
"Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
|
||||
@@ -668,6 +682,7 @@ impl WalreceiverState {
|
||||
return Some(NewWalConnectionCandidate {
|
||||
safekeeper_id: new_sk_id,
|
||||
wal_source_connconf: new_wal_source_connconf,
|
||||
availability_zone: new_availability_zone,
|
||||
reason: ReconnectReason::NoWalTimeout {
|
||||
current_lsn,
|
||||
current_commit_lsn,
|
||||
@@ -686,10 +701,11 @@ impl WalreceiverState {
|
||||
self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
|
||||
}
|
||||
None => {
|
||||
let (new_sk_id, _, new_wal_source_connconf) =
|
||||
let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
|
||||
self.select_connection_candidate(None)?;
|
||||
return Some(NewWalConnectionCandidate {
|
||||
safekeeper_id: new_sk_id,
|
||||
availability_zone: new_safekeeper_broker_data.availability_zone.clone(),
|
||||
wal_source_connconf: new_wal_source_connconf,
|
||||
reason: ReconnectReason::NoExistingConnection,
|
||||
});
|
||||
@@ -794,6 +810,7 @@ impl WalreceiverState {
|
||||
struct NewWalConnectionCandidate {
|
||||
safekeeper_id: NodeId,
|
||||
wal_source_connconf: PgConnectionConfig,
|
||||
availability_zone: Option<String>,
|
||||
// This field is used in `derive(Debug)` only.
|
||||
#[allow(dead_code)]
|
||||
reason: ReconnectReason,
|
||||
@@ -808,6 +825,7 @@ enum ReconnectReason {
|
||||
new_commit_lsn: Lsn,
|
||||
threshold: NonZeroU64,
|
||||
},
|
||||
SwitchAvailabilityZone,
|
||||
NoWalTimeout {
|
||||
current_lsn: Lsn,
|
||||
current_commit_lsn: Lsn,
|
||||
@@ -873,6 +891,7 @@ mod tests {
|
||||
peer_horizon_lsn: 0,
|
||||
local_start_lsn: 0,
|
||||
safekeeper_connstr: safekeeper_connstr.to_owned(),
|
||||
availability_zone: None,
|
||||
},
|
||||
latest_update,
|
||||
}
|
||||
@@ -933,6 +952,7 @@ mod tests {
|
||||
state.wal_connection = Some(WalConnection {
|
||||
started_at: now,
|
||||
sk_id: connected_sk_id,
|
||||
availability_zone: None,
|
||||
status: connection_status,
|
||||
connection_task: TaskHandle::spawn(move |sender, _| async move {
|
||||
sender
|
||||
@@ -1095,6 +1115,7 @@ mod tests {
|
||||
state.wal_connection = Some(WalConnection {
|
||||
started_at: now,
|
||||
sk_id: connected_sk_id,
|
||||
availability_zone: None,
|
||||
status: connection_status,
|
||||
connection_task: TaskHandle::spawn(move |sender, _| async move {
|
||||
sender
|
||||
@@ -1160,6 +1181,7 @@ mod tests {
|
||||
state.wal_connection = Some(WalConnection {
|
||||
started_at: now,
|
||||
sk_id: NodeId(1),
|
||||
availability_zone: None,
|
||||
status: connection_status,
|
||||
connection_task: TaskHandle::spawn(move |sender, _| async move {
|
||||
sender
|
||||
@@ -1222,6 +1244,7 @@ mod tests {
|
||||
state.wal_connection = Some(WalConnection {
|
||||
started_at: now,
|
||||
sk_id: NodeId(1),
|
||||
availability_zone: None,
|
||||
status: connection_status,
|
||||
connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }),
|
||||
discovered_new_wal: Some(NewCommittedWAL {
|
||||
@@ -1289,4 +1312,74 @@ mod tests {
|
||||
availability_zone: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn switch_to_same_availability_zone() -> anyhow::Result<()> {
|
||||
// Pageserver and one of safekeepers will be in the same availability zone
|
||||
// and pageserver should prefer to connect to it.
|
||||
let test_az = Some("test_az".to_owned());
|
||||
|
||||
let harness = TenantHarness::create("switch_to_same_availability_zone")?;
|
||||
let mut state = dummy_state(&harness).await;
|
||||
state.availability_zone = test_az.clone();
|
||||
let current_lsn = Lsn(100_000).align();
|
||||
let now = Utc::now().naive_utc();
|
||||
|
||||
let connected_sk_id = NodeId(0);
|
||||
|
||||
let connection_status = WalConnectionStatus {
|
||||
is_connected: true,
|
||||
has_processed_wal: true,
|
||||
latest_connection_update: now,
|
||||
latest_wal_update: now,
|
||||
commit_lsn: Some(current_lsn),
|
||||
streaming_lsn: Some(current_lsn),
|
||||
};
|
||||
|
||||
state.wal_connection = Some(WalConnection {
|
||||
started_at: now,
|
||||
sk_id: connected_sk_id,
|
||||
availability_zone: None,
|
||||
status: connection_status,
|
||||
connection_task: TaskHandle::spawn(move |sender, _| async move {
|
||||
sender
|
||||
.send(TaskStateUpdate::Progress(connection_status))
|
||||
.ok();
|
||||
Ok(())
|
||||
}),
|
||||
discovered_new_wal: None,
|
||||
});
|
||||
|
||||
// We have another safekeeper with the same commit_lsn, and it has the same availability zone as
|
||||
// the current pageserver.
|
||||
let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
|
||||
same_az_sk.timeline.availability_zone = test_az.clone();
|
||||
|
||||
state.wal_stream_candidates = HashMap::from([
|
||||
(
|
||||
connected_sk_id,
|
||||
dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
|
||||
),
|
||||
(NodeId(1), same_az_sk),
|
||||
]);
|
||||
|
||||
// We expect that pageserver will switch to the safekeeper in the same availability zone,
|
||||
// even if it has the same commit_lsn.
|
||||
let next_candidate = state.next_connection_candidate().expect(
|
||||
"Expected one candidate selected out of multiple valid data options, but got none",
|
||||
);
|
||||
|
||||
assert_eq!(next_candidate.safekeeper_id, NodeId(1));
|
||||
assert_eq!(
|
||||
next_candidate.reason,
|
||||
ReconnectReason::SwitchAvailabilityZone,
|
||||
"Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn"
|
||||
);
|
||||
assert_eq!(
|
||||
next_candidate.wal_source_connconf.host(),
|
||||
&Host::Domain("same_az".to_owned())
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,8 +24,6 @@
|
||||
#include "pgstat.h"
|
||||
#include "pagestore_client.h"
|
||||
#include "access/parallel.h"
|
||||
#include "common/hashfn.h"
|
||||
#include "lib/hyperloglog.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "storage/relfilenode.h"
|
||||
#include "storage/buf_internals.h"
|
||||
@@ -67,7 +65,6 @@
|
||||
|
||||
#define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */
|
||||
#define MAX_DISK_WRITE_RATE 1000 /* MB/sec */
|
||||
#define HYPER_LOG_LOG_BIT_WIDTH 10
|
||||
|
||||
typedef struct FileCacheEntry
|
||||
{
|
||||
@@ -80,11 +77,9 @@ typedef struct FileCacheEntry
|
||||
|
||||
typedef struct FileCacheControl
|
||||
{
|
||||
uint32 size; /* size of cache file in chunks */
|
||||
uint32 used; /* number of used chunks */
|
||||
dlist_head lru; /* double linked list for LRU replacement algorithm */
|
||||
hyperLogLogState wss_estimation; /* estimation of wroking set size */
|
||||
char hyperloglog_hashes[(1 << HYPER_LOG_LOG_BIT_WIDTH) + 1];
|
||||
uint32 size; /* size of cache file in chunks */
|
||||
uint32 used; /* number of used chunks */
|
||||
dlist_head lru; /* double linked list for LRU replacement algorithm */
|
||||
} FileCacheControl;
|
||||
|
||||
static HTAB* lfc_hash;
|
||||
@@ -129,12 +124,6 @@ lfc_shmem_startup(void)
|
||||
lfc_ctl->size = 0;
|
||||
lfc_ctl->used = 0;
|
||||
dlist_init(&lfc_ctl->lru);
|
||||
initHyperLogLog(&lfc_ctl->wss_estimation, HYPER_LOG_LOG_BIT_WIDTH);
|
||||
|
||||
/* We need hashes in shared memory */
|
||||
pfree(lfc_ctl->wss_estimation.hashesArr);
|
||||
memset(lfc_ctl->hyperloglog_hashes, 0, sizeof lfc_ctl->hyperloglog_hashes);
|
||||
lfc_ctl->wss_estimation.hashesArr = lfc_ctl->hyperloglog_hashes;
|
||||
|
||||
/* Remove file cache on restart */
|
||||
(void)unlink(lfc_path);
|
||||
@@ -407,11 +396,6 @@ lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||
|
||||
/* Approximate working set */
|
||||
tag.blockNum = blkno;
|
||||
addHyperLogLog(&lfc_ctl->wss_estimation, hash_bytes((char const*)&tag, sizeof(tag)));
|
||||
|
||||
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
|
||||
{
|
||||
/* Page is not cached */
|
||||
@@ -714,21 +698,3 @@ local_cache_pages(PG_FUNCTION_ARGS)
|
||||
else
|
||||
SRF_RETURN_DONE(funcctx);
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(approximate_working_set_size);
|
||||
|
||||
Datum
|
||||
approximate_working_set_size(PG_FUNCTION_ARGS)
|
||||
{
|
||||
int32 dc = -1;
|
||||
if (lfc_size_limit != 0)
|
||||
{
|
||||
bool reset = PG_GETARG_BOOL(0);
|
||||
LWLockAcquire(lfc_lock, reset ? LW_EXCLUSIVE : LW_SHARED);
|
||||
dc = (int32) estimateHyperLogLog(&lfc_ctl->wss_estimation);
|
||||
if (reset)
|
||||
memset(lfc_ctl->hyperloglog_hashes, 0, sizeof lfc_ctl->hyperloglog_hashes);
|
||||
LWLockRelease(lfc_lock);
|
||||
}
|
||||
PG_RETURN_INT32(dc);
|
||||
}
|
||||
|
||||
@@ -27,13 +27,6 @@ RETURNS SETOF RECORD
|
||||
AS 'MODULE_PATHNAME', 'local_cache_pages'
|
||||
LANGUAGE C PARALLEL SAFE;
|
||||
|
||||
CREATE FUNCTION approximate_working_set_size(reset bool)
|
||||
RETURNS integer
|
||||
AS 'MODULE_PATHNAME', 'approximate_working_set_size'
|
||||
LANGUAGE C PARALLEL SAFE;
|
||||
|
||||
GRANT EXECUTE ON FUNCTION approximate_working_set_size() TO pg_monitor;
|
||||
|
||||
-- Create a view for convenient access.
|
||||
CREATE VIEW local_cache AS
|
||||
SELECT P.* FROM local_cache_pages() AS P
|
||||
|
||||
@@ -242,6 +242,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
backup_lsn: sk_info.backup_lsn.0,
|
||||
local_start_lsn: sk_info.local_start_lsn.0,
|
||||
availability_zone: None,
|
||||
};
|
||||
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
|
||||
@@ -337,6 +337,7 @@ impl SharedState {
|
||||
safekeeper_connstr: conf.listen_pg_addr.clone(),
|
||||
backup_lsn: self.sk.inmem.backup_lsn.0,
|
||||
local_start_lsn: self.sk.state.local_start_lsn.0,
|
||||
availability_zone: conf.availability_zone.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,6 +133,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
};
|
||||
counter += 1;
|
||||
yield info;
|
||||
|
||||
@@ -36,9 +36,11 @@ message SafekeeperTimelineInfo {
|
||||
uint64 local_start_lsn = 9;
|
||||
// A connection string to use for WAL receiving.
|
||||
string safekeeper_connstr = 10;
|
||||
// Availability zone of a safekeeper.
|
||||
optional string availability_zone = 11;
|
||||
}
|
||||
|
||||
message TenantTimelineId {
|
||||
bytes tenant_id = 1;
|
||||
bytes timeline_id = 2;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -525,6 +525,7 @@ mod tests {
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1220,6 +1220,28 @@ class PageserverHttpClient(requests.Session):
|
||||
self.verbose_error(res)
|
||||
return TenantConfig.from_json(res.json())
|
||||
|
||||
def set_tenant_config(self, tenant_id: TenantId, config: dict[str, Any]):
|
||||
assert "tenant_id" not in config.keys()
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/tenant/config",
|
||||
json={**config, "tenant_id": str(tenant_id)},
|
||||
)
|
||||
self.verbose_error(res)
|
||||
|
||||
def patch_tenant_config_client_side(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
inserts: Optional[Dict[str, Any]] = None,
|
||||
removes: Optional[List[str]] = None,
|
||||
):
|
||||
current = self.tenant_config(tenant_id).tenant_specific_overrides
|
||||
if inserts is not None:
|
||||
current.update(inserts)
|
||||
if removes is not None:
|
||||
for key in removes:
|
||||
del current[key]
|
||||
self.set_tenant_config(tenant_id, current)
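A short usage sketch for this helper (illustrative; `client` and `tenant_id` are placeholders), mirroring how the disk-usage eviction tests added below drive it.

# Read-modify-write a single tenant config key via the helper above.
client.patch_tenant_config_client_side(
    tenant_id,
    inserts={"min_resident_size_override": 5 * 1024**2},  # placeholder value
)
# ...and remove the override again later (key must currently be set):
client.patch_tenant_config_client_side(tenant_id, removes=["min_resident_size_override"])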
|
||||
|
||||
def tenant_size(self, tenant_id: TenantId) -> int:
|
||||
return self.tenant_size_and_modelinputs(tenant_id)[0]
|
||||
|
||||
@@ -1536,6 +1558,18 @@ class PageserverHttpClient(requests.Session):
|
||||
for layer in info.historic_layers:
|
||||
self.evict_layer(tenant_id, timeline_id, layer.layer_file_name)
|
||||
|
||||
def disk_usage_eviction_run(self, request: dict[str, Any]):
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/disk_usage_eviction/run",
|
||||
json=request,
|
||||
)
|
||||
self.verbose_error(res)
|
||||
return res.json()
|
||||
|
||||
def tenant_break(self, tenant_id: TenantId):
|
||||
res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break")
|
||||
self.verbose_error(res)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TenantConfig:
|
||||
|
||||
test_runner/regress/test_disk_usage_eviction.py | 541 | Normal file
@@ -0,0 +1,541 @@
|
||||
import shutil
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Dict, Tuple
|
||||
|
||||
import pytest
|
||||
import toml
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
LocalFsStorage,
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PageserverHttpClient,
|
||||
PgBin,
|
||||
RemoteStorageKind,
|
||||
wait_for_last_flush_lsn,
|
||||
wait_for_upload_queue_empty,
|
||||
wait_until,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
|
||||
GLOBAL_LRU_LOG_LINE = "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"
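The log line above, together with the test names and docstrings in this file, implies a two-phase eviction order: first a tenant_min_resident_size-respecting LRU pass, then a fall-back to plain global LRU. The following is only a hedged pseudocode sketch of that policy as the tests describe it; the real logic lives in the pageserver's disk_usage_eviction_task and is not shown in this hunk.

# Hedged sketch, not the pageserver implementation. Layers are assumed to
# expose file_size and last_activity_ts; tenants expose resident_layers and
# min_resident_size (default: the largest layer file size).
def plan_evictions(tenants, evict_bytes):
    # Phase 1: per tenant, protect its most recently used layers up to
    # min_resident_size; the rest are candidates, evicted in global LRU order.
    candidates = []
    for t in tenants:
        protected = 0
        for layer in sorted(t.resident_layers, key=lambda l: l.last_activity_ts, reverse=True):
            if protected < t.min_resident_size:
                protected += layer.file_size
            else:
                candidates.append(layer)
    candidates.sort(key=lambda l: l.last_activity_ts)

    planned, freed = [], 0
    for layer in candidates:
        if freed >= evict_bytes:
            break
        planned.append(layer)
        freed += layer.file_size

    # Phase 2: if pressure was not relieved, continue with a plain global LRU
    # over all remaining resident layers (when GLOBAL_LRU_LOG_LINE is emitted).
    if freed < evict_bytes:
        leftovers = sorted(
            (l for t in tenants for l in t.resident_layers if l not in planned),
            key=lambda l: l.last_activity_ts,
        )
        for layer in leftovers:
            if freed >= evict_bytes:
                break
            planned.append(layer)
            freed += layer.file_size

    return planned, freed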
|
||||
|
||||
|
||||
@pytest.mark.parametrize("config_level_override", [None, 400])
|
||||
def test_min_resident_size_override_handling(
|
||||
neon_env_builder: NeonEnvBuilder, config_level_override: int
|
||||
):
|
||||
env = neon_env_builder.init_start()
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
def assert_config(tenant_id, expect_override, expect_effective):
|
||||
config = ps_http.tenant_config(tenant_id)
|
||||
assert config.tenant_specific_overrides.get("min_resident_size_override") == expect_override
|
||||
assert config.effective_config.get("min_resident_size_override") == expect_effective
|
||||
|
||||
def assert_overrides(tenant_id, default_tenant_conf_value):
|
||||
ps_http.set_tenant_config(tenant_id, {"min_resident_size_override": 200})
|
||||
assert_config(tenant_id, 200, 200)
|
||||
|
||||
ps_http.set_tenant_config(tenant_id, {"min_resident_size_override": 0})
|
||||
assert_config(tenant_id, 0, 0)
|
||||
|
||||
ps_http.set_tenant_config(tenant_id, {})
|
||||
assert_config(tenant_id, None, default_tenant_conf_value)
|
||||
|
||||
env.pageserver.stop()
|
||||
if config_level_override is not None:
|
||||
env.pageserver.start(
|
||||
overrides=(
|
||||
"--pageserver-config-override=tenant_config={ min_resident_size_override = "
|
||||
+ str(config_level_override)
|
||||
+ " }",
|
||||
)
|
||||
)
|
||||
else:
|
||||
env.pageserver.start()
|
||||
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
assert_overrides(tenant_id, config_level_override)
|
||||
|
||||
# Also ensure that specifying the parameter to create_tenant works, in addition to http-level reconfig.
|
||||
tenant_id, _ = env.neon_cli.create_tenant(conf={"min_resident_size_override": "100"})
|
||||
assert_config(tenant_id, 100, 100)
|
||||
ps_http.set_tenant_config(tenant_id, {})
|
||||
assert_config(tenant_id, None, config_level_override)
|
||||
|
||||
|
||||
@dataclass
|
||||
class EvictionEnv:
|
||||
timelines: list[Tuple[TenantId, TimelineId]]
|
||||
neon_env: NeonEnv
|
||||
pg_bin: PgBin
|
||||
pageserver_http: PageserverHttpClient
|
||||
layer_size: int
|
||||
pgbench_init_lsns: Dict[TenantId, Lsn]
|
||||
|
||||
def timelines_du(self) -> Tuple[int, int, int]:
|
||||
return poor_mans_du(self.neon_env, [(tid, tlid) for tid, tlid in self.timelines])
|
||||
|
||||
def du_by_timeline(self) -> Dict[Tuple[TenantId, TimelineId], int]:
|
||||
return {
|
||||
(tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)])[0]
|
||||
for tid, tlid in self.timelines
|
||||
}
|
||||
|
||||
def warm_up_tenant(self, tenant_id: TenantId):
|
||||
"""
|
||||
Start a read-only compute at the LSN after pgbench -i, and run pgbench -S against it.
|
||||
This assumes that the tenant is still at the state after pgbench -i.
|
||||
"""
|
||||
lsn = self.pgbench_init_lsns[tenant_id]
|
||||
with self.neon_env.postgres.create_start("main", tenant_id=tenant_id, lsn=lsn) as pg:
|
||||
self.pg_bin.run(["pgbench", "-S", pg.connstr()])
|
||||
|
||||
def pageserver_start_with_disk_usage_eviction(
|
||||
self, period, max_usage_pct, min_avail_bytes, mock_behavior
|
||||
):
|
||||
disk_usage_config = {
|
||||
"period": period,
|
||||
"max_usage_pct": max_usage_pct,
|
||||
"min_avail_bytes": min_avail_bytes,
|
||||
"mock_statvfs": mock_behavior,
|
||||
}
|
||||
|
||||
enc = toml.TomlEncoder()
|
||||
|
||||
self.neon_env.pageserver.start(
|
||||
overrides=(
|
||||
"--pageserver-config-override=disk_usage_based_eviction="
|
||||
+ enc.dump_inline_table(disk_usage_config).replace("\n", " "),
|
||||
),
|
||||
)
|
||||
|
||||
def statvfs_called():
|
||||
assert self.neon_env.pageserver.log_contains(".*running mocked statvfs.*")
|
||||
|
||||
wait_until(10, 1, statvfs_called)
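For reference, a hedged example of the override string this helper ends up constructing, using the same toml inline-table encoding as above (the concrete values are the ones the mock-statvfs tests below pass in; nothing here is new configuration surface).

import toml

disk_usage_config = {
    "period": "1s",
    "max_usage_pct": 33,
    "min_avail_bytes": 0,
    "mock_statvfs": {
        "type": "Success",
        "blocksize": 512,
        "total_blocks": 100,  # placeholder count
        "name_filter": ".*__.*",
    },
}
override = (
    "--pageserver-config-override=disk_usage_based_eviction="
    + toml.TomlEncoder().dump_inline_table(disk_usage_config).replace("\n", " ")
)
# Yields something like:
# --pageserver-config-override=disk_usage_based_eviction={ period = "1s", max_usage_pct = 33, ... }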
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv:
|
||||
"""
|
||||
Creates two tenants, one somewhat larger than the other.
|
||||
"""
|
||||
|
||||
log.info(f"setting up eviction_env for test {request.node.name}")
|
||||
|
||||
neon_env_builder.enable_remote_storage(RemoteStorageKind.LOCAL_FS, f"{request.node.name}")
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
# allow because we are invoking this manually; we always warn on executing disk based eviction
|
||||
env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")
|
||||
|
||||
# remove the initial tenant
|
||||
## why wait for upload queue? => https://github.com/neondatabase/neon/issues/3865
|
||||
assert env.initial_timeline
|
||||
wait_for_upload_queue_empty(env.pageserver, env.initial_tenant, env.initial_timeline)
|
||||
pageserver_http.tenant_detach(env.initial_tenant)
|
||||
assert isinstance(env.remote_storage, LocalFsStorage)
|
||||
tenant_remote_storage = env.remote_storage.root / "tenants" / str(env.initial_tenant)
|
||||
assert tenant_remote_storage.is_dir()
|
||||
shutil.rmtree(tenant_remote_storage)
|
||||
env.initial_tenant = TenantId("0" * 32)
|
||||
env.initial_timeline = None
|
||||
|
||||
# Choose small layer_size so that we can use low pgbench_scales and still get a large count of layers.
|
||||
# Large count of layers and small layer size is good for testing because it makes evictions predictable.
|
||||
# Predictable in the sense that many layer evictions will be required to reach the eviction target, because
|
||||
# each eviction only makes small progress. That means little overshoot, and thereby stable asserts.
|
||||
pgbench_scales = [4, 6]
|
||||
layer_size = 5 * 1024**2
|
||||
|
||||
pgbench_init_lsns = {}
|
||||
|
||||
timelines = []
|
||||
for scale in pgbench_scales:
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
"checkpoint_distance": f"{layer_size}",
|
||||
"image_creation_threshold": "100",
|
||||
"compaction_target_size": f"{layer_size}",
|
||||
}
|
||||
)
|
||||
|
||||
with env.postgres.create_start("main", tenant_id=tenant_id) as pg:
|
||||
pg_bin.run(["pgbench", "-i", f"-s{scale}", pg.connstr()])
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
|
||||
timelines.append((tenant_id, timeline_id))
|
||||
|
||||
# stop the safekeepers to avoid on-demand downloads caused by
|
||||
# initial logical size calculation triggered by walreceiver connection status
|
||||
# when we restart the pageserver process in any of the tests
|
||||
env.neon_cli.safekeeper_stop()
|
||||
|
||||
# after stopping the safekeepers, we know that no new WAL will be coming in
|
||||
for tenant_id, timeline_id in timelines:
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
wait_for_upload_queue_empty(env.pageserver, tenant_id, timeline_id)
|
||||
tl_info = pageserver_http.timeline_detail(tenant_id, timeline_id)
|
||||
assert tl_info["last_record_lsn"] == tl_info["disk_consistent_lsn"]
|
||||
assert tl_info["disk_consistent_lsn"] == tl_info["remote_consistent_lsn"]
|
||||
pgbench_init_lsns[tenant_id] = Lsn(tl_info["last_record_lsn"])
|
||||
|
||||
layers = pageserver_http.layer_map_info(tenant_id, timeline_id)
|
||||
log.info(f"{layers}")
|
||||
assert (
|
||||
len(layers.historic_layers) >= 10
|
||||
), "evictions happen at layer granularity, but we often assert at byte-granularity"
|
||||
|
||||
eviction_env = EvictionEnv(
|
||||
timelines=timelines,
|
||||
neon_env=env,
|
||||
pageserver_http=pageserver_http,
|
||||
layer_size=layer_size,
|
||||
pg_bin=pg_bin,
|
||||
pgbench_init_lsns=pgbench_init_lsns,
|
||||
)
|
||||
|
||||
return eviction_env
|
||||
|
||||
|
||||
def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
|
||||
env = eviction_env
|
||||
|
||||
env.neon_env.pageserver.allowed_errors.append(
|
||||
r".* Changing Active tenant to Broken state, reason: broken from test"
|
||||
)
|
||||
broken_tenant_id, broken_timeline_id = env.timelines[0]
|
||||
env.pageserver_http.tenant_break(broken_tenant_id)
|
||||
|
||||
healthy_tenant_id, healthy_timeline_id = env.timelines[1]
|
||||
|
||||
broken_size_pre, _, _ = poor_mans_du(env.neon_env, [(broken_tenant_id, broken_timeline_id)])
|
||||
healthy_size_pre, _, _ = poor_mans_du(env.neon_env, [(healthy_tenant_id, healthy_timeline_id)])
|
||||
|
||||
# try to evict everything, then validate that broken tenant wasn't touched
|
||||
target = broken_size_pre + healthy_size_pre
|
||||
|
||||
response = env.pageserver_http.disk_usage_eviction_run({"evict_bytes": target})
|
||||
log.info(f"{response}")
|
||||
|
||||
broken_size_post, _, _ = poor_mans_du(env.neon_env, [(broken_tenant_id, broken_timeline_id)])
|
||||
healthy_size_post, _, _ = poor_mans_du(env.neon_env, [(healthy_tenant_id, healthy_timeline_id)])
|
||||
|
||||
assert broken_size_pre == broken_size_post, "broken tenant should not be touched"
|
||||
assert healthy_size_post < healthy_size_pre
|
||||
assert healthy_size_post == 0
|
||||
env.neon_env.pageserver.allowed_errors.append(".*" + GLOBAL_LRU_LOG_LINE)
|
||||
|
||||
|
||||
def test_pageserver_evicts_until_pressure_is_relieved(eviction_env: EvictionEnv):
|
||||
"""
|
||||
Basic test to ensure that we evict enough to relieve pressure.
|
||||
"""
|
||||
env = eviction_env
|
||||
pageserver_http = env.pageserver_http
|
||||
|
||||
(total_on_disk, _, _) = env.timelines_du()
|
||||
|
||||
target = total_on_disk // 2
|
||||
|
||||
response = pageserver_http.disk_usage_eviction_run({"evict_bytes": target})
|
||||
log.info(f"{response}")
|
||||
|
||||
(later_total_on_disk, _, _) = env.timelines_du()
|
||||
|
||||
actual_change = total_on_disk - later_total_on_disk
|
||||
|
||||
assert 0 <= actual_change, "nothing can load layers during this test"
|
||||
assert actual_change >= target, "must evict more than half"
|
||||
assert (
|
||||
response["Finished"]["assumed"]["projected_after"]["freed_bytes"] >= actual_change
|
||||
), "report accurately evicted bytes"
|
||||
assert response["Finished"]["assumed"]["failed"]["count"] == 0, "zero failures expected"
|
||||
|
||||
|
||||
def test_pageserver_respects_overridden_resident_size(eviction_env: EvictionEnv):
|
||||
"""
|
||||
Override tenant min resident and ensure that it will be respected by eviction.
|
||||
"""
|
||||
env = eviction_env
|
||||
ps_http = env.pageserver_http
|
||||
|
||||
(total_on_disk, _, _) = env.timelines_du()
|
||||
du_by_timeline = env.du_by_timeline()
|
||||
log.info("du_by_timeline: %s", du_by_timeline)
|
||||
|
||||
assert len(du_by_timeline) == 2, "this test assumes two tenants"
|
||||
large_tenant = max(du_by_timeline, key=du_by_timeline.__getitem__)
|
||||
small_tenant = min(du_by_timeline, key=du_by_timeline.__getitem__)
|
||||
assert du_by_timeline[large_tenant] > du_by_timeline[small_tenant]
|
||||
assert (
|
||||
du_by_timeline[large_tenant] - du_by_timeline[small_tenant] > 5 * env.layer_size
|
||||
), "ensure this test will do more than 1 eviction"
|
||||
|
||||
# Give the larger tenant a haircut while preventing the smaller tenant from getting one.
|
||||
# To prevent the smaller from getting a haircut, we set min_resident_size to its current size.
|
||||
# To ensure the larger tenant is getting a haircut, any non-zero `target` will do.
|
||||
min_resident_size = du_by_timeline[small_tenant]
|
||||
target = 1
|
||||
assert (
|
||||
du_by_timeline[large_tenant] > min_resident_size
|
||||
), "ensure the larger tenant will get a haircut"
|
||||
ps_http.patch_tenant_config_client_side(
|
||||
small_tenant[0], {"min_resident_size_override": min_resident_size}
|
||||
)
|
||||
ps_http.patch_tenant_config_client_side(
|
||||
large_tenant[0], {"min_resident_size_override": min_resident_size}
|
||||
)
|
||||
|
||||
# Make the large tenant more-recently used. An incorrect implementation would try to evict
|
||||
# the smaller tenant completely first, before turning to the larger tenant,
|
||||
# since the smaller tenant's layers are least-recently-used.
|
||||
env.warm_up_tenant(large_tenant[0])
|
||||
|
||||
# do one run
|
||||
response = ps_http.disk_usage_eviction_run({"evict_bytes": target})
|
||||
log.info(f"{response}")
|
||||
|
||||
time.sleep(1) # give log time to flush
|
||||
assert not env.neon_env.pageserver.log_contains(
|
||||
GLOBAL_LRU_LOG_LINE,
|
||||
), "this test is pointless if it fell back to global LRU"
|
||||
|
||||
(later_total_on_disk, _, _) = env.timelines_du()
|
||||
later_du_by_timeline = env.du_by_timeline()
|
||||
log.info("later_du_by_timeline: %s", later_du_by_timeline)
|
||||
|
||||
actual_change = total_on_disk - later_total_on_disk
|
||||
assert 0 <= actual_change, "nothing can load layers during this test"
|
||||
assert actual_change >= target, "eviction must always evict more than target"
|
||||
assert (
|
||||
response["Finished"]["assumed"]["projected_after"]["freed_bytes"] >= actual_change
|
||||
), "report accurately evicted bytes"
|
||||
assert response["Finished"]["assumed"]["failed"]["count"] == 0, "zero failures expected"
|
||||
|
||||
assert (
|
||||
later_du_by_timeline[small_tenant] == du_by_timeline[small_tenant]
|
||||
), "small tenant sees no haircut"
|
||||
assert (
|
||||
later_du_by_timeline[large_tenant] < du_by_timeline[large_tenant]
|
||||
), "large tenant gets a haircut"
|
||||
assert du_by_timeline[large_tenant] - later_du_by_timeline[large_tenant] >= target
|
||||
|
||||
|
||||
def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv):
|
||||
"""
|
||||
If we can't relieve pressure using tenant_min_resident_size-respecting eviction,
|
||||
we should continue to evict layers following global LRU.
|
||||
"""
|
||||
env = eviction_env
|
||||
ps_http = env.pageserver_http
|
||||
|
||||
(total_on_disk, _, _) = env.timelines_du()
|
||||
target = total_on_disk
|
||||
|
||||
response = ps_http.disk_usage_eviction_run({"evict_bytes": target})
|
||||
log.info(f"{response}")
|
||||
|
||||
(later_total_on_disk, _, _) = env.timelines_du()
|
||||
actual_change = total_on_disk - later_total_on_disk
|
||||
assert 0 <= actual_change, "nothing can load layers during this test"
|
||||
assert actual_change >= target, "eviction must always evict more than target"
|
||||
|
||||
time.sleep(1) # give log time to flush
|
||||
assert env.neon_env.pageserver.log_contains(GLOBAL_LRU_LOG_LINE)
|
||||
env.neon_env.pageserver.allowed_errors.append(".*" + GLOBAL_LRU_LOG_LINE)
|
||||
|
||||
|
||||
def test_partial_evict_tenant(eviction_env: EvictionEnv):
|
||||
"""
|
||||
Warm up a tenant, then build up pressure to cause evictions in both.
|
||||
We expect
|
||||
* the default min resident size to be respected (largest layer file size)
|
||||
* the warmed-up tenant's layers above min resident size to be evicted after the cold tenant's.
|
||||
"""
|
||||
env = eviction_env
|
||||
ps_http = env.pageserver_http
|
||||
|
||||
(total_on_disk, _, _) = env.timelines_du()
|
||||
du_by_timeline = env.du_by_timeline()
|
||||
|
||||
# pick any tenant
|
||||
[our_tenant, other_tenant] = list(du_by_timeline.keys())
|
||||
(tenant_id, timeline_id) = our_tenant
|
||||
|
||||
# make our tenant more recently used than the other one
|
||||
env.warm_up_tenant(tenant_id)
|
||||
|
||||
# Build up enough pressure to require evictions from both tenants,
|
||||
# but not enough to fall into global LRU.
|
||||
# So, set target to all occupied space, except 2*env.layer_size per tenant
|
||||
target = (
|
||||
du_by_timeline[other_tenant] + (du_by_timeline[our_tenant] // 2) - 2 * 2 * env.layer_size
|
||||
)
|
||||
response = ps_http.disk_usage_eviction_run({"evict_bytes": target})
|
||||
log.info(f"{response}")
|
||||
|
||||
(later_total_on_disk, _, _) = env.timelines_du()
|
||||
actual_change = total_on_disk - later_total_on_disk
|
||||
assert 0 <= actual_change, "nothing can load layers during this test"
|
||||
assert actual_change >= target, "eviction must always evict more than target"
|
||||
|
||||
later_du_by_timeline = env.du_by_timeline()
|
||||
for tenant, later_tenant_usage in later_du_by_timeline.items():
|
||||
assert (
|
||||
later_tenant_usage < du_by_timeline[tenant]
|
||||
), "all tenants should have lost some layers"
|
||||
|
||||
assert (
|
||||
later_du_by_timeline[our_tenant] > 0.5 * du_by_timeline[our_tenant]
|
||||
), "our warmed up tenant should be at about half capacity, part 1"
|
||||
assert (
|
||||
# We don't know exactly whether the cold tenant needs 2 or just 1 env.layer_size wiggle room.
|
||||
# So, check for up to 3 here.
|
||||
later_du_by_timeline[our_tenant]
|
||||
< 0.5 * du_by_timeline[our_tenant] + 3 * env.layer_size
|
||||
), "our warmed up tenant should be at about half capacity, part 2"
|
||||
assert (
|
||||
later_du_by_timeline[other_tenant] < 2 * env.layer_size
|
||||
), "the other tenant should be evicted to is min_resident_size, i.e., max layer file size"
|
||||
|
||||
|
||||
def poor_mans_du(
|
||||
env: NeonEnv, timelines: list[Tuple[TenantId, TimelineId]]
|
||||
) -> Tuple[int, int, int]:
|
||||
"""
|
||||
Disk usage, largest, smallest layer for layer files over the given (tenant, timeline) tuples;
|
||||
this could be done via the layers endpoint just as well.
|
||||
"""
|
||||
total_on_disk = 0
|
||||
largest_layer = 0
|
||||
smallest_layer = None
|
||||
for tenant_id, timeline_id in timelines:
|
||||
dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
assert dir.exists(), f"timeline dir does not exist: {dir}"
|
||||
sum = 0
|
||||
for file in dir.iterdir():
|
||||
if "__" not in file.name:
|
||||
continue
|
||||
size = file.stat().st_size
|
||||
sum += size
|
||||
largest_layer = max(largest_layer, size)
|
||||
if smallest_layer:
|
||||
smallest_layer = min(smallest_layer, size)
|
||||
else:
|
||||
smallest_layer = size
|
||||
log.info(f"{tenant_id}/{timeline_id} => {file.name} {size}")
|
||||
|
||||
log.info(f"{tenant_id}/{timeline_id}: sum {sum}")
|
||||
total_on_disk += sum
|
||||
|
||||
assert smallest_layer is not None or total_on_disk == 0 and largest_layer == 0
|
||||
return (total_on_disk, largest_layer, smallest_layer or 0)
|
||||
|
||||
|
||||
def test_statvfs_error_handling(eviction_env: EvictionEnv):
|
||||
"""
|
||||
We should log an error when statvfs fails.
|
||||
"""
|
||||
env = eviction_env
|
||||
env.neon_env.pageserver.stop()
|
||||
env.pageserver_start_with_disk_usage_eviction(
|
||||
period="1s",
|
||||
max_usage_pct=90,
|
||||
min_avail_bytes=0,
|
||||
mock_behavior={
|
||||
"type": "Failure",
|
||||
"mocked_error": "EIO",
|
||||
},
|
||||
)
|
||||
|
||||
assert env.neon_env.pageserver.log_contains(".*statvfs failed.*EIO")
|
||||
env.neon_env.pageserver.allowed_errors.append(".*statvfs failed.*EIO")
|
||||
|
||||
|
||||
def test_statvfs_pressure_usage(eviction_env: EvictionEnv):
|
||||
"""
|
||||
If statvfs data shows 100% usage, the eviction task will drive it down to
|
||||
the configured max_usage_pct.
|
||||
"""
|
||||
env = eviction_env
|
||||
|
||||
env.neon_env.pageserver.stop()
|
||||
|
||||
# make it seem like we're at 100% utilization by setting total bytes to the used bytes
|
||||
total_size, _, _ = env.timelines_du()
|
||||
blocksize = 512
|
||||
total_blocks = (total_size + (blocksize - 1)) // blocksize
|
||||
|
||||
env.pageserver_start_with_disk_usage_eviction(
|
||||
period="1s",
|
||||
max_usage_pct=33,
|
||||
min_avail_bytes=0,
|
||||
mock_behavior={
|
||||
"type": "Success",
|
||||
"blocksize": blocksize,
|
||||
"total_blocks": total_blocks,
|
||||
# Only count layer files towards used bytes in the mock_statvfs.
|
||||
# This avoids accounting for metadata files & tenant conf in the tests.
|
||||
"name_filter": ".*__.*",
|
||||
},
|
||||
)
|
||||
|
||||
def relieved_log_message():
|
||||
assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved")
|
||||
|
||||
wait_until(10, 1, relieved_log_message)
|
||||
|
||||
post_eviction_total_size, _, _ = env.timelines_du()
|
||||
|
||||
assert post_eviction_total_size <= 0.33 * total_size, "we requested max 33% usage"
|
||||
|
||||
|
||||
def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):
|
||||
"""
|
||||
If statvfs data shows 100% usage, the eviction task will drive it down to
|
||||
at least the configured min_avail_bytes.
|
||||
"""
|
||||
env = eviction_env
|
||||
|
||||
env.neon_env.pageserver.stop()
|
||||
|
||||
# make it seem like we're at 100% utilization by setting total bytes to the used bytes
|
||||
total_size, _, _ = env.timelines_du()
|
||||
blocksize = 512
|
||||
total_blocks = (total_size + (blocksize - 1)) // blocksize
|
||||
|
||||
min_avail_bytes = total_size // 3
|
||||
|
||||
env.pageserver_start_with_disk_usage_eviction(
|
||||
period="1s",
|
||||
max_usage_pct=100,
|
||||
min_avail_bytes=min_avail_bytes,
|
||||
mock_behavior={
|
||||
"type": "Success",
|
||||
"blocksize": blocksize,
|
||||
"total_blocks": total_blocks,
|
||||
# Only count layer files towards used bytes in the mock_statvfs.
|
||||
# This avoids accounting for metadata files & tenant conf in the tests.
|
||||
"name_filter": ".*__.*",
|
||||
},
|
||||
)
|
||||
|
||||
def relieved_log_message():
|
||||
assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved")
|
||||
|
||||
wait_until(10, 1, relieved_log_message)
|
||||
|
||||
post_eviction_total_size, _, _ = env.timelines_du()
|
||||
|
||||
assert (
|
||||
total_size - post_eviction_total_size >= min_avail_bytes
|
||||
), "we requested at least min_avail_bytes worth of free space"
|
||||
vendor/postgres-v14 | 2 | vendored
Submodule vendor/postgres-v14 updated: 4cdba8ec5a...99025b6ee9
vendor/postgres-v15 | 2 | vendored
Submodule vendor/postgres-v15 updated: 0ec04712d5...ae0e71b22f