Compare commits

..

13 Commits

Author SHA1 Message Date
Yuchen Liang
402cb2c40a add allowed errors for leases in two tests that uses static endpoints
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-06-20 17:28:29 -04:00
Yuchen Liang
fba409a158 avoid holding locks when a thread is sleeping
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-06-20 16:01:41 -04:00
Yuchen Liang
8baaccb56c Merge branch 'main' into prepor/lsn_leasing 2024-06-20 07:58:54 -04:00
Christian Schwarz
79401638df remove materialized page cache (#8105)
part of Epic https://github.com/neondatabase/neon/issues/7386

# Motivation

The materialized page cache adds complexity to the code base, which
increases the maintenance burden and risk for subtle and hard to
reproduce bugs such as #8050.

Further, the best hit rate that we currently achieve in production is ca
1% of materialized page cache lookups for
`task_kind=PageRequestHandler`. Other task kinds have hit rates <0.2%.

Last, caching page images in Pageserver rewards under-sized caches in
Computes because reading from Pageserver's materialized page cache over
the network is often sufficiently fast (low hundreds of microseconds).
Such Computes should upscale their local caches to fit their working
set, rather than repeatedly requesting the same page from Pageserver.

Some more discussion and context in internal thread
https://neondb.slack.com/archives/C033RQ5SPDH/p1718714037708459

# Changes

This PR removes the materialized page cache code & metrics.

The infrastructure for different key kinds in `PageCache` is left in
place, even though the "Immutable" key kind is the only remaining one.
This can be further simplified in a future commit.

Some tests started failing because their total runtime was dependent on
high materialized page cache hit rates. This test makes them
fixed-runtime or raises pytest timeouts:
* test_local_file_cache_unlink
* test_physical_replication
* test_pg_regress

# Performance

I focussed on ensuring that this PR will not result in a performance
regression in prod.

* **getpage** requests: our production metrics have shown the
materialized page cache to be irrelevant (low hit rate). Also,
Pageserver is the wrong place to cache page images, it should happen in
compute.
* **ingest** (`task_kind=WalReceiverConnectionHandler`): prod metrics
show 0 percent hit rate, so, removing will not be a regression.
* **get_lsn_by_timestamp**: important API for branch creation, used by
control pane. The clog pages that this code uses are not
materialize-page-cached because they're not 8k. No risk of introducing a
regression here.

We will watch the various nightly benchmarks closely for more results
before shipping to prod.
2024-06-20 11:56:14 +02:00
Alexander Bayandin
c789ec21f6 CI: miscellaneous cleanups (#8073)
## Problem
There are a couple of small CI cleanups that seem too small for dedicated PRs

## Summary of changes
- Create release PR with the title that matches the title in the description
- Tune error message for disallowing `ubuntu-latest` to explicitly
mention what to do
- Remove junit output from pytest, we use allure instead
2024-06-19 19:21:09 +01:00
Alexander Bayandin
558a57b15b CI(test-images): add dockerhub auth (#8115)
## Problem
```
Unable to find image 'neondatabase/neon:9583413584' locally
docker: Error response from daemon: toomanyrequests: You have reached your pull rate limit. You may increase the limit by authenticating and upgrading: https://www.docker.com/increase-rate-limit.
```

## Summary of changes
- add `docker/login-action@v3` for `test-images` job
2024-06-19 16:54:07 +00:00
Andrey Rudenko
06f953ad21 feedback: reread connstrings 2024-06-19 18:16:59 +02:00
John Spray
f0e2bb79b2 tests: use semaphore instead of lock for Endpoint.running (#8112)
## Problem

Ahem, let's try this again.

https://github.com/neondatabase/neon/pull/8110 had a spooky failure in
test_multi_attach where a call to Endpoint.stop() timed out waiting for
a lock, even though we can see an earlier call completing and releasing
the lock. I suspect something weird is going on with the way pytest runs
tests across processes, or use of asyncio perhaps.

Anyway: the simplest fix is to just use a semaphore instead: if we don't
lock we can't deadlock.

## Summary of changes

- Make Endpoint.running a semaphore, where we add a unit to its counter
when starting the process and atomically decrement it when stopping.
2024-06-19 16:07:14 +00:00
Andrey Rudenko
66d0277571 feedback 2024-06-17 15:40:21 +02:00
Andrey Rudenko
3b2159310f fixes to protocol based on https://github.com/neondatabase/neon/pull/8039 2024-06-13 10:52:41 +02:00
Andrey Rudenko
ff55d6f4df feedback: using valid_until (it doesn't work currently) 2024-06-12 11:45:26 +02:00
Andrey Rudenko
3d1bcc9524 linter fix 2024-06-10 14:45:16 +02:00
Andrey Rudenko
ef90a6295e feat(compute_ctl): add periodic lease lsn requests for static computes 2024-06-10 14:45:16 +02:00
31 changed files with 870 additions and 758 deletions

View File

@@ -183,8 +183,7 @@ runs:
# Run the tests.
#
# The junit.xml file allows CI tools to display more fine-grained test information
# in its "Tests" tab in the results page.
# --alluredir saves test results in Allure format (in a specified directory)
# --verbose prints name of each test (helpful when there are
# multiple tests in one file)
# -rA prints summary in the end
@@ -193,7 +192,6 @@ runs:
#
mkdir -p $TEST_OUTPUT/allure/results
"${cov_prefix[@]}" ./scripts/pytest \
--junitxml=$TEST_OUTPUT/junit.xml \
--alluredir=$TEST_OUTPUT/allure/results \
--tb=short \
--verbose \

View File

@@ -36,15 +36,16 @@ jobs:
fail_on_error: true
filter_mode: nofilter
level: error
- run: |
- name: Disallow 'ubuntu-latest' runners
run: |
PAT='^\s*runs-on:.*-latest'
if grep -ERq $PAT .github/workflows
then
if grep -ERq $PAT .github/workflows; then
grep -ERl $PAT .github/workflows |\
while read -r f
do
l=$(grep -nE $PAT .github/workflows/release.yml | awk -F: '{print $1}' | head -1)
echo "::error file=$f,line=$l::Please, do not use ubuntu-latest images to run on, use LTS instead."
echo "::error file=$f,line=$l::Please use 'ubuntu-22.04' instead of 'ubuntu-latest'"
done
exit 1
fi

View File

@@ -1023,6 +1023,18 @@ jobs:
with:
fetch-depth: 0
# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
# The default value is ~/.docker
- name: Set custom docker config directory
run: |
mkdir -p .docker-custom
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
- uses: docker/login-action@v3
with:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
# `neondatabase/neon` contains multiple binaries, all of them use the same input for the version into the same version formatting library.
# Pick pageserver as currently the only binary with extra "version" features printed in the string to verify.
# Regular pageserver version string looks like
@@ -1057,6 +1069,11 @@ jobs:
docker compose -f ./docker-compose/docker-compose.yml logs || 0
docker compose -f ./docker-compose/docker-compose.yml down
- name: Remove custom docker config directory
if: always()
run: |
rm -rf .docker-custom
promote-images:
needs: [ check-permissions, tag, test-images, vm-compute-node-image ]
runs-on: ubuntu-22.04

View File

@@ -52,13 +52,15 @@ jobs:
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
TITLE="Storage & Compute release ${RELEASE_DATE}"
cat << EOF > body.md
## Storage & Compute release ${RELEASE_DATE}
## ${TITLE}
**Please merge this Pull Request using 'Create a merge commit' button**
EOF
gh pr create --title "Release ${RELEASE_DATE}" \
gh pr create --title "${TITLE}" \
--body-file "body.md" \
--head "${RELEASE_BRANCH}" \
--base "release"
@@ -91,13 +93,15 @@ jobs:
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
TITLE="Proxy release ${RELEASE_DATE}"
cat << EOF > body.md
## Proxy release ${RELEASE_DATE}
## ${TITLE}
**Please merge this Pull Request using 'Create a merge commit' button**
EOF
gh pr create --title "Proxy release ${RELEASE_DATE}" \
gh pr create --title "${TITLE}" \
--body-file "body.md" \
--head "${RELEASE_BRANCH}" \
--base "release-proxy"

46
Cargo.lock generated
View File

@@ -1056,6 +1056,19 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "cgroups-rs"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fb3af90c8d48ad5f432d8afb521b5b40c2a2fce46dd60e05912de51c47fba64"
dependencies = [
"libc",
"log",
"nix 0.25.1",
"regex",
"thiserror",
]
[[package]]
name = "chrono"
version = "0.4.38"
@@ -2864,9 +2877,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.155"
version = "0.2.150"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c"
[[package]]
name = "libloading"
@@ -3126,6 +3139,18 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "nix"
version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f346ff70e7dbfd675fe90590b92d59ef2de15a8779ae305ebcbfd3f0caf59be4"
dependencies = [
"autocfg",
"bitflags 1.3.2",
"cfg-if",
"libc",
]
[[package]]
name = "nix"
version = "0.26.4"
@@ -4468,9 +4493,9 @@ dependencies = [
[[package]]
name = "rayon"
version = "1.10.0"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b"
dependencies = [
"either",
"rayon-core",
@@ -4478,12 +4503,14 @@ dependencies = [
[[package]]
name = "rayon-core"
version = "1.12.1"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"num_cpus",
]
[[package]]
@@ -5901,9 +5928,9 @@ dependencies = [
[[package]]
name = "sysinfo"
version = "0.30.12"
version = "0.29.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "732ffa00f53e6b2af46208fba5718d9662a421049204e156328b66791ffa15ae"
checksum = "165d6d8539689e3d3bc8b98ac59541e1f21c7de7c85d60dc80e43ae0ed2113db"
dependencies = [
"cfg-if",
"core-foundation-sys",
@@ -5911,7 +5938,7 @@ dependencies = [
"ntapi",
"once_cell",
"rayon",
"windows 0.52.0",
"winapi",
]
[[package]]
@@ -6784,6 +6811,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"cgroups-rs",
"clap",
"futures",
"inotify 0.10.2",

View File

@@ -144,7 +144,7 @@ rustls = "0.22"
rustls-pemfile = "2"
rustls-split = "0.3"
scopeguard = "1.1"
sysinfo = "0.30.12"
sysinfo = "0.29.2"
sd-notify = "0.4.1"
sentry = { version = "0.32", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
serde = { version = "1.0", features = ["derive"] }

View File

@@ -45,6 +45,7 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use compute_tools::lsn_lease::launch_lsn_lease_loop_for_static;
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info, warn};
@@ -363,6 +364,8 @@ fn wait_spec(
state.start_time = now;
}
launch_lsn_lease_loop_for_static(&compute);
Ok(WaitSpecResult {
compute,
http_port,

View File

@@ -11,6 +11,7 @@ pub mod logger;
pub mod catalog;
pub mod compute;
pub mod extension_server;
pub mod lsn_lease;
pub mod monitor;
pub mod params;
pub mod pg_helpers;

View File

@@ -0,0 +1,113 @@
use anyhow::bail;
use anyhow::Result;
use postgres::{NoTls, SimpleQueryMessage};
use std::{
str::FromStr,
sync::Arc,
thread,
time::{Duration, SystemTime},
};
use compute_api::spec::ComputeMode;
use tracing::{error, info};
use utils::lsn::Lsn;
use crate::compute::{ComputeNode, ComputeState};
pub fn launch_lsn_lease_loop_for_static(compute: &Arc<ComputeNode>) {
let lsn = {
let state = compute.state.lock().unwrap();
let spec = state.pspec.as_ref().expect("spec must be set");
match spec.spec.mode {
ComputeMode::Static(lsn) => lsn,
_ => return,
}
};
let compute = compute.clone();
thread::spawn(move || lsn_lease_loop(compute, lsn));
}
fn postgres_configs_from_state(compute_state: &ComputeState) -> Vec<postgres::Config> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let conn_strings = spec.pageserver_connstr.split(',');
conn_strings
.map(|connstr| {
let mut config = postgres::Config::from_str(connstr).expect("invalid connstr");
if let Some(storage_auth_token) = &spec.storage_auth_token {
info!("Got storage auth token from spec file");
config.password(storage_auth_token.clone());
} else {
info!("Storage auth token not set");
}
config
})
.collect::<Vec<_>>()
}
fn lsn_lease_loop(compute: Arc<ComputeNode>, lsn: Lsn) {
loop {
let state = compute.state.lock().unwrap();
let spec = state.pspec.as_ref().expect("spec must be set");
let configs = postgres_configs_from_state(&state);
let cmd = format!("lease lsn {} {} {} ", spec.tenant_id, spec.timeline_id, lsn);
drop(state);
match lsn_lease_request(&configs, &cmd) {
Ok(valid_until) => {
let valid_until_duration = Duration::from_millis(valid_until as u64);
let sleep_duration = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.checked_sub(valid_until_duration)
.unwrap_or(Duration::ZERO)
.checked_sub(Duration::from_secs(60))
.unwrap_or(Duration::ZERO);
// Ensure the sleep duration is at least 60 seconds to avoid busy loops
let sleep_duration = std::cmp::max(sleep_duration, Duration::from_secs(60));
info!(
"lsn_lease_request succeeded, sleeping for {} seconds",
sleep_duration.as_secs()
);
thread::sleep(sleep_duration);
}
Err(e) => {
error!("lsn_lease_request failed: {:#}", e);
thread::sleep(Duration::from_secs(10));
}
}
}
}
fn lsn_lease_request(configs: &[postgres::Config], cmd: &str) -> Result<u128> {
info!("lsn_lease_request: {}", cmd);
let valid_until = configs
.iter()
.map(|config| {
let mut client = config.connect(NoTls)?;
let res = client.simple_query(cmd)?;
let msg = match res.first() {
Some(msg) => msg,
None => bail!("empty response"),
};
let row = match msg {
SimpleQueryMessage::Row(row) => row,
_ => bail!("error parsing lsn lease response"),
};
let valid_until_str = match row.get("valid_until") {
Some(valid_until) => valid_until,
None => bail!("valid_until not found"),
};
Ok(u128::from_str(valid_until_str)?)
})
.collect::<Result<Vec<u128>>>()?
.into_iter()
.min()
.unwrap();
Ok(valid_until)
}

View File

@@ -5,4 +5,3 @@ TODO:
- shared across tenants
- store pages from layer files
- store pages from "in-memory layer"
- store materialized pages

View File

@@ -134,7 +134,7 @@ depends on that, so if you change it, bad things will happen.
#### page_cache_size
Size of the page cache, to hold materialized page versions. Unit is
Size of the page cache. Unit is
number of 8 kB blocks. The default is 8192, which means 64 MB.
#### max_file_descriptors

View File

@@ -25,3 +25,6 @@ tokio-stream.workspace = true
tokio-util.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs = "0.3.3"

View File

@@ -5,6 +5,7 @@
// The monitor was previously started by vm-builder, and for testing purposes,
// we can mimic that setup with this binary.
#[cfg(target_os = "linux")]
#[tokio::main]
async fn main() -> anyhow::Result<()> {
use clap::Parser;
@@ -25,3 +26,8 @@ async fn main() -> anyhow::Result<()> {
let token = CancellationToken::new();
vm_monitor::start(args, token).await
}
#[cfg(not(target_os = "linux"))]
fn main() {
panic!("the monitor requires cgroups, which are only available on linux")
}

View File

@@ -0,0 +1,363 @@
use std::fmt::{self, Debug, Formatter};
use std::time::{Duration, Instant};
use anyhow::{anyhow, Context};
use cgroups_rs::{
hierarchies::{self, is_cgroup2_unified_mode},
memory::MemController,
Subsystem,
};
use tokio::sync::watch;
use tracing::{info, warn};
/// Configuration for a `CgroupWatcher`
#[derive(Debug, Clone)]
pub struct Config {
/// Interval at which we should be fetching memory statistics
memory_poll_interval: Duration,
/// The number of samples used in constructing aggregated memory statistics
memory_history_len: usize,
/// The number of most recent samples that will be periodically logged.
///
/// Each sample is logged exactly once. Increasing this value means that recent samples will be
/// logged less frequently, and vice versa.
///
/// For simplicity, this value must be greater than or equal to `memory_history_len`.
memory_history_log_interval: usize,
/// The max number of iterations to skip before logging the next iteration
memory_history_log_noskip_interval: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
memory_poll_interval: Duration::from_millis(100),
memory_history_len: 5, // use 500ms of history for decision-making
memory_history_log_interval: 20, // but only log every ~2s (otherwise it's spammy)
memory_history_log_noskip_interval: Duration::from_secs(15), // but only if it's changed, or 60 seconds have passed
}
}
}
/// Responds to `MonitorEvents` to manage the cgroup: preventing it from being
/// OOM killed or throttling.
///
/// The `CgroupWatcher` primarily achieves this by reading from a stream of
/// `MonitorEvent`s. See `main_signals_loop` for details on how to keep the
/// cgroup happy.
#[derive(Debug)]
pub struct CgroupWatcher {
pub config: Config,
/// The actual cgroup we are watching and managing.
cgroup: cgroups_rs::Cgroup,
}
impl CgroupWatcher {
/// Create a new `CgroupWatcher`.
#[tracing::instrument(skip_all, fields(%name))]
pub fn new(name: String) -> anyhow::Result<Self> {
// TODO: clarify exactly why we need v2
// Make sure cgroups v2 (aka unified) are supported
if !is_cgroup2_unified_mode() {
anyhow::bail!("cgroups v2 not supported");
}
let cgroup = cgroups_rs::Cgroup::load(hierarchies::auto(), &name);
Ok(Self {
cgroup,
config: Default::default(),
})
}
/// The entrypoint for the `CgroupWatcher`.
#[tracing::instrument(skip_all)]
pub async fn watch(
&self,
updates: watch::Sender<(Instant, MemoryHistory)>,
) -> anyhow::Result<()> {
// this requirement makes the code a bit easier to work with; see the config for more.
assert!(self.config.memory_history_len <= self.config.memory_history_log_interval);
let mut ticker = tokio::time::interval(self.config.memory_poll_interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
// ticker.reset_immediately(); // FIXME: enable this once updating to tokio >= 1.30.0
let mem_controller = self.memory()?;
// buffer for samples that will be logged. once full, it remains so.
let history_log_len = self.config.memory_history_log_interval;
let max_skip = self.config.memory_history_log_noskip_interval;
let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len];
let mut last_logged_memusage = MemoryStatus::zeroed();
// Ensure that we're tracking a value that's definitely in the past, as Instant::now is only guaranteed to be non-decreasing on Rust's T1-supported systems.
let mut can_skip_logs_until = Instant::now() - max_skip;
for t in 0_u64.. {
ticker.tick().await;
let now = Instant::now();
let mem = Self::memory_usage(mem_controller);
let i = t as usize % history_log_len;
history_log_buf[i] = mem;
// We're taking *at most* memory_history_len values; we may be bounded by the total
// number of samples that have come in so far.
let samples_count = (t + 1).min(self.config.memory_history_len as u64) as usize;
// NB: in `ring_buf_recent_values_iter`, `i` is *inclusive*, which matches the fact
// that we just inserted a value there, so the end of the iterator will *include* the
// value at i, rather than stopping just short of it.
let samples = ring_buf_recent_values_iter(&history_log_buf, i, samples_count);
let summary = MemoryHistory {
avg_non_reclaimable: samples.map(|h| h.non_reclaimable).sum::<u64>()
/ samples_count as u64,
samples_count,
samples_span: self.config.memory_poll_interval * (samples_count - 1) as u32,
};
// Log the current history if it's time to do so. Because `history_log_buf` has length
// equal to the logging interval, we can just log the entire buffer every time we set
// the last entry, which also means that for this log line, we can ignore that it's a
// ring buffer (because all the entries are in order of increasing time).
//
// We skip logging the data if data hasn't meaningfully changed in a while, unless
// we've already ignored previous iterations for the last max_skip period.
if i == history_log_len - 1
&& (now > can_skip_logs_until
|| !history_log_buf
.iter()
.all(|usage| last_logged_memusage.status_is_close_or_similar(usage)))
{
info!(
history = ?MemoryStatus::debug_slice(&history_log_buf),
summary = ?summary,
"Recent cgroup memory statistics history"
);
can_skip_logs_until = now + max_skip;
last_logged_memusage = *history_log_buf.last().unwrap();
}
updates
.send((now, summary))
.context("failed to send MemoryHistory")?;
}
unreachable!()
}
/// Get a handle on the memory subsystem.
fn memory(&self) -> anyhow::Result<&MemController> {
self.cgroup
.subsystems()
.iter()
.find_map(|sub| match sub {
Subsystem::Mem(c) => Some(c),
_ => None,
})
.ok_or_else(|| anyhow!("could not find memory subsystem"))
}
/// Given a handle on the memory subsystem, returns the current memory information
fn memory_usage(mem_controller: &MemController) -> MemoryStatus {
let stat = mem_controller.memory_stat().stat;
MemoryStatus {
non_reclaimable: stat.active_anon + stat.inactive_anon,
}
}
}
// Helper function for `CgroupWatcher::watch`
fn ring_buf_recent_values_iter<T>(
buf: &[T],
last_value_idx: usize,
count: usize,
) -> impl '_ + Iterator<Item = &T> {
// Assertion carried over from `CgroupWatcher::watch`, to make the logic in this function
// easier (we only have to add `buf.len()` once, rather than a dynamic number of times).
assert!(count <= buf.len());
buf.iter()
// 'cycle' because the values could wrap around
.cycle()
// with 'cycle', this skip is more like 'offset', and functionally this is
// offsettting by 'last_value_idx - count (mod buf.len())', but we have to be
// careful to avoid underflow, so we pre-add buf.len().
// The '+ 1' is because `last_value_idx` is inclusive, rather than exclusive.
.skip((buf.len() + last_value_idx + 1 - count) % buf.len())
.take(count)
}
/// Summary of recent memory usage
#[derive(Debug, Copy, Clone)]
pub struct MemoryHistory {
/// Rolling average of non-reclaimable memory usage samples over the last `history_period`
pub avg_non_reclaimable: u64,
/// The number of samples used to construct this summary
pub samples_count: usize,
/// Total timespan between the first and last sample used for this summary
pub samples_span: Duration,
}
#[derive(Debug, Copy, Clone)]
pub struct MemoryStatus {
non_reclaimable: u64,
}
impl MemoryStatus {
fn zeroed() -> Self {
MemoryStatus { non_reclaimable: 0 }
}
fn debug_slice(slice: &[Self]) -> impl '_ + Debug {
struct DS<'a>(&'a [MemoryStatus]);
impl<'a> Debug for DS<'a> {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.debug_struct("[MemoryStatus]")
.field(
"non_reclaimable[..]",
&Fields(self.0, |stat: &MemoryStatus| {
BytesToGB(stat.non_reclaimable)
}),
)
.finish()
}
}
struct Fields<'a, F>(&'a [MemoryStatus], F);
impl<'a, F: Fn(&MemoryStatus) -> T, T: Debug> Debug for Fields<'a, F> {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.debug_list().entries(self.0.iter().map(&self.1)).finish()
}
}
struct BytesToGB(u64);
impl Debug for BytesToGB {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.write_fmt(format_args!(
"{:.3}Gi",
self.0 as f64 / (1_u64 << 30) as f64
))
}
}
DS(slice)
}
/// Check if the other memory status is a close or similar result.
/// Returns true if the larger value is not larger than the smaller value
/// by 1/8 of the smaller value, and within 128MiB.
/// See tests::check_similarity_behaviour for examples of behaviour
fn status_is_close_or_similar(&self, other: &MemoryStatus) -> bool {
let margin;
let diff;
if self.non_reclaimable >= other.non_reclaimable {
margin = other.non_reclaimable / 8;
diff = self.non_reclaimable - other.non_reclaimable;
} else {
margin = self.non_reclaimable / 8;
diff = other.non_reclaimable - self.non_reclaimable;
}
diff < margin && diff < 128 * 1024 * 1024
}
}
#[cfg(test)]
mod tests {
#[test]
fn ring_buf_iter() {
let buf = vec![0_i32, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let values = |offset, count| {
super::ring_buf_recent_values_iter(&buf, offset, count)
.copied()
.collect::<Vec<i32>>()
};
// Boundary conditions: start, end, and entire thing:
assert_eq!(values(0, 1), [0]);
assert_eq!(values(3, 4), [0, 1, 2, 3]);
assert_eq!(values(9, 4), [6, 7, 8, 9]);
assert_eq!(values(9, 10), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
// "normal" operation: no wraparound
assert_eq!(values(7, 4), [4, 5, 6, 7]);
// wraparound:
assert_eq!(values(0, 4), [7, 8, 9, 0]);
assert_eq!(values(1, 4), [8, 9, 0, 1]);
assert_eq!(values(2, 4), [9, 0, 1, 2]);
assert_eq!(values(2, 10), [3, 4, 5, 6, 7, 8, 9, 0, 1, 2]);
}
#[test]
fn check_similarity_behaviour() {
// This all accesses private methods, so we can't actually run this
// as doctests, because doctests run as an external crate.
let mut small = super::MemoryStatus {
non_reclaimable: 1024,
};
let mut large = super::MemoryStatus {
non_reclaimable: 1024 * 1024 * 1024 * 1024,
};
// objects are self-similar, no matter the size
assert!(small.status_is_close_or_similar(&small));
assert!(large.status_is_close_or_similar(&large));
// inequality is symmetric
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
small.non_reclaimable = 64;
large.non_reclaimable = (small.non_reclaimable / 8) * 9;
// objects are self-similar, no matter the size
assert!(small.status_is_close_or_similar(&small));
assert!(large.status_is_close_or_similar(&large));
// values are similar if the larger value is larger by less than
// 12.5%, i.e. 1/8 of the smaller value.
// In the example above, large is exactly 12.5% larger, so this doesn't
// match.
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
large.non_reclaimable -= 1;
assert!(large.status_is_close_or_similar(&large));
assert!(small.status_is_close_or_similar(&large));
assert!(large.status_is_close_or_similar(&small));
// The 1/8 rule only applies up to 128MiB of difference
small.non_reclaimable = 1024 * 1024 * 1024 * 1024;
large.non_reclaimable = small.non_reclaimable / 8 * 9;
assert!(small.status_is_close_or_similar(&small));
assert!(large.status_is_close_or_similar(&large));
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
// the large value is put just above the threshold
large.non_reclaimable = small.non_reclaimable + 128 * 1024 * 1024;
assert!(large.status_is_close_or_similar(&large));
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
// now below
large.non_reclaimable -= 1;
assert!(large.status_is_close_or_similar(&large));
assert!(small.status_is_close_or_similar(&large));
assert!(large.status_is_close_or_similar(&small));
}
}

View File

@@ -2,7 +2,9 @@
//!
//! Contains types that manage the interaction (not data interchange, see `protocol`)
//! between agent and monitor, allowing us to to process and send messages in a
//! straightforward way.
//! straightforward way. The dispatcher also manages that signals that come from
//! the cgroup (requesting upscale), and the signals that go to the cgroup
//! (notifying it of upscale).
use anyhow::{bail, Context};
use axum::extract::ws::{Message, WebSocket};
@@ -19,7 +21,11 @@ use crate::protocol::{
/// The central handler for all communications in the monitor.
///
/// The dispatcher manages the connection to the agent, sending and receiving messages.
/// The dispatcher has two purposes:
/// 1. Manage the connection to the agent, sending and receiving messages.
/// 2. Communicate with the cgroup manager, notifying it when upscale is received,
/// and sending a message to the agent when the cgroup manager requests
/// upscale.
#[derive(Debug)]
pub struct Dispatcher {
/// We read agent messages of of `source`

View File

@@ -11,6 +11,7 @@ use axum::{routing::get, Router, Server};
use clap::Parser;
use futures::Future;
use std::{fmt::Debug, time::Duration};
use sysinfo::{RefreshKind, System, SystemExt};
use tokio::{sync::broadcast, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
@@ -21,18 +22,18 @@ use runner::Runner;
pub mod dispatcher;
pub mod protocol;
pub mod cgroup;
pub mod filecache;
pub mod runner;
pub mod sliding_window;
/// The vm-monitor is an autoscaling component started by compute_ctl.
///
/// It carries out autoscaling decisions (upscaling/downscaling) and responds to
/// memory pressure by making requests to the autoscaler-agent.
#[derive(Debug, Parser)]
pub struct Args {
/// Unused but accepted for backwards compatibility.
/// The name of the cgroup we should monitor for memory.high events. This
/// is the cgroup that postgres should be running in.
#[arg(short, long)]
pub cgroup: Option<String>,
@@ -62,6 +63,10 @@ pub fn bytes_to_mebibytes(bytes: u64) -> f32 {
(bytes as f32) / (MiB as f32)
}
pub fn get_total_system_memory() -> u64 {
System::new_with_specifics(RefreshKind::new().with_memory()).total_memory()
}
/// Global app state for the Axum server
#[derive(Debug, Clone)]
pub struct ServerState {
@@ -82,7 +87,8 @@ pub struct ServerState {
///
/// This is mainly meant to be called with futures that will be pending for a very
/// long time, or are not mean to return. If it is not desirable for the future to
/// ever resolve, the error can be logged with `f`.
/// ever resolve, such as in the case of [`cgroup::CgroupWatcher::watch`], the error can
/// be logged with `f`.
pub fn spawn_with_cancel<T, F>(
token: CancellationToken,
f: F,

View File

@@ -59,15 +59,16 @@ pub enum OutboundMsgKind {
/// properly deserialize it.
InvalidMessage { error: String },
/// Indicates that we experienced an internal error while processing a message.
/// For example, if enlarging the file cache fails while trying to handle an upscale,
/// For example, if a cgroup operation fails while trying to handle an upscale,
/// we return `InternalError`.
InternalError { error: String },
/// Returned to the agent once we have finished handling an upscale. If the
/// handling was unsuccessful, an `InternalError` will get returned instead.
/// *Note*: this is a struct variant because of the way go serializes struct{}
UpscaleConfirmation {},
/// Indicates to the monitor that we are requesting the VM to be scaled to this size.
ScaleRequest { target: Resources },
/// Indicates to the monitor that we are urgently requesting resources.
/// *Note*: this is a struct variant because of the way go serializes struct{}
UpscaleRequest {},
/// Returned to the agent once we have finished attempting to downscale. If
/// an error occured trying to do so, an `InternalError` will get returned instead.
/// However, if we are simply unsuccessful (for example, do to needing the resources),
@@ -125,7 +126,7 @@ pub enum InboundMsgKind {
}
/// Represents the resources granted to a VM.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
// Renamed because the agent has multiple resources types:
// `Resources` (milliCPU/memory slots)
// `Allocation` (vCPU/bytes) <- what we correspond to

View File

@@ -10,23 +10,23 @@ use std::time::{Duration, Instant};
use anyhow::{bail, Context};
use axum::extract::ws::{Message, WebSocket};
use futures::StreamExt;
use sysinfo::System;
use tokio::sync::broadcast;
use tokio::sync::{broadcast, watch};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};
use crate::cgroup::{self, CgroupWatcher};
use crate::dispatcher::Dispatcher;
use crate::filecache::{FileCacheConfig, FileCacheState};
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
use crate::sliding_window::SlidingMax;
use crate::{bytes_to_mebibytes, Args, MiB};
use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};
/// Central struct that tracks the desired scaling target, and interacts with the agent
/// and dispatcher to handle signals from the agent.
/// Central struct that interacts with agent, dispatcher, and cgroup to handle
/// signals from the agent.
#[derive(Debug)]
pub struct Runner {
config: Config,
filecache: Option<FileCacheState>,
cgroup: Option<CgroupState>,
dispatcher: Dispatcher,
/// We "mint" new message ids by incrementing this counter and taking the value.
@@ -35,12 +35,7 @@ pub struct Runner {
/// by us vs the autoscaler-agent.
counter: usize,
last_scale_request: Option<(Resources, Instant)>,
cpu_window: SlidingMax<f64, Instant>,
mem_window: SlidingMax<u64, Instant>,
system: System,
last_upscale_request_at: Option<Instant>,
/// A signal to kill the main thread produced by `self.run()`. This is triggered
/// when the server receives a new connection. When the thread receives the
@@ -48,6 +43,14 @@ pub struct Runner {
kill: broadcast::Receiver<()>,
}
#[derive(Debug)]
struct CgroupState {
watcher: watch::Receiver<(Instant, cgroup::MemoryHistory)>,
/// If [`cgroup::MemoryHistory::avg_non_reclaimable`] exceeds `threshold`, we send upscale
/// requests.
threshold: u64,
}
/// Configuration for a `Runner`
#[derive(Debug)]
pub struct Config {
@@ -66,45 +69,55 @@ pub struct Config {
/// should be removed once we have a better solution there.
sys_buffer_bytes: u64,
/// Interval at which we poll memory and CPU statistics for scaling decisions.
poll_interval: Duration,
/// Minimum fraction of total system memory reserved *before* the cgroup threshold; in
/// other words, providing a ceiling for the highest value of the threshold by enforcing that
/// there's at least `cgroup_min_overhead_fraction` of the total memory remaining beyond the
/// threshold.
///
/// For example, a value of `0.1` means that 10% of total memory must remain after exceeding
/// the threshold, so the value of the cgroup threshold would always be capped at 90% of total
/// memory.
///
/// The default value of `0.15` means that we *guarantee* sending upscale requests if the
/// cgroup is using more than 85% of total memory (even if we're *not* separately reserving
/// memory for the file cache).
cgroup_min_overhead_fraction: f64,
/// The resources requested from the agent are calculated based on the Max of memory
/// usage and load average over a sliding window of the last X seconds. This controls
/// the length of the window to consider.
sliding_window_length: Duration,
/// Desired fraction of current CPU that the load average should be. For example, with a value
/// of 0.7, we'd want load average to sit at 0.7 × CPU, scaling CPU to make this happen.
load_average_fraction_target: f64,
/// Desired fraction of current memory that we would like to be using. For example, with a value
/// of 0.7, on a 4GB VM we'd like to be using 2.8GB of memory.
memory_usage_fraction_target: f64,
/// When requesting scaling to a certain # of CPUs, the request is rounded up to the
/// nearest multiple of 'cpu_quantum'. For example, if the desired # of CPUs based on the
/// usage is 3.1, and cpu_quantum is 0.25, we'd request 3.25 CPUs.
cpu_quantum: f64,
/// Like 'cpu_quantum', but for memory. In bytes.
mem_quantum: u64,
cgroup_downscale_threshold_buffer_bytes: u64,
}
impl Default for Config {
fn default() -> Self {
Self {
sys_buffer_bytes: 100 * MiB,
poll_interval: Duration::from_millis(100),
sliding_window_length: Duration::from_secs(60),
cpu_quantum: 0.25,
mem_quantum: 512 * 1024 * 1024,
load_average_fraction_target: 0.9,
memory_usage_fraction_target: 0.75,
cgroup_min_overhead_fraction: 0.15,
cgroup_downscale_threshold_buffer_bytes: 100 * MiB,
}
}
}
impl Config {
fn cgroup_threshold(&self, total_mem: u64, file_cache_disk_size: u64) -> u64 {
// If the file cache is in tmpfs, then it will count towards shmem usage of the cgroup,
// and thus be non-reclaimable, so we should allow for additional memory usage.
//
// If the file cache sits on disk, our desired stable system state is for it to be fully
// page cached (its contents should only be paged to/from disk in situations where we can't
// upscale fast enough). Page-cached memory is reclaimable, so we need to lower the
// threshold for non-reclaimable memory so we scale up *before* the kernel starts paging
// out the file cache.
let memory_remaining_for_cgroup = total_mem.saturating_sub(file_cache_disk_size);
// Even if we're not separately making room for the file cache (if it's in tmpfs), we still
// want our threshold to be met gracefully instead of letting postgres get OOM-killed.
// So we guarantee that there's at least `cgroup_min_overhead_fraction` of total memory
// remaining above the threshold.
let max_threshold = (total_mem as f64 * (1.0 - self.cgroup_min_overhead_fraction)) as u64;
memory_remaining_for_cgroup.min(max_threshold)
}
}
impl Runner {
/// Create a new monitor.
#[tracing::instrument(skip_all, fields(?config, ?args))]
@@ -124,25 +137,23 @@ impl Runner {
.await
.context("error creating new dispatcher")?;
let now = Instant::now();
let mut state = Runner {
config,
filecache: None,
cgroup: None,
dispatcher,
counter: 1, // NB: must be odd, see the comment about the field for more.
last_scale_request: None,
cpu_window: SlidingMax::new(0.0, now),
mem_window: SlidingMax::new(0, now),
last_upscale_request_at: None,
kill,
system: System::new(),
};
state.system.refresh_specifics(
sysinfo::RefreshKind::new().with_memory(sysinfo::MemoryRefreshKind::new().with_ram()),
);
let mem = get_total_system_memory();
let mem = state.system.total_memory();
let mut file_cache_disk_size = 0;
// We need to process file cache initialization before cgroup initialization, so that the memory
// allocated to the file cache is appropriately taken into account when we decide the cgroup's
// memory limits.
if let Some(connstr) = &args.pgconnstr {
info!("initializing file cache");
let config = FileCacheConfig::default();
@@ -173,19 +184,51 @@ impl Runner {
info!("file cache size actually got set to {actual_size}")
}
file_cache_disk_size = actual_size;
state.filecache = Some(file_cache);
}
if let Some(name) = &args.cgroup {
// Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
// now, and then set limits later.
info!("initializing cgroup");
let cgroup =
CgroupWatcher::new(name.clone()).context("failed to create cgroup manager")?;
let init_value = cgroup::MemoryHistory {
avg_non_reclaimable: 0,
samples_count: 0,
samples_span: Duration::ZERO,
};
let (hist_tx, hist_rx) = watch::channel((Instant::now(), init_value));
spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
cgroup.watch(hist_tx).await
});
let threshold = state.config.cgroup_threshold(mem, file_cache_disk_size);
info!(threshold, "set initial cgroup threshold",);
state.cgroup = Some(CgroupState {
watcher: hist_rx,
threshold,
});
}
Ok(state)
}
/// Attempt to downscale filecache
/// Attempt to downscale filecache + cgroup
#[tracing::instrument(skip_all, fields(?target))]
pub async fn try_downscale(&mut self, target: Resources) -> anyhow::Result<(bool, String)> {
// Nothing to adjust
if self.filecache.is_none() {
info!("no action needed for downscale (no file cache enabled)");
return Ok((true, "monitor is not managing file cache".to_string()));
if self.cgroup.is_none() && self.filecache.is_none() {
info!("no action needed for downscale (no cgroup or file cache enabled)");
return Ok((
true,
"monitor is not managing cgroup or file cache".to_string(),
));
}
let requested_mem = target.mem;
@@ -195,14 +238,57 @@ impl Runner {
.as_ref()
.map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory))
.unwrap_or(0);
if let Some(cgroup) = &self.cgroup {
let (last_time, last_history) = *cgroup.watcher.borrow();
// The downscaling has been approved. Downscale the file cache.
// NB: The ordering of these conditions is intentional. During startup, we should deny
// downscaling until we have enough information to determine that it's safe to do so
// (i.e. enough samples have come in). But if it's been a while and we *still* haven't
// received any information, we should *fail* instead of just denying downscaling.
//
// `last_time` is set to `Instant::now()` on startup, so checking `last_time.elapsed()`
// serves double-duty: it trips if we haven't received *any* metrics for long enough,
// OR if we haven't received metrics *recently enough*.
//
// TODO: make the duration here configurable.
if last_time.elapsed() > Duration::from_secs(5) {
bail!("haven't gotten cgroup memory stats recently enough to determine downscaling information");
} else if last_history.samples_count <= 1 {
let status = "haven't received enough cgroup memory stats yet";
info!(status, "discontinuing downscale");
return Ok((false, status.to_owned()));
}
let new_threshold = self
.config
.cgroup_threshold(usable_system_memory, expected_file_cache_size);
let current = last_history.avg_non_reclaimable;
if new_threshold < current + self.config.cgroup_downscale_threshold_buffer_bytes {
let status = format!(
"{}: {} MiB (new threshold) < {} (current usage) + {} (downscale buffer)",
"calculated memory threshold too low",
bytes_to_mebibytes(new_threshold),
bytes_to_mebibytes(current),
bytes_to_mebibytes(self.config.cgroup_downscale_threshold_buffer_bytes)
);
info!(status, "discontinuing downscale");
return Ok((false, status));
}
}
// The downscaling has been approved. Downscale the file cache, then the cgroup.
let mut status = vec![];
let mut file_cache_disk_size = 0;
if let Some(file_cache) = &mut self.filecache {
let actual_usage = file_cache
.set_file_cache_size(expected_file_cache_size)
.await
.context("failed to set file cache size")?;
file_cache_disk_size = actual_usage;
let message = format!(
"set file cache size to {} MiB",
bytes_to_mebibytes(actual_usage),
@@ -211,6 +297,22 @@ impl Runner {
status.push(message);
}
if let Some(cgroup) = &mut self.cgroup {
let new_threshold = self
.config
.cgroup_threshold(usable_system_memory, file_cache_disk_size);
let message = format!(
"set cgroup memory threshold from {} MiB to {} MiB, of new total {} MiB",
bytes_to_mebibytes(cgroup.threshold),
bytes_to_mebibytes(new_threshold),
bytes_to_mebibytes(usable_system_memory)
);
cgroup.threshold = new_threshold;
info!("downscale: {message}");
status.push(message);
}
// TODO: make this status thing less jank
let status = status.join("; ");
Ok((true, status))
@@ -219,14 +321,15 @@ impl Runner {
/// Handle new resources
#[tracing::instrument(skip_all, fields(?resources))]
pub async fn handle_upscale(&mut self, resources: Resources) -> anyhow::Result<()> {
if self.filecache.is_none() {
info!("no action needed for upscale (file cache is disabled)");
if self.filecache.is_none() && self.cgroup.is_none() {
info!("no action needed for upscale (no cgroup or file cache enabled)");
return Ok(());
}
let new_mem = resources.mem;
let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes);
let mut file_cache_disk_size = 0;
if let Some(file_cache) = &mut self.filecache {
let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
info!(
@@ -239,6 +342,7 @@ impl Runner {
.set_file_cache_size(expected_usage)
.await
.context("failed to set file cache size")?;
file_cache_disk_size = actual_usage;
if actual_usage != expected_usage {
warn!(
@@ -249,6 +353,20 @@ impl Runner {
}
}
if let Some(cgroup) = &mut self.cgroup {
let new_threshold = self
.config
.cgroup_threshold(usable_system_memory, file_cache_disk_size);
info!(
"set cgroup memory threshold from {} MiB to {} MiB of new total {} MiB",
bytes_to_mebibytes(cgroup.threshold),
bytes_to_mebibytes(new_threshold),
bytes_to_mebibytes(usable_system_memory)
);
cgroup.threshold = new_threshold;
}
Ok(())
}
@@ -295,57 +413,10 @@ impl Runner {
}
}
/// Calculate the desired size of the VM, based on the CPU and memory usage right now.
pub fn calculate_raw_target(&mut self) -> Resources {
self.system.refresh_specifics(
sysinfo::RefreshKind::new().with_memory(sysinfo::MemoryRefreshKind::new().with_ram()),
);
// For CPU:
//
// We use the 1 minute load average to measure "current" CPU usage. Target # of
// CPUs is at the point where:
//
// (CPUs) * (LoadAverageFractionTarget) == (load average).
let load_avg_1min = System::load_average().one;
let goal_cpus = load_avg_1min / self.config.load_average_fraction_target;
// For Memory:
//
// Target point is where (Mem) * (MemoryUsageFractionTarget) == (Mem Usage)
let used_memory = self.system.used_memory();
let goal_memory_bytes: u64 =
(self.system.used_memory() as f64 / self.config.memory_usage_fraction_target) as u64;
debug!("load avg: {load_avg_1min} used memory: {used_memory}");
Resources {
cpu: goal_cpus,
mem: goal_memory_bytes,
}
}
/// To avoid overly fine-grained requests to the agent, round up the request to a
/// multiple of the CPU and memory size of one a Compute Unit.
///
/// We still track CPU and memory separately though. The autoscaler agent will combine
/// the CPU and memory requests to a single "# of Compute Units" measure.
fn quantize_resources(&self, res: Resources) -> Resources {
Resources {
cpu: (res.cpu / self.config.cpu_quantum).ceil() * self.config.cpu_quantum,
mem: ((res.mem as f64 / self.config.mem_quantum as f64).ceil()
* self.config.mem_quantum as f64) as u64,
}
}
// TODO: don't propagate errors, probably just warn!?
#[tracing::instrument(skip_all)]
pub async fn run(&mut self) -> anyhow::Result<()> {
info!("starting dispatcher");
let mut ticker = tokio::time::interval(self.config.poll_interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
// ticker.reset_immediately(); // FIXME: enable this once updating to tokio >= 1.30.0
loop {
tokio::select! {
signal = self.kill.recv() => {
@@ -355,39 +426,25 @@ impl Runner {
}
}
// Time to re-evaluate the scaling target
_ = ticker.tick() => {
let now = Instant::now();
// New memory stats from the cgroup, *may* need to request upscaling, if we've
// exceeded the threshold
result = self.cgroup.as_mut().unwrap().watcher.changed(), if self.cgroup.is_some() => {
result.context("failed to receive from cgroup memory stats watcher")?;
// Calculate the desired resources based on current usage
let target_now = self.calculate_raw_target();
let cgroup = self.cgroup.as_ref().unwrap();
// Round it up to the nearest CU sizes, to avoid overly fine-grained
// requests.
let quantized_target_now = self.quantize_resources(target_now);
let (_time, cgroup_mem_stat) = *cgroup.watcher.borrow();
// Smoothen using sliding windows.
self.cpu_window.add_sample(quantized_target_now.cpu, now);
self.cpu_window.trim(now - self.config.sliding_window_length);
self.mem_window.add_sample(quantized_target_now.mem, now);
self.mem_window.trim(now - self.config.sliding_window_length);
// If we haven't exceeded the threshold, then we're all ok
if cgroup_mem_stat.avg_non_reclaimable < cgroup.threshold {
continue;
}
let sliding_target = Resources {
cpu: *self.cpu_window.get_max(),
mem: *self.mem_window.get_max(),
};
// If no change, we're all ok.
//
// XXX: If the agent doesn't perform the scaling, should we retry after a while though?
if let Some((last_request_target, last_request_at)) = self.last_scale_request {
if last_request_target == sliding_target {
continue;
}
// If it's been less than 1 second since the last time we requested
// scaling, don't send a request to avoid spamming the agent.
let elapsed = now.duration_since(last_request_at);
// Otherwise, we generally want upscaling. But, if it's been less than 1 second
// since the last time we requested upscaling, ignore the event, to avoid
// spamming the agent.
if let Some(t) = self.last_upscale_request_at {
let elapsed = t.elapsed();
if elapsed < Duration::from_secs(1) {
// *Ideally* we'd like to log here that we're ignoring the fact the
// memory stats are too high, but in practice this can result in
@@ -396,28 +453,20 @@ impl Runner {
// See https://github.com/neondatabase/neon/issues/5865 for more.
continue;
}
info!(
old_target_cpu = last_request_target.cpu,
old_target_mem = last_request_target.mem,
target_cpu = sliding_target.cpu,
target_mem = sliding_target.mem,
"scaling target changed, requesting scaling",
);
} else {
info!(
target_cpu = sliding_target.cpu,
target_mem = sliding_target.mem,
"no previous scaling request, requesting initial scale",
);
}
self.last_scale_request = Some((sliding_target, now));
self.last_upscale_request_at = Some(Instant::now());
info!(
avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable),
threshold = bytes_to_mebibytes(cgroup.threshold),
"cgroup memory stats are high enough to upscale, requesting upscale",
);
self.counter += 2; // Increment, preserving parity (i.e. keep the
// counter odd). See the field comment for more.
self.dispatcher
.send(OutboundMsg::new(OutboundMsgKind::ScaleRequest {target: sliding_target}, self.counter))
.send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter))
.await
.context("failed to send message")?;
},

View File

@@ -1,72 +0,0 @@
use std::collections::VecDeque;
/// Maintain a sliding window for calculating Max over a period of time.
///
/// The window maintains a queue of samples. Each sample consists of a
/// "value" and the timestamp that it was measured at.
///
/// The queue is ordered by time, newest samples are at the front.
/// When a new sample is added, we delete any older samples in the
/// queue with a lower value, because they cannot affect the max
/// anymore. This means that the queue is always also ordered by value,
/// with the greatest value at the back:
///
/// front back
///
/// #
/// # #
/// # # #
/// # # #
///
///
/// V: Value
/// T: Time unit
#[derive(Debug)]
pub struct SlidingMax<V, T> {
samples: VecDeque<(V, T)>,
}
impl<V: std::cmp::PartialOrd, T: std::cmp::PartialOrd> SlidingMax<V, T> {
pub fn new(initial_val: V, initial_time: T) -> SlidingMax<V, T> {
SlidingMax {
samples: VecDeque::from([(initial_val, initial_time)]),
}
}
/// Add a new sample to the window.
///
/// We assume that the time is >= the time of any existing sample
/// in the queue, although we don't check it, and the code still
/// works without e.g. panicking if you violate that. It just
/// might not produce the correct result, until the disordered
/// samples have fallen off the window.
pub fn add_sample(&mut self, sample: V, time: T) {
while let Some((v, _t)) = self.samples.front() {
if sample < *v {
break;
} else {
self.samples.pop_front();
continue;
}
}
self.samples.push_front((sample, time))
}
/// Remove samples older than 'threshold' from the window
pub fn trim(&mut self, threshold: T) {
while self.samples.len() >= 2 {
let (_v, t) = self.samples.back().unwrap();
if *t < threshold {
self.samples.pop_back();
} else {
break;
}
}
}
/// Get the current max over the window
pub fn get_max(&self) -> &V {
&self.samples.back().unwrap().0
}
}

View File

@@ -145,14 +145,6 @@ impl ReconstructTimeMetrics {
}
}
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_materialized_cache_hits_direct_total",
"Number of cache hits from materialized page cache without redo",
)
.expect("failed to define a metric")
});
pub(crate) struct ReconstructDataTimeMetrics {
singular: Histogram,
vectored: Histogram,
@@ -182,14 +174,6 @@ pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<ReconstructDataTimeMetrics> =
}
});
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_materialized_cache_hits_total",
"Number of cache hits from materialized page cache",
)
.expect("failed to define a metric")
});
pub(crate) struct GetVectoredLatency {
map: EnumMap<TaskKind, Option<Histogram>>,
}
@@ -298,12 +282,8 @@ pub(crate) static SCAN_LATENCY: Lazy<ScanLatency> = Lazy::new(|| {
});
pub(crate) struct PageCacheMetricsForTaskKind {
pub read_accesses_materialized_page: IntCounter,
pub read_accesses_immutable: IntCounter,
pub read_hits_immutable: IntCounter,
pub read_hits_materialized_page_exact: IntCounter,
pub read_hits_materialized_page_older_lsn: IntCounter,
}
pub(crate) struct PageCacheMetrics {
@@ -336,16 +316,6 @@ pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMet
let content_kind = <PageContentKind as enum_map::Enum>::from_usize(content_kind);
let content_kind: &'static str = content_kind.into();
PageCacheMetricsForTaskKind {
read_accesses_materialized_page: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&[
task_kind,
"materialized_page",
content_kind,
])
.unwrap()
},
read_accesses_immutable: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&[task_kind, "immutable", content_kind])
@@ -357,28 +327,6 @@ pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMet
.get_metric_with_label_values(&[task_kind, "immutable", content_kind, "-"])
.unwrap()
},
read_hits_materialized_page_exact: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&[
task_kind,
"materialized_page",
content_kind,
"exact",
])
.unwrap()
},
read_hits_materialized_page_older_lsn: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&[
task_kind,
"materialized_page",
content_kind,
"older_lsn",
])
.unwrap()
},
}
}))
})),
@@ -394,7 +342,6 @@ pub(crate) struct PageCacheSizeMetrics {
pub max_bytes: UIntGauge,
pub current_bytes_immutable: UIntGauge,
pub current_bytes_materialized_page: UIntGauge,
}
static PAGE_CACHE_SIZE_CURRENT_BYTES: Lazy<UIntGaugeVec> = Lazy::new(|| {
@@ -420,11 +367,6 @@ pub(crate) static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> =
.get_metric_with_label_values(&["immutable"])
.unwrap()
},
current_bytes_materialized_page: {
PAGE_CACHE_SIZE_CURRENT_BYTES
.get_metric_with_label_values(&["materialized_page"])
.unwrap()
},
});
pub(crate) mod page_cache_eviction_metrics {
@@ -2918,13 +2860,11 @@ pub fn preinitialize_metrics() {
// FIXME(4813): make it so that we have no top level metrics as this fn will easily fall out of
// order:
// - global metrics reside in a Lazy<PageserverMetrics>
// - access via crate::metrics::PS_METRICS.materialized_page_cache_hit.inc()
// - access via crate::metrics::PS_METRICS.some_metric.inc()
// - could move the statics into TimelineMetrics::new()?
// counters
[
&MATERIALIZED_PAGE_CACHE_HIT,
&MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
&UNEXPECTED_ONDEMAND_DOWNLOADS,
&WALRECEIVER_STARTED_CONNECTIONS,
&WALRECEIVER_BROKER_UPDATES,

View File

@@ -17,7 +17,6 @@
//!
//! Two types of pages are supported:
//!
//! * **Materialized pages**, filled & used by page reconstruction
//! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
//!
//! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only.
@@ -28,9 +27,6 @@
//! Page cache maps from a cache key to a buffer slot.
//! The cache key uniquely identifies the piece of data that is being cached.
//!
//! The cache key for **materialized pages** is [`TenantShardId`], [`TimelineId`], [`Key`], and [`Lsn`].
//! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
//!
//! The cache key for **immutable file** pages is [`FileId`] and a block number.
//! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
//! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
@@ -82,13 +78,10 @@ use std::{
use anyhow::Context;
use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use utils::{id::TimelineId, lsn::Lsn};
use crate::{
context::RequestContext,
metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
repository::Key,
};
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
@@ -139,33 +132,7 @@ pub fn next_file_id() -> FileId {
#[derive(Debug, PartialEq, Eq, Clone)]
#[allow(clippy::enum_variant_names)]
enum CacheKey {
MaterializedPage {
hash_key: MaterializedPageHashKey,
lsn: Lsn,
},
ImmutableFilePage {
file_id: FileId,
blkno: u32,
},
}
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
struct MaterializedPageHashKey {
/// Why is this TenantShardId rather than TenantId?
///
/// Usually, the materialized value of a page@lsn is identical on any shard in the same tenant. However, this
/// this not the case for certain internally-generated pages (e.g. relation sizes). In future, we may make this
/// key smaller by omitting the shard, if we ensure that reads to such pages always skip the cache, or are
/// special-cased in some other way.
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
key: Key,
}
#[derive(Clone)]
struct Version {
lsn: Lsn,
slot_idx: usize,
ImmutableFilePage { file_id: FileId, blkno: u32 },
}
struct Slot {
@@ -236,17 +203,6 @@ impl SlotInner {
}
pub struct PageCache {
/// This contains the mapping from the cache key to buffer slot that currently
/// contains the page, if any.
///
/// TODO: This is protected by a single lock. If that becomes a bottleneck,
/// this HashMap can be replaced with a more concurrent version, there are
/// plenty of such crates around.
///
/// If you add support for caching different kinds of objects, each object kind
/// can have a separate mapping map, next to this field.
materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
/// The actual buffers with their metadata.
@@ -371,175 +327,14 @@ pub enum ReadBufResult<'a> {
}
impl PageCache {
//
// Section 1.1: Public interface functions for looking up and memorizing materialized page
// versions in the page cache
//
/// Look up a materialized page version.
///
/// The 'lsn' is an upper bound, this will return the latest version of
/// the given block, but not newer than 'lsn'. Returns the actual LSN of the
/// returned page.
pub async fn lookup_materialized_page(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
key: &Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Option<(Lsn, PageReadGuard)> {
let Ok(permit) = self.try_get_pinned_slot_permit().await else {
return None;
};
crate::metrics::PAGE_CACHE
.for_ctx(ctx)
.read_accesses_materialized_page
.inc();
let mut cache_key = CacheKey::MaterializedPage {
hash_key: MaterializedPageHashKey {
tenant_shard_id,
timeline_id,
key: *key,
},
lsn,
};
if let Some(guard) = self
.try_lock_for_read(&mut cache_key, &mut Some(permit))
.await
{
if let CacheKey::MaterializedPage {
hash_key: _,
lsn: available_lsn,
} = cache_key
{
if available_lsn == lsn {
crate::metrics::PAGE_CACHE
.for_ctx(ctx)
.read_hits_materialized_page_exact
.inc();
} else {
crate::metrics::PAGE_CACHE
.for_ctx(ctx)
.read_hits_materialized_page_older_lsn
.inc();
}
Some((available_lsn, guard))
} else {
panic!("unexpected key type in slot");
}
} else {
None
}
}
///
/// Store an image of the given page in the cache.
///
pub async fn memorize_materialized_page(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
key: Key,
lsn: Lsn,
img: &[u8],
) -> anyhow::Result<()> {
let cache_key = CacheKey::MaterializedPage {
hash_key: MaterializedPageHashKey {
tenant_shard_id,
timeline_id,
key,
},
lsn,
};
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
loop {
// First check if the key already exists in the cache.
if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().await;
if inner.key.as_ref() == Some(&cache_key) {
slot.inc_usage_count();
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
debug_assert_eq!(inner.buf.len(), img.len());
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert!(inner.buf == img);
return Ok(());
}
}
debug_assert!(permit.is_some());
// Not found. Find a victim buffer
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;
// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
// our victim buffer unnecessarily. Put it into the free list and
// continue with the slot that the other thread chose.
if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
// TODO: put to free list
// We now just loop back to start from beginning. This is not
// optimal, we'll perform the lookup in the mapping again, which
// is not really necessary because we already got
// 'existing_slot_idx'. But this shouldn't happen often enough
// to matter much.
continue;
}
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
slot.set_usage_count(1);
// Create a write guard for the slot so we go through the expected motions.
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
let mut write_guard = PageWriteGuard {
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
};
write_guard.copy_from_slice(img);
let _ = write_guard.mark_valid();
return Ok(());
}
}
// Section 1.2: Public interface functions for working with immutable file pages.
pub async fn read_immutable_buf(
&self,
file_id: FileId,
blkno: u32,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
self.lock_for_read(&mut cache_key, ctx).await
self.lock_for_read(&(CacheKey::ImmutableFilePage { file_id, blkno }), ctx)
.await
}
//
@@ -573,19 +368,11 @@ impl PageCache {
/// Look up a page in the cache.
///
/// If the search criteria is not exact, *cache_key is updated with the key
/// for exact key of the returned page. (For materialized pages, that means
/// that the LSN in 'cache_key' is updated with the LSN of the returned page
/// version.)
///
/// If no page is found, returns None and *cache_key is left unmodified.
///
async fn try_lock_for_read(
&self,
cache_key: &mut CacheKey,
cache_key: &CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageReadGuard> {
let cache_key_orig = cache_key.clone();
if let Some(slot_idx) = self.search_mapping(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we released the mapping
@@ -598,9 +385,6 @@ impl PageCache {
_permit: inner.coalesce_readers_permit(permit.take().unwrap()),
slot_guard: inner,
});
} else {
// search_mapping might have modified the search key; restore it.
*cache_key = cache_key_orig;
}
}
None
@@ -637,15 +421,12 @@ impl PageCache {
///
async fn lock_for_read(
&self,
cache_key: &mut CacheKey,
cache_key: &CacheKey,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
}
CacheKey::ImmutableFilePage { .. } => (
&crate::metrics::PAGE_CACHE
.for_ctx(ctx)
@@ -717,52 +498,15 @@ impl PageCache {
/// Search for a page in the cache using the given search key.
///
/// Returns the slot index, if any. If the search criteria is not exact,
/// *cache_key is updated with the actual key of the found page.
/// Returns the slot index, if any.
///
/// NOTE: We don't hold any lock on the mapping on return, so the slot might
/// get recycled for an unrelated page immediately after this function
/// returns. The caller is responsible for re-checking that the slot still
/// contains the page with the same key before using it.
///
fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
fn search_mapping(&self, cache_key: &CacheKey) -> Option<usize> {
match cache_key {
CacheKey::MaterializedPage { hash_key, lsn } => {
let map = self.materialized_page_map.read().unwrap();
let versions = map.get(hash_key)?;
let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
Ok(version_idx) => version_idx,
Err(0) => return None,
Err(version_idx) => version_idx - 1,
};
let version = &versions[version_idx];
*lsn = version.lsn;
Some(version.slot_idx)
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
}
}
}
/// Search for a page in the cache using the given search key.
///
/// Like 'search_mapping, but performs an "exact" search. Used for
/// allocating a new buffer.
fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
match key {
CacheKey::MaterializedPage { hash_key, lsn } => {
let map = self.materialized_page_map.read().unwrap();
let versions = map.get(hash_key)?;
if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
Some(versions[version_idx].slot_idx)
} else {
None
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
@@ -775,27 +519,6 @@ impl PageCache {
///
fn remove_mapping(&self, old_key: &CacheKey) {
match old_key {
CacheKey::MaterializedPage {
hash_key: old_hash_key,
lsn: old_lsn,
} => {
let mut map = self.materialized_page_map.write().unwrap();
if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
let versions = old_entry.get_mut();
if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
versions.remove(version_idx);
self.size_metrics
.current_bytes_materialized_page
.sub_page_sz(1);
if versions.is_empty() {
old_entry.remove_entry();
}
}
} else {
panic!("could not find old key in mapping")
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
map.remove(&(*file_id, *blkno))
@@ -812,30 +535,6 @@ impl PageCache {
/// of the existing mapping and leaves it untouched.
fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
match new_key {
CacheKey::MaterializedPage {
hash_key: new_key,
lsn: new_lsn,
} => {
let mut map = self.materialized_page_map.write().unwrap();
let versions = map.entry(new_key.clone()).or_default();
match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
Ok(version_idx) => Some(versions[version_idx].slot_idx),
Err(version_idx) => {
versions.insert(
version_idx,
Version {
lsn: *new_lsn,
slot_idx,
},
);
self.size_metrics
.current_bytes_materialized_page
.add_page_sz(1);
None
}
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
match map.entry((*file_id, *blkno)) {
@@ -949,7 +648,6 @@ impl PageCache {
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
size_metrics.max_bytes.set_page_sz(num_pages);
size_metrics.current_bytes_immutable.set_page_sz(0);
size_metrics.current_bytes_materialized_page.set_page_sz(0);
let slots = page_buffer
.chunks_exact_mut(PAGE_SZ)
@@ -968,7 +666,6 @@ impl PageCache {
.collect();
Self {
materialized_page_map: Default::default(),
immutable_page_map: Default::default(),
slots,
next_evict_slot: AtomicUsize::new(0),

View File

@@ -945,9 +945,8 @@ impl PageServerHandler {
b"valid_until",
)]))?
.write_message_noflush(&BeMessage::DataRow(&[Some(
&valid_until.as_millis().to_be_bytes(),
)]))?
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
valid_until.as_millis().to_string().as_bytes(),
)]))?;
Ok(())
}

View File

@@ -18,6 +18,7 @@ use std::collections::{BTreeMap, HashMap};
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use sysinfo::SystemExt;
use tokio::fs;
use anyhow::Context;
@@ -483,10 +484,9 @@ pub async fn init_tenant_mgr(
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
// Initialize dynamic limits that depend on system resources
let system_memory = sysinfo::System::new_with_specifics(
sysinfo::RefreshKind::new().with_memory(sysinfo::MemoryRefreshKind::new().with_ram()),
)
.total_memory();
let system_memory =
sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory())
.total_memory();
let max_ephemeral_layer_bytes =
conf.ephemeral_bytes_per_memory_kb as u64 * (system_memory / 1024);
tracing::info!("Initialized ephemeral layer size limit to {max_ephemeral_layer_bytes}, for {system_memory} bytes of memory");

View File

@@ -101,9 +101,7 @@ use crate::{
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::{
TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
};
use crate::metrics::TimelineMetrics;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use crate::tenant::config::TenantConfOpt;
use pageserver_api::reltag::RelTag;
@@ -120,7 +118,6 @@ use utils::{
simple_rcu::{Rcu, RcuReadGuard},
};
use crate::page_cache;
use crate::repository::GcResult;
use crate::repository::{Key, Value};
use crate::task_mgr;
@@ -134,7 +131,7 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::{config::TenantConf, storage_layer::VectoredValueReconstructState};
use super::config::TenantConf;
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer};
@@ -887,32 +884,11 @@ impl Timeline {
self.timeline_get_throttle.throttle(ctx, 1).await;
// Check the page cache. We will get back the most recent page with lsn <= `lsn`.
// The cached image can be returned directly if there is no WAL between the cached image
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
// for redo.
let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
Some((cached_lsn, cached_img)) => {
match cached_lsn.cmp(&lsn) {
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
Ordering::Equal => {
MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
return Ok(cached_img); // exact LSN match, return the image
}
Ordering::Greater => {
unreachable!("the returned lsn should never be after the requested lsn")
}
}
Some((cached_lsn, cached_img))
}
None => None,
};
match self.conf.get_impl {
GetImpl::Legacy => {
let reconstruct_state = ValueReconstructState {
records: Vec::new(),
img: cached_page_img,
img: None,
};
self.get_impl(key, lsn, reconstruct_state, ctx).await
@@ -926,13 +902,6 @@ impl Timeline {
// entry returned above.
let mut reconstruct_state = ValuesReconstructState::new();
// Only add the cached image to the reconstruct state when it exists.
if cached_page_img.is_some() {
let mut key_state = VectoredValueReconstructState::default();
key_state.img = cached_page_img;
reconstruct_state.keys.insert(key, Ok(key_state));
}
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.await;
@@ -3240,7 +3209,6 @@ impl Timeline {
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
return Ok(traversal_path);
}
if let Some(prev) = prev_lsn {
@@ -3614,26 +3582,6 @@ impl Timeline {
})
}
/// # Cancel-safety
///
/// This method is cancellation-safe.
async fn lookup_cached_page(
&self,
key: &Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Option<(Lsn, Bytes)> {
let cache = page_cache::get();
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
// We should look at the key to determine if it's a cacheable object
let (lsn, read_guard) = cache
.lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
.await?;
let img = Bytes::from(read_guard.to_vec());
Some((lsn, img))
}
async fn get_ready_ancestor_timeline(
&self,
ancestor: &Arc<Timeline>,
@@ -5280,8 +5228,6 @@ impl Timeline {
trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
};
let last_rec_lsn = data.records.last().unwrap().0;
let img = match self
.walredo_mgr
.as_ref()
@@ -5295,23 +5241,6 @@ impl Timeline {
Err(e) => return Err(PageReconstructError::WalRedo(e)),
};
if img.len() == page_cache::PAGE_SZ {
let cache = page_cache::get();
if let Err(e) = cache
.memorize_materialized_page(
self.tenant_shard_id,
self.timeline_id,
key,
last_rec_lsn,
&img,
)
.await
.context("Materialized page memoization failed")
{
return Err(PageReconstructError::from(e));
}
}
Ok(img)
}
}

View File

@@ -118,8 +118,6 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
"libmetrics_launch_timestamp",
"libmetrics_build_info",
"libmetrics_tracing_event_count_total",
"pageserver_materialized_cache_hits_total",
"pageserver_materialized_cache_hits_direct_total",
"pageserver_page_cache_read_hits_total",
"pageserver_page_cache_read_accesses_total",
"pageserver_page_cache_size_current_bytes",

View File

@@ -3446,11 +3446,12 @@ class Endpoint(PgProtocol, LogUtils):
self.active_safekeepers: List[int] = list(map(lambda sk: sk.id, env.safekeepers))
# path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
# This lock prevents concurrent start & stop operations, keeping `self.running` consistent
# with whether we're really running. Tests generally wouldn't try and do these concurrently,
# but endpoints are also stopped during test teardown, which might happen concurrently with
# destruction of objects in tests.
self.lock = threading.Lock()
# Semaphore is set to 1 when we start, and acquire'd back to zero when we stop
#
# We use a semaphore rather than a bool so that racing calls to stop() don't
# try and stop the same process twice, as stop() is called by test teardown and
# potentially by some __del__ chains in other threads.
self._running = threading.Semaphore(0)
def http_client(
self, auth_token: Optional[str] = None, retries: Optional[Retry] = None
@@ -3522,15 +3523,14 @@ class Endpoint(PgProtocol, LogUtils):
log.info(f"Starting postgres endpoint {self.endpoint_id}")
with self.lock:
self.env.neon_cli.endpoint_start(
self.endpoint_id,
safekeepers=self.active_safekeepers,
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
)
self.running = True
self.env.neon_cli.endpoint_start(
self.endpoint_id,
safekeepers=self.active_safekeepers,
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
)
self._running.release(1)
return self
@@ -3578,9 +3578,12 @@ class Endpoint(PgProtocol, LogUtils):
conf_file.write("\n".join(hba) + "\n")
conf_file.write(data)
if self.running:
if self.is_running():
self.safe_psql("SELECT pg_reload_conf()")
def is_running(self):
return self._running._value > 0
def reconfigure(self, pageserver_id: Optional[int] = None):
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_reconfigure(self.endpoint_id, self.tenant_id, pageserver_id)
@@ -3629,13 +3632,12 @@ class Endpoint(PgProtocol, LogUtils):
Returns self.
"""
with self.lock:
if self.running:
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_stop(
self.endpoint_id, check_return_code=self.check_stop_result, mode=mode
)
self.running = False
running = self._running.acquire(blocking=False)
if running:
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_stop(
self.endpoint_id, check_return_code=self.check_stop_result, mode=mode
)
return self
@@ -3645,13 +3647,13 @@ class Endpoint(PgProtocol, LogUtils):
Returns self.
"""
with self.lock:
running = self._running.acquire(blocking=False)
if running:
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_stop(
self.endpoint_id, True, check_return_code=self.check_stop_result, mode=mode
)
self.endpoint_id = None
self.running = False
return self

View File

@@ -1,4 +1,5 @@
import os
import queue
import random
import threading
import time
@@ -8,11 +9,7 @@ from fixtures.neon_fixtures import DEFAULT_BRANCH_NAME, NeonEnvBuilder
from fixtures.utils import query_scalar
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder, build_type: str):
if build_type == "debug":
# Disable vectored read path cross validation since it makes the test time out.
neon_env_builder.pageserver_config_override = "validate_vectored_get=false"
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
cache_dir = os.path.join(env.repo_dir, "file_cache")
@@ -33,11 +30,10 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder, build_type: s
cur = endpoint.connect().cursor()
stop = threading.Event()
n_rows = 100000
n_threads = 20
n_updates_per_thread = 10000
n_updates_per_connection = 1000
n_total_updates = n_threads * n_updates_per_thread
cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")
@@ -48,11 +44,11 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder, build_type: s
# performed (plus the initial 1 on each row).
#
# Furthermore, each thread will reconnect between every 1000 updates.
def run_updates():
def run_updates(n_updates_performed_q: queue.Queue[int]):
n_updates_performed = 0
conn = endpoint.connect()
cur = conn.cursor()
for _ in range(n_updates_per_thread):
while not stop.is_set():
id = random.randint(1, n_rows)
cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}")
n_updates_performed += 1
@@ -61,19 +57,28 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder, build_type: s
conn.close()
conn = endpoint.connect()
cur = conn.cursor()
n_updates_performed_q.put(n_updates_performed)
n_updates_performed_q: queue.Queue[int] = queue.Queue()
threads: List[threading.Thread] = []
for _i in range(n_threads):
thread = threading.Thread(target=run_updates, args=(), daemon=True)
thread = threading.Thread(target=run_updates, args=(n_updates_performed_q,), daemon=True)
thread.start()
threads.append(thread)
time.sleep(5)
# unlink, this is what we're actually testing
new_cache_dir = os.path.join(env.repo_dir, "file_cache_new")
os.rename(cache_dir, new_cache_dir)
time.sleep(10)
stop.set()
n_updates_performed = 0
for thread in threads:
thread.join()
n_updates_performed += n_updates_performed_q.get()
assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows
assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_rows + n_updates_performed

View File

@@ -88,6 +88,9 @@ def test_ondemand_download_replica(neon_env_builder: NeonEnvBuilder, shard_count
initial_tenant_conf=tenant_conf, initial_tenant_shard_count=shard_count
)
for ps in env.pageservers:
ps.allowed_errors.append(".*page_service.*error obtaining lsn lease.*Tenant .* not found")
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()

View File

@@ -23,11 +23,11 @@ if TYPE_CHECKING:
# Run the main PostgreSQL regression tests, in src/test/regress.
#
@pytest.mark.timeout(600)
@pytest.mark.parametrize("shard_count", [None, 4])
def test_pg_regress(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
build_type: str,
pg_bin: PgBin,
capsys: CaptureFixture[str],
base_dir: Path,
@@ -43,10 +43,6 @@ def test_pg_regress(
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
if build_type == "debug":
# Disable vectored read path cross validation since it makes the test time out.
neon_env_builder.pageserver_config_override = "validate_vectored_get=false"
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)

View File

@@ -6,7 +6,6 @@ from fixtures.neon_fixtures import NeonEnv
def test_physical_replication(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 100000
with env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
@@ -22,8 +21,20 @@ def test_physical_replication(neon_simple_env: NeonEnv):
with p_con.cursor() as p_cur:
with secondary.connect() as s_con:
with s_con.cursor() as s_cur:
for pk in range(n_records):
runtime_secs = 30
started_at = time.time()
pk = 0
while True:
pk += 1
now = time.time()
if now - started_at > runtime_secs:
break
p_cur.execute("insert into t (pk) values (%s)", (pk,))
# an earlier version of this test was based on a fixed number of loop iterations
# and selected for pk=(random.randrange(1, fixed number of loop iterations)).
# => the probability of selection for a value that was never inserted changed from 99.9999% to 0% over the course of the test.
#
# We changed the test to where=(random.randrange(1, 2*pk)), which means the probability is now fixed to 50%.
s_cur.execute(
"select * from t where pk=%s", (random.randrange(1, n_records),)
"select * from t where pk=%s", (random.randrange(1, 2 * pk),)
)

View File

@@ -6,18 +6,24 @@ from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.utils import query_scalar
#
# Create read-only compute nodes, anchored at historical points in time.
#
# This is very similar to the 'test_branch_behind' test, but instead of
# creating branches, creates read-only nodes.
#
def test_readonly_node(neon_simple_env: NeonEnv):
"""
Create read-only compute nodes, anchored at historical points in time.
This is very similar to the 'test_branch_behind' test, but instead of
creating branches, creates read-only nodes.
"""
env = neon_simple_env
env.neon_cli.create_branch("test_readonly_node", "empty")
endpoint_main = env.endpoints.create_start("test_readonly_node")
env.pageserver.allowed_errors.append(".*basebackup .* failed: invalid basebackup lsn.*")
env.pageserver.allowed_errors.extend(
[
".*basebackup .* failed: invalid basebackup lsn.*",
".*page_service.*error obtaining lsn lease.*.*tried to request a page version that was garbage collected",
]
)
main_pg_conn = endpoint_main.connect()
main_cur = main_pg_conn.cursor()