mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 01:50:38 +00:00
Compare commits
13 Commits
heikki/wip
...
yliang/dbg
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
402cb2c40a | ||
|
|
fba409a158 | ||
|
|
8baaccb56c | ||
|
|
79401638df | ||
|
|
c789ec21f6 | ||
|
|
558a57b15b | ||
|
|
06f953ad21 | ||
|
|
f0e2bb79b2 | ||
|
|
66d0277571 | ||
|
|
3b2159310f | ||
|
|
ff55d6f4df | ||
|
|
3d1bcc9524 | ||
|
|
ef90a6295e |
@@ -183,8 +183,7 @@ runs:
|
||||
|
||||
# Run the tests.
|
||||
#
|
||||
# The junit.xml file allows CI tools to display more fine-grained test information
|
||||
# in its "Tests" tab in the results page.
|
||||
# --alluredir saves test results in Allure format (in a specified directory)
|
||||
# --verbose prints name of each test (helpful when there are
|
||||
# multiple tests in one file)
|
||||
# -rA prints summary in the end
|
||||
@@ -193,7 +192,6 @@ runs:
|
||||
#
|
||||
mkdir -p $TEST_OUTPUT/allure/results
|
||||
"${cov_prefix[@]}" ./scripts/pytest \
|
||||
--junitxml=$TEST_OUTPUT/junit.xml \
|
||||
--alluredir=$TEST_OUTPUT/allure/results \
|
||||
--tb=short \
|
||||
--verbose \
|
||||
|
||||
9
.github/workflows/actionlint.yml
vendored
9
.github/workflows/actionlint.yml
vendored
@@ -36,15 +36,16 @@ jobs:
|
||||
fail_on_error: true
|
||||
filter_mode: nofilter
|
||||
level: error
|
||||
- run: |
|
||||
|
||||
- name: Disallow 'ubuntu-latest' runners
|
||||
run: |
|
||||
PAT='^\s*runs-on:.*-latest'
|
||||
if grep -ERq $PAT .github/workflows
|
||||
then
|
||||
if grep -ERq $PAT .github/workflows; then
|
||||
grep -ERl $PAT .github/workflows |\
|
||||
while read -r f
|
||||
do
|
||||
l=$(grep -nE $PAT .github/workflows/release.yml | awk -F: '{print $1}' | head -1)
|
||||
echo "::error file=$f,line=$l::Please, do not use ubuntu-latest images to run on, use LTS instead."
|
||||
echo "::error file=$f,line=$l::Please use 'ubuntu-22.04' instead of 'ubuntu-latest'"
|
||||
done
|
||||
exit 1
|
||||
fi
|
||||
|
||||
17
.github/workflows/build_and_test.yml
vendored
17
.github/workflows/build_and_test.yml
vendored
@@ -1023,6 +1023,18 @@ jobs:
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
|
||||
# The default value is ~/.docker
|
||||
- name: Set custom docker config directory
|
||||
run: |
|
||||
mkdir -p .docker-custom
|
||||
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
|
||||
|
||||
- uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
|
||||
# `neondatabase/neon` contains multiple binaries, all of them use the same input for the version into the same version formatting library.
|
||||
# Pick pageserver as currently the only binary with extra "version" features printed in the string to verify.
|
||||
# Regular pageserver version string looks like
|
||||
@@ -1057,6 +1069,11 @@ jobs:
|
||||
docker compose -f ./docker-compose/docker-compose.yml logs || 0
|
||||
docker compose -f ./docker-compose/docker-compose.yml down
|
||||
|
||||
- name: Remove custom docker config directory
|
||||
if: always()
|
||||
run: |
|
||||
rm -rf .docker-custom
|
||||
|
||||
promote-images:
|
||||
needs: [ check-permissions, tag, test-images, vm-compute-node-image ]
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
12
.github/workflows/release.yml
vendored
12
.github/workflows/release.yml
vendored
@@ -52,13 +52,15 @@ jobs:
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
run: |
|
||||
TITLE="Storage & Compute release ${RELEASE_DATE}"
|
||||
|
||||
cat << EOF > body.md
|
||||
## Storage & Compute release ${RELEASE_DATE}
|
||||
## ${TITLE}
|
||||
|
||||
**Please merge this Pull Request using 'Create a merge commit' button**
|
||||
EOF
|
||||
|
||||
gh pr create --title "Release ${RELEASE_DATE}" \
|
||||
gh pr create --title "${TITLE}" \
|
||||
--body-file "body.md" \
|
||||
--head "${RELEASE_BRANCH}" \
|
||||
--base "release"
|
||||
@@ -91,13 +93,15 @@ jobs:
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
run: |
|
||||
TITLE="Proxy release ${RELEASE_DATE}"
|
||||
|
||||
cat << EOF > body.md
|
||||
## Proxy release ${RELEASE_DATE}
|
||||
## ${TITLE}
|
||||
|
||||
**Please merge this Pull Request using 'Create a merge commit' button**
|
||||
EOF
|
||||
|
||||
gh pr create --title "Proxy release ${RELEASE_DATE}" \
|
||||
gh pr create --title "${TITLE}" \
|
||||
--body-file "body.md" \
|
||||
--head "${RELEASE_BRANCH}" \
|
||||
--base "release-proxy"
|
||||
|
||||
46
Cargo.lock
generated
46
Cargo.lock
generated
@@ -1056,6 +1056,19 @@ version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||
|
||||
[[package]]
|
||||
name = "cgroups-rs"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1fb3af90c8d48ad5f432d8afb521b5b40c2a2fce46dd60e05912de51c47fba64"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"log",
|
||||
"nix 0.25.1",
|
||||
"regex",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "chrono"
|
||||
version = "0.4.38"
|
||||
@@ -2864,9 +2877,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.155"
|
||||
version = "0.2.150"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
|
||||
checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c"
|
||||
|
||||
[[package]]
|
||||
name = "libloading"
|
||||
@@ -3126,6 +3139,18 @@ version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.25.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f346ff70e7dbfd675fe90590b92d59ef2de15a8779ae305ebcbfd3f0caf59be4"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"bitflags 1.3.2",
|
||||
"cfg-if",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.26.4"
|
||||
@@ -4468,9 +4493,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rayon"
|
||||
version = "1.10.0"
|
||||
version = "1.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
|
||||
checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b"
|
||||
dependencies = [
|
||||
"either",
|
||||
"rayon-core",
|
||||
@@ -4478,12 +4503,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rayon-core"
|
||||
version = "1.12.1"
|
||||
version = "1.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
|
||||
checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d"
|
||||
dependencies = [
|
||||
"crossbeam-channel",
|
||||
"crossbeam-deque",
|
||||
"crossbeam-utils",
|
||||
"num_cpus",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5901,9 +5928,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sysinfo"
|
||||
version = "0.30.12"
|
||||
version = "0.29.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "732ffa00f53e6b2af46208fba5718d9662a421049204e156328b66791ffa15ae"
|
||||
checksum = "165d6d8539689e3d3bc8b98ac59541e1f21c7de7c85d60dc80e43ae0ed2113db"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"core-foundation-sys",
|
||||
@@ -5911,7 +5938,7 @@ dependencies = [
|
||||
"ntapi",
|
||||
"once_cell",
|
||||
"rayon",
|
||||
"windows 0.52.0",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6784,6 +6811,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"axum",
|
||||
"cgroups-rs",
|
||||
"clap",
|
||||
"futures",
|
||||
"inotify 0.10.2",
|
||||
|
||||
@@ -144,7 +144,7 @@ rustls = "0.22"
|
||||
rustls-pemfile = "2"
|
||||
rustls-split = "0.3"
|
||||
scopeguard = "1.1"
|
||||
sysinfo = "0.30.12"
|
||||
sysinfo = "0.29.2"
|
||||
sd-notify = "0.4.1"
|
||||
sentry = { version = "0.32", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
|
||||
@@ -45,6 +45,7 @@ use std::{thread, time::Duration};
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::Utc;
|
||||
use clap::Arg;
|
||||
use compute_tools::lsn_lease::launch_lsn_lease_loop_for_static;
|
||||
use signal_hook::consts::{SIGQUIT, SIGTERM};
|
||||
use signal_hook::{consts::SIGINT, iterator::Signals};
|
||||
use tracing::{error, info, warn};
|
||||
@@ -363,6 +364,8 @@ fn wait_spec(
|
||||
state.start_time = now;
|
||||
}
|
||||
|
||||
launch_lsn_lease_loop_for_static(&compute);
|
||||
|
||||
Ok(WaitSpecResult {
|
||||
compute,
|
||||
http_port,
|
||||
|
||||
@@ -11,6 +11,7 @@ pub mod logger;
|
||||
pub mod catalog;
|
||||
pub mod compute;
|
||||
pub mod extension_server;
|
||||
pub mod lsn_lease;
|
||||
pub mod monitor;
|
||||
pub mod params;
|
||||
pub mod pg_helpers;
|
||||
|
||||
113
compute_tools/src/lsn_lease.rs
Normal file
113
compute_tools/src/lsn_lease.rs
Normal file
@@ -0,0 +1,113 @@
|
||||
use anyhow::bail;
|
||||
use anyhow::Result;
|
||||
use postgres::{NoTls, SimpleQueryMessage};
|
||||
use std::{
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
thread,
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
use compute_api::spec::ComputeMode;
|
||||
use tracing::{error, info};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::compute::{ComputeNode, ComputeState};
|
||||
|
||||
pub fn launch_lsn_lease_loop_for_static(compute: &Arc<ComputeNode>) {
|
||||
let lsn = {
|
||||
let state = compute.state.lock().unwrap();
|
||||
let spec = state.pspec.as_ref().expect("spec must be set");
|
||||
match spec.spec.mode {
|
||||
ComputeMode::Static(lsn) => lsn,
|
||||
_ => return,
|
||||
}
|
||||
};
|
||||
let compute = compute.clone();
|
||||
thread::spawn(move || lsn_lease_loop(compute, lsn));
|
||||
}
|
||||
|
||||
fn postgres_configs_from_state(compute_state: &ComputeState) -> Vec<postgres::Config> {
|
||||
let spec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let conn_strings = spec.pageserver_connstr.split(',');
|
||||
|
||||
conn_strings
|
||||
.map(|connstr| {
|
||||
let mut config = postgres::Config::from_str(connstr).expect("invalid connstr");
|
||||
if let Some(storage_auth_token) = &spec.storage_auth_token {
|
||||
info!("Got storage auth token from spec file");
|
||||
config.password(storage_auth_token.clone());
|
||||
} else {
|
||||
info!("Storage auth token not set");
|
||||
}
|
||||
config
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
fn lsn_lease_loop(compute: Arc<ComputeNode>, lsn: Lsn) {
|
||||
loop {
|
||||
let state = compute.state.lock().unwrap();
|
||||
|
||||
let spec = state.pspec.as_ref().expect("spec must be set");
|
||||
let configs = postgres_configs_from_state(&state);
|
||||
|
||||
let cmd = format!("lease lsn {} {} {} ", spec.tenant_id, spec.timeline_id, lsn);
|
||||
drop(state);
|
||||
|
||||
match lsn_lease_request(&configs, &cmd) {
|
||||
Ok(valid_until) => {
|
||||
let valid_until_duration = Duration::from_millis(valid_until as u64);
|
||||
|
||||
let sleep_duration = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap_or(Duration::ZERO)
|
||||
.checked_sub(valid_until_duration)
|
||||
.unwrap_or(Duration::ZERO)
|
||||
.checked_sub(Duration::from_secs(60))
|
||||
.unwrap_or(Duration::ZERO);
|
||||
|
||||
// Ensure the sleep duration is at least 60 seconds to avoid busy loops
|
||||
let sleep_duration = std::cmp::max(sleep_duration, Duration::from_secs(60));
|
||||
|
||||
info!(
|
||||
"lsn_lease_request succeeded, sleeping for {} seconds",
|
||||
sleep_duration.as_secs()
|
||||
);
|
||||
thread::sleep(sleep_duration);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("lsn_lease_request failed: {:#}", e);
|
||||
thread::sleep(Duration::from_secs(10));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn lsn_lease_request(configs: &[postgres::Config], cmd: &str) -> Result<u128> {
|
||||
info!("lsn_lease_request: {}", cmd);
|
||||
let valid_until = configs
|
||||
.iter()
|
||||
.map(|config| {
|
||||
let mut client = config.connect(NoTls)?;
|
||||
let res = client.simple_query(cmd)?;
|
||||
let msg = match res.first() {
|
||||
Some(msg) => msg,
|
||||
None => bail!("empty response"),
|
||||
};
|
||||
let row = match msg {
|
||||
SimpleQueryMessage::Row(row) => row,
|
||||
_ => bail!("error parsing lsn lease response"),
|
||||
};
|
||||
let valid_until_str = match row.get("valid_until") {
|
||||
Some(valid_until) => valid_until,
|
||||
None => bail!("valid_until not found"),
|
||||
};
|
||||
Ok(u128::from_str(valid_until_str)?)
|
||||
})
|
||||
.collect::<Result<Vec<u128>>>()?
|
||||
.into_iter()
|
||||
.min()
|
||||
.unwrap();
|
||||
Ok(valid_until)
|
||||
}
|
||||
@@ -5,4 +5,3 @@ TODO:
|
||||
- shared across tenants
|
||||
- store pages from layer files
|
||||
- store pages from "in-memory layer"
|
||||
- store materialized pages
|
||||
|
||||
@@ -134,7 +134,7 @@ depends on that, so if you change it, bad things will happen.
|
||||
|
||||
#### page_cache_size
|
||||
|
||||
Size of the page cache, to hold materialized page versions. Unit is
|
||||
Size of the page cache. Unit is
|
||||
number of 8 kB blocks. The default is 8192, which means 64 MB.
|
||||
|
||||
#### max_file_descriptors
|
||||
|
||||
@@ -25,3 +25,6 @@ tokio-stream.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
cgroups-rs = "0.3.3"
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
// The monitor was previously started by vm-builder, and for testing purposes,
|
||||
// we can mimic that setup with this binary.
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
use clap::Parser;
|
||||
@@ -25,3 +26,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
let token = CancellationToken::new();
|
||||
vm_monitor::start(args, token).await
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
fn main() {
|
||||
panic!("the monitor requires cgroups, which are only available on linux")
|
||||
}
|
||||
|
||||
363
libs/vm_monitor/src/cgroup.rs
Normal file
363
libs/vm_monitor/src/cgroup.rs
Normal file
@@ -0,0 +1,363 @@
|
||||
use std::fmt::{self, Debug, Formatter};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use cgroups_rs::{
|
||||
hierarchies::{self, is_cgroup2_unified_mode},
|
||||
memory::MemController,
|
||||
Subsystem,
|
||||
};
|
||||
use tokio::sync::watch;
|
||||
use tracing::{info, warn};
|
||||
|
||||
/// Configuration for a `CgroupWatcher`
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Config {
|
||||
/// Interval at which we should be fetching memory statistics
|
||||
memory_poll_interval: Duration,
|
||||
|
||||
/// The number of samples used in constructing aggregated memory statistics
|
||||
memory_history_len: usize,
|
||||
/// The number of most recent samples that will be periodically logged.
|
||||
///
|
||||
/// Each sample is logged exactly once. Increasing this value means that recent samples will be
|
||||
/// logged less frequently, and vice versa.
|
||||
///
|
||||
/// For simplicity, this value must be greater than or equal to `memory_history_len`.
|
||||
memory_history_log_interval: usize,
|
||||
/// The max number of iterations to skip before logging the next iteration
|
||||
memory_history_log_noskip_interval: Duration,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
memory_poll_interval: Duration::from_millis(100),
|
||||
memory_history_len: 5, // use 500ms of history for decision-making
|
||||
memory_history_log_interval: 20, // but only log every ~2s (otherwise it's spammy)
|
||||
memory_history_log_noskip_interval: Duration::from_secs(15), // but only if it's changed, or 60 seconds have passed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Responds to `MonitorEvents` to manage the cgroup: preventing it from being
|
||||
/// OOM killed or throttling.
|
||||
///
|
||||
/// The `CgroupWatcher` primarily achieves this by reading from a stream of
|
||||
/// `MonitorEvent`s. See `main_signals_loop` for details on how to keep the
|
||||
/// cgroup happy.
|
||||
#[derive(Debug)]
|
||||
pub struct CgroupWatcher {
|
||||
pub config: Config,
|
||||
|
||||
/// The actual cgroup we are watching and managing.
|
||||
cgroup: cgroups_rs::Cgroup,
|
||||
}
|
||||
|
||||
impl CgroupWatcher {
|
||||
/// Create a new `CgroupWatcher`.
|
||||
#[tracing::instrument(skip_all, fields(%name))]
|
||||
pub fn new(name: String) -> anyhow::Result<Self> {
|
||||
// TODO: clarify exactly why we need v2
|
||||
// Make sure cgroups v2 (aka unified) are supported
|
||||
if !is_cgroup2_unified_mode() {
|
||||
anyhow::bail!("cgroups v2 not supported");
|
||||
}
|
||||
let cgroup = cgroups_rs::Cgroup::load(hierarchies::auto(), &name);
|
||||
|
||||
Ok(Self {
|
||||
cgroup,
|
||||
config: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
/// The entrypoint for the `CgroupWatcher`.
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn watch(
|
||||
&self,
|
||||
updates: watch::Sender<(Instant, MemoryHistory)>,
|
||||
) -> anyhow::Result<()> {
|
||||
// this requirement makes the code a bit easier to work with; see the config for more.
|
||||
assert!(self.config.memory_history_len <= self.config.memory_history_log_interval);
|
||||
|
||||
let mut ticker = tokio::time::interval(self.config.memory_poll_interval);
|
||||
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
// ticker.reset_immediately(); // FIXME: enable this once updating to tokio >= 1.30.0
|
||||
|
||||
let mem_controller = self.memory()?;
|
||||
|
||||
// buffer for samples that will be logged. once full, it remains so.
|
||||
let history_log_len = self.config.memory_history_log_interval;
|
||||
let max_skip = self.config.memory_history_log_noskip_interval;
|
||||
let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len];
|
||||
let mut last_logged_memusage = MemoryStatus::zeroed();
|
||||
|
||||
// Ensure that we're tracking a value that's definitely in the past, as Instant::now is only guaranteed to be non-decreasing on Rust's T1-supported systems.
|
||||
let mut can_skip_logs_until = Instant::now() - max_skip;
|
||||
|
||||
for t in 0_u64.. {
|
||||
ticker.tick().await;
|
||||
|
||||
let now = Instant::now();
|
||||
let mem = Self::memory_usage(mem_controller);
|
||||
|
||||
let i = t as usize % history_log_len;
|
||||
history_log_buf[i] = mem;
|
||||
|
||||
// We're taking *at most* memory_history_len values; we may be bounded by the total
|
||||
// number of samples that have come in so far.
|
||||
let samples_count = (t + 1).min(self.config.memory_history_len as u64) as usize;
|
||||
// NB: in `ring_buf_recent_values_iter`, `i` is *inclusive*, which matches the fact
|
||||
// that we just inserted a value there, so the end of the iterator will *include* the
|
||||
// value at i, rather than stopping just short of it.
|
||||
let samples = ring_buf_recent_values_iter(&history_log_buf, i, samples_count);
|
||||
|
||||
let summary = MemoryHistory {
|
||||
avg_non_reclaimable: samples.map(|h| h.non_reclaimable).sum::<u64>()
|
||||
/ samples_count as u64,
|
||||
samples_count,
|
||||
samples_span: self.config.memory_poll_interval * (samples_count - 1) as u32,
|
||||
};
|
||||
|
||||
// Log the current history if it's time to do so. Because `history_log_buf` has length
|
||||
// equal to the logging interval, we can just log the entire buffer every time we set
|
||||
// the last entry, which also means that for this log line, we can ignore that it's a
|
||||
// ring buffer (because all the entries are in order of increasing time).
|
||||
//
|
||||
// We skip logging the data if data hasn't meaningfully changed in a while, unless
|
||||
// we've already ignored previous iterations for the last max_skip period.
|
||||
if i == history_log_len - 1
|
||||
&& (now > can_skip_logs_until
|
||||
|| !history_log_buf
|
||||
.iter()
|
||||
.all(|usage| last_logged_memusage.status_is_close_or_similar(usage)))
|
||||
{
|
||||
info!(
|
||||
history = ?MemoryStatus::debug_slice(&history_log_buf),
|
||||
summary = ?summary,
|
||||
"Recent cgroup memory statistics history"
|
||||
);
|
||||
|
||||
can_skip_logs_until = now + max_skip;
|
||||
|
||||
last_logged_memusage = *history_log_buf.last().unwrap();
|
||||
}
|
||||
|
||||
updates
|
||||
.send((now, summary))
|
||||
.context("failed to send MemoryHistory")?;
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
/// Get a handle on the memory subsystem.
|
||||
fn memory(&self) -> anyhow::Result<&MemController> {
|
||||
self.cgroup
|
||||
.subsystems()
|
||||
.iter()
|
||||
.find_map(|sub| match sub {
|
||||
Subsystem::Mem(c) => Some(c),
|
||||
_ => None,
|
||||
})
|
||||
.ok_or_else(|| anyhow!("could not find memory subsystem"))
|
||||
}
|
||||
|
||||
/// Given a handle on the memory subsystem, returns the current memory information
|
||||
fn memory_usage(mem_controller: &MemController) -> MemoryStatus {
|
||||
let stat = mem_controller.memory_stat().stat;
|
||||
MemoryStatus {
|
||||
non_reclaimable: stat.active_anon + stat.inactive_anon,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function for `CgroupWatcher::watch`
|
||||
fn ring_buf_recent_values_iter<T>(
|
||||
buf: &[T],
|
||||
last_value_idx: usize,
|
||||
count: usize,
|
||||
) -> impl '_ + Iterator<Item = &T> {
|
||||
// Assertion carried over from `CgroupWatcher::watch`, to make the logic in this function
|
||||
// easier (we only have to add `buf.len()` once, rather than a dynamic number of times).
|
||||
assert!(count <= buf.len());
|
||||
|
||||
buf.iter()
|
||||
// 'cycle' because the values could wrap around
|
||||
.cycle()
|
||||
// with 'cycle', this skip is more like 'offset', and functionally this is
|
||||
// offsettting by 'last_value_idx - count (mod buf.len())', but we have to be
|
||||
// careful to avoid underflow, so we pre-add buf.len().
|
||||
// The '+ 1' is because `last_value_idx` is inclusive, rather than exclusive.
|
||||
.skip((buf.len() + last_value_idx + 1 - count) % buf.len())
|
||||
.take(count)
|
||||
}
|
||||
|
||||
/// Summary of recent memory usage
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct MemoryHistory {
|
||||
/// Rolling average of non-reclaimable memory usage samples over the last `history_period`
|
||||
pub avg_non_reclaimable: u64,
|
||||
|
||||
/// The number of samples used to construct this summary
|
||||
pub samples_count: usize,
|
||||
/// Total timespan between the first and last sample used for this summary
|
||||
pub samples_span: Duration,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct MemoryStatus {
|
||||
non_reclaimable: u64,
|
||||
}
|
||||
|
||||
impl MemoryStatus {
|
||||
fn zeroed() -> Self {
|
||||
MemoryStatus { non_reclaimable: 0 }
|
||||
}
|
||||
|
||||
fn debug_slice(slice: &[Self]) -> impl '_ + Debug {
|
||||
struct DS<'a>(&'a [MemoryStatus]);
|
||||
|
||||
impl<'a> Debug for DS<'a> {
|
||||
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
|
||||
f.debug_struct("[MemoryStatus]")
|
||||
.field(
|
||||
"non_reclaimable[..]",
|
||||
&Fields(self.0, |stat: &MemoryStatus| {
|
||||
BytesToGB(stat.non_reclaimable)
|
||||
}),
|
||||
)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
struct Fields<'a, F>(&'a [MemoryStatus], F);
|
||||
|
||||
impl<'a, F: Fn(&MemoryStatus) -> T, T: Debug> Debug for Fields<'a, F> {
|
||||
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
|
||||
f.debug_list().entries(self.0.iter().map(&self.1)).finish()
|
||||
}
|
||||
}
|
||||
|
||||
struct BytesToGB(u64);
|
||||
|
||||
impl Debug for BytesToGB {
|
||||
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
|
||||
f.write_fmt(format_args!(
|
||||
"{:.3}Gi",
|
||||
self.0 as f64 / (1_u64 << 30) as f64
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
DS(slice)
|
||||
}
|
||||
|
||||
/// Check if the other memory status is a close or similar result.
|
||||
/// Returns true if the larger value is not larger than the smaller value
|
||||
/// by 1/8 of the smaller value, and within 128MiB.
|
||||
/// See tests::check_similarity_behaviour for examples of behaviour
|
||||
fn status_is_close_or_similar(&self, other: &MemoryStatus) -> bool {
|
||||
let margin;
|
||||
let diff;
|
||||
if self.non_reclaimable >= other.non_reclaimable {
|
||||
margin = other.non_reclaimable / 8;
|
||||
diff = self.non_reclaimable - other.non_reclaimable;
|
||||
} else {
|
||||
margin = self.non_reclaimable / 8;
|
||||
diff = other.non_reclaimable - self.non_reclaimable;
|
||||
}
|
||||
|
||||
diff < margin && diff < 128 * 1024 * 1024
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[test]
|
||||
fn ring_buf_iter() {
|
||||
let buf = vec![0_i32, 1, 2, 3, 4, 5, 6, 7, 8, 9];
|
||||
|
||||
let values = |offset, count| {
|
||||
super::ring_buf_recent_values_iter(&buf, offset, count)
|
||||
.copied()
|
||||
.collect::<Vec<i32>>()
|
||||
};
|
||||
|
||||
// Boundary conditions: start, end, and entire thing:
|
||||
assert_eq!(values(0, 1), [0]);
|
||||
assert_eq!(values(3, 4), [0, 1, 2, 3]);
|
||||
assert_eq!(values(9, 4), [6, 7, 8, 9]);
|
||||
assert_eq!(values(9, 10), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
|
||||
// "normal" operation: no wraparound
|
||||
assert_eq!(values(7, 4), [4, 5, 6, 7]);
|
||||
|
||||
// wraparound:
|
||||
assert_eq!(values(0, 4), [7, 8, 9, 0]);
|
||||
assert_eq!(values(1, 4), [8, 9, 0, 1]);
|
||||
assert_eq!(values(2, 4), [9, 0, 1, 2]);
|
||||
assert_eq!(values(2, 10), [3, 4, 5, 6, 7, 8, 9, 0, 1, 2]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn check_similarity_behaviour() {
|
||||
// This all accesses private methods, so we can't actually run this
|
||||
// as doctests, because doctests run as an external crate.
|
||||
let mut small = super::MemoryStatus {
|
||||
non_reclaimable: 1024,
|
||||
};
|
||||
let mut large = super::MemoryStatus {
|
||||
non_reclaimable: 1024 * 1024 * 1024 * 1024,
|
||||
};
|
||||
|
||||
// objects are self-similar, no matter the size
|
||||
assert!(small.status_is_close_or_similar(&small));
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
// inequality is symmetric
|
||||
assert!(!small.status_is_close_or_similar(&large));
|
||||
assert!(!large.status_is_close_or_similar(&small));
|
||||
|
||||
small.non_reclaimable = 64;
|
||||
large.non_reclaimable = (small.non_reclaimable / 8) * 9;
|
||||
|
||||
// objects are self-similar, no matter the size
|
||||
assert!(small.status_is_close_or_similar(&small));
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
// values are similar if the larger value is larger by less than
|
||||
// 12.5%, i.e. 1/8 of the smaller value.
|
||||
// In the example above, large is exactly 12.5% larger, so this doesn't
|
||||
// match.
|
||||
assert!(!small.status_is_close_or_similar(&large));
|
||||
assert!(!large.status_is_close_or_similar(&small));
|
||||
|
||||
large.non_reclaimable -= 1;
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
assert!(small.status_is_close_or_similar(&large));
|
||||
assert!(large.status_is_close_or_similar(&small));
|
||||
|
||||
// The 1/8 rule only applies up to 128MiB of difference
|
||||
small.non_reclaimable = 1024 * 1024 * 1024 * 1024;
|
||||
large.non_reclaimable = small.non_reclaimable / 8 * 9;
|
||||
assert!(small.status_is_close_or_similar(&small));
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
assert!(!small.status_is_close_or_similar(&large));
|
||||
assert!(!large.status_is_close_or_similar(&small));
|
||||
// the large value is put just above the threshold
|
||||
large.non_reclaimable = small.non_reclaimable + 128 * 1024 * 1024;
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
assert!(!small.status_is_close_or_similar(&large));
|
||||
assert!(!large.status_is_close_or_similar(&small));
|
||||
// now below
|
||||
large.non_reclaimable -= 1;
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
assert!(small.status_is_close_or_similar(&large));
|
||||
assert!(large.status_is_close_or_similar(&small));
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,9 @@
|
||||
//!
|
||||
//! Contains types that manage the interaction (not data interchange, see `protocol`)
|
||||
//! between agent and monitor, allowing us to to process and send messages in a
|
||||
//! straightforward way.
|
||||
//! straightforward way. The dispatcher also manages that signals that come from
|
||||
//! the cgroup (requesting upscale), and the signals that go to the cgroup
|
||||
//! (notifying it of upscale).
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use axum::extract::ws::{Message, WebSocket};
|
||||
@@ -19,7 +21,11 @@ use crate::protocol::{
|
||||
|
||||
/// The central handler for all communications in the monitor.
|
||||
///
|
||||
/// The dispatcher manages the connection to the agent, sending and receiving messages.
|
||||
/// The dispatcher has two purposes:
|
||||
/// 1. Manage the connection to the agent, sending and receiving messages.
|
||||
/// 2. Communicate with the cgroup manager, notifying it when upscale is received,
|
||||
/// and sending a message to the agent when the cgroup manager requests
|
||||
/// upscale.
|
||||
#[derive(Debug)]
|
||||
pub struct Dispatcher {
|
||||
/// We read agent messages of of `source`
|
||||
|
||||
@@ -11,6 +11,7 @@ use axum::{routing::get, Router, Server};
|
||||
use clap::Parser;
|
||||
use futures::Future;
|
||||
use std::{fmt::Debug, time::Duration};
|
||||
use sysinfo::{RefreshKind, System, SystemExt};
|
||||
use tokio::{sync::broadcast, task::JoinHandle};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info};
|
||||
@@ -21,18 +22,18 @@ use runner::Runner;
|
||||
pub mod dispatcher;
|
||||
pub mod protocol;
|
||||
|
||||
pub mod cgroup;
|
||||
pub mod filecache;
|
||||
pub mod runner;
|
||||
|
||||
pub mod sliding_window;
|
||||
|
||||
/// The vm-monitor is an autoscaling component started by compute_ctl.
|
||||
///
|
||||
/// It carries out autoscaling decisions (upscaling/downscaling) and responds to
|
||||
/// memory pressure by making requests to the autoscaler-agent.
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct Args {
|
||||
/// Unused but accepted for backwards compatibility.
|
||||
/// The name of the cgroup we should monitor for memory.high events. This
|
||||
/// is the cgroup that postgres should be running in.
|
||||
#[arg(short, long)]
|
||||
pub cgroup: Option<String>,
|
||||
|
||||
@@ -62,6 +63,10 @@ pub fn bytes_to_mebibytes(bytes: u64) -> f32 {
|
||||
(bytes as f32) / (MiB as f32)
|
||||
}
|
||||
|
||||
pub fn get_total_system_memory() -> u64 {
|
||||
System::new_with_specifics(RefreshKind::new().with_memory()).total_memory()
|
||||
}
|
||||
|
||||
/// Global app state for the Axum server
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ServerState {
|
||||
@@ -82,7 +87,8 @@ pub struct ServerState {
|
||||
///
|
||||
/// This is mainly meant to be called with futures that will be pending for a very
|
||||
/// long time, or are not mean to return. If it is not desirable for the future to
|
||||
/// ever resolve, the error can be logged with `f`.
|
||||
/// ever resolve, such as in the case of [`cgroup::CgroupWatcher::watch`], the error can
|
||||
/// be logged with `f`.
|
||||
pub fn spawn_with_cancel<T, F>(
|
||||
token: CancellationToken,
|
||||
f: F,
|
||||
|
||||
@@ -59,15 +59,16 @@ pub enum OutboundMsgKind {
|
||||
/// properly deserialize it.
|
||||
InvalidMessage { error: String },
|
||||
/// Indicates that we experienced an internal error while processing a message.
|
||||
/// For example, if enlarging the file cache fails while trying to handle an upscale,
|
||||
/// For example, if a cgroup operation fails while trying to handle an upscale,
|
||||
/// we return `InternalError`.
|
||||
InternalError { error: String },
|
||||
/// Returned to the agent once we have finished handling an upscale. If the
|
||||
/// handling was unsuccessful, an `InternalError` will get returned instead.
|
||||
/// *Note*: this is a struct variant because of the way go serializes struct{}
|
||||
UpscaleConfirmation {},
|
||||
/// Indicates to the monitor that we are requesting the VM to be scaled to this size.
|
||||
ScaleRequest { target: Resources },
|
||||
/// Indicates to the monitor that we are urgently requesting resources.
|
||||
/// *Note*: this is a struct variant because of the way go serializes struct{}
|
||||
UpscaleRequest {},
|
||||
/// Returned to the agent once we have finished attempting to downscale. If
|
||||
/// an error occured trying to do so, an `InternalError` will get returned instead.
|
||||
/// However, if we are simply unsuccessful (for example, do to needing the resources),
|
||||
@@ -125,7 +126,7 @@ pub enum InboundMsgKind {
|
||||
}
|
||||
|
||||
/// Represents the resources granted to a VM.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
|
||||
// Renamed because the agent has multiple resources types:
|
||||
// `Resources` (milliCPU/memory slots)
|
||||
// `Allocation` (vCPU/bytes) <- what we correspond to
|
||||
|
||||
@@ -10,23 +10,23 @@ use std::time::{Duration, Instant};
|
||||
use anyhow::{bail, Context};
|
||||
use axum::extract::ws::{Message, WebSocket};
|
||||
use futures::StreamExt;
|
||||
use sysinfo::System;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::{broadcast, watch};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::cgroup::{self, CgroupWatcher};
|
||||
use crate::dispatcher::Dispatcher;
|
||||
use crate::filecache::{FileCacheConfig, FileCacheState};
|
||||
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
|
||||
use crate::sliding_window::SlidingMax;
|
||||
use crate::{bytes_to_mebibytes, Args, MiB};
|
||||
use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};
|
||||
|
||||
/// Central struct that tracks the desired scaling target, and interacts with the agent
|
||||
/// and dispatcher to handle signals from the agent.
|
||||
/// Central struct that interacts with agent, dispatcher, and cgroup to handle
|
||||
/// signals from the agent.
|
||||
#[derive(Debug)]
|
||||
pub struct Runner {
|
||||
config: Config,
|
||||
filecache: Option<FileCacheState>,
|
||||
cgroup: Option<CgroupState>,
|
||||
dispatcher: Dispatcher,
|
||||
|
||||
/// We "mint" new message ids by incrementing this counter and taking the value.
|
||||
@@ -35,12 +35,7 @@ pub struct Runner {
|
||||
/// by us vs the autoscaler-agent.
|
||||
counter: usize,
|
||||
|
||||
last_scale_request: Option<(Resources, Instant)>,
|
||||
|
||||
cpu_window: SlidingMax<f64, Instant>,
|
||||
mem_window: SlidingMax<u64, Instant>,
|
||||
|
||||
system: System,
|
||||
last_upscale_request_at: Option<Instant>,
|
||||
|
||||
/// A signal to kill the main thread produced by `self.run()`. This is triggered
|
||||
/// when the server receives a new connection. When the thread receives the
|
||||
@@ -48,6 +43,14 @@ pub struct Runner {
|
||||
kill: broadcast::Receiver<()>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct CgroupState {
|
||||
watcher: watch::Receiver<(Instant, cgroup::MemoryHistory)>,
|
||||
/// If [`cgroup::MemoryHistory::avg_non_reclaimable`] exceeds `threshold`, we send upscale
|
||||
/// requests.
|
||||
threshold: u64,
|
||||
}
|
||||
|
||||
/// Configuration for a `Runner`
|
||||
#[derive(Debug)]
|
||||
pub struct Config {
|
||||
@@ -66,45 +69,55 @@ pub struct Config {
|
||||
/// should be removed once we have a better solution there.
|
||||
sys_buffer_bytes: u64,
|
||||
|
||||
/// Interval at which we poll memory and CPU statistics for scaling decisions.
|
||||
poll_interval: Duration,
|
||||
/// Minimum fraction of total system memory reserved *before* the cgroup threshold; in
|
||||
/// other words, providing a ceiling for the highest value of the threshold by enforcing that
|
||||
/// there's at least `cgroup_min_overhead_fraction` of the total memory remaining beyond the
|
||||
/// threshold.
|
||||
///
|
||||
/// For example, a value of `0.1` means that 10% of total memory must remain after exceeding
|
||||
/// the threshold, so the value of the cgroup threshold would always be capped at 90% of total
|
||||
/// memory.
|
||||
///
|
||||
/// The default value of `0.15` means that we *guarantee* sending upscale requests if the
|
||||
/// cgroup is using more than 85% of total memory (even if we're *not* separately reserving
|
||||
/// memory for the file cache).
|
||||
cgroup_min_overhead_fraction: f64,
|
||||
|
||||
/// The resources requested from the agent are calculated based on the Max of memory
|
||||
/// usage and load average over a sliding window of the last X seconds. This controls
|
||||
/// the length of the window to consider.
|
||||
sliding_window_length: Duration,
|
||||
|
||||
/// Desired fraction of current CPU that the load average should be. For example, with a value
|
||||
/// of 0.7, we'd want load average to sit at 0.7 × CPU, scaling CPU to make this happen.
|
||||
load_average_fraction_target: f64,
|
||||
|
||||
/// Desired fraction of current memory that we would like to be using. For example, with a value
|
||||
/// of 0.7, on a 4GB VM we'd like to be using 2.8GB of memory.
|
||||
memory_usage_fraction_target: f64,
|
||||
|
||||
/// When requesting scaling to a certain # of CPUs, the request is rounded up to the
|
||||
/// nearest multiple of 'cpu_quantum'. For example, if the desired # of CPUs based on the
|
||||
/// usage is 3.1, and cpu_quantum is 0.25, we'd request 3.25 CPUs.
|
||||
cpu_quantum: f64,
|
||||
|
||||
/// Like 'cpu_quantum', but for memory. In bytes.
|
||||
mem_quantum: u64,
|
||||
cgroup_downscale_threshold_buffer_bytes: u64,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
sys_buffer_bytes: 100 * MiB,
|
||||
poll_interval: Duration::from_millis(100),
|
||||
sliding_window_length: Duration::from_secs(60),
|
||||
cpu_quantum: 0.25,
|
||||
mem_quantum: 512 * 1024 * 1024,
|
||||
load_average_fraction_target: 0.9,
|
||||
memory_usage_fraction_target: 0.75,
|
||||
cgroup_min_overhead_fraction: 0.15,
|
||||
cgroup_downscale_threshold_buffer_bytes: 100 * MiB,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Config {
|
||||
fn cgroup_threshold(&self, total_mem: u64, file_cache_disk_size: u64) -> u64 {
|
||||
// If the file cache is in tmpfs, then it will count towards shmem usage of the cgroup,
|
||||
// and thus be non-reclaimable, so we should allow for additional memory usage.
|
||||
//
|
||||
// If the file cache sits on disk, our desired stable system state is for it to be fully
|
||||
// page cached (its contents should only be paged to/from disk in situations where we can't
|
||||
// upscale fast enough). Page-cached memory is reclaimable, so we need to lower the
|
||||
// threshold for non-reclaimable memory so we scale up *before* the kernel starts paging
|
||||
// out the file cache.
|
||||
let memory_remaining_for_cgroup = total_mem.saturating_sub(file_cache_disk_size);
|
||||
|
||||
// Even if we're not separately making room for the file cache (if it's in tmpfs), we still
|
||||
// want our threshold to be met gracefully instead of letting postgres get OOM-killed.
|
||||
// So we guarantee that there's at least `cgroup_min_overhead_fraction` of total memory
|
||||
// remaining above the threshold.
|
||||
let max_threshold = (total_mem as f64 * (1.0 - self.cgroup_min_overhead_fraction)) as u64;
|
||||
|
||||
memory_remaining_for_cgroup.min(max_threshold)
|
||||
}
|
||||
}
|
||||
|
||||
impl Runner {
|
||||
/// Create a new monitor.
|
||||
#[tracing::instrument(skip_all, fields(?config, ?args))]
|
||||
@@ -124,25 +137,23 @@ impl Runner {
|
||||
.await
|
||||
.context("error creating new dispatcher")?;
|
||||
|
||||
let now = Instant::now();
|
||||
let mut state = Runner {
|
||||
config,
|
||||
filecache: None,
|
||||
cgroup: None,
|
||||
dispatcher,
|
||||
counter: 1, // NB: must be odd, see the comment about the field for more.
|
||||
last_scale_request: None,
|
||||
cpu_window: SlidingMax::new(0.0, now),
|
||||
mem_window: SlidingMax::new(0, now),
|
||||
last_upscale_request_at: None,
|
||||
kill,
|
||||
system: System::new(),
|
||||
};
|
||||
|
||||
state.system.refresh_specifics(
|
||||
sysinfo::RefreshKind::new().with_memory(sysinfo::MemoryRefreshKind::new().with_ram()),
|
||||
);
|
||||
let mem = get_total_system_memory();
|
||||
|
||||
let mem = state.system.total_memory();
|
||||
let mut file_cache_disk_size = 0;
|
||||
|
||||
// We need to process file cache initialization before cgroup initialization, so that the memory
|
||||
// allocated to the file cache is appropriately taken into account when we decide the cgroup's
|
||||
// memory limits.
|
||||
if let Some(connstr) = &args.pgconnstr {
|
||||
info!("initializing file cache");
|
||||
let config = FileCacheConfig::default();
|
||||
@@ -173,19 +184,51 @@ impl Runner {
|
||||
info!("file cache size actually got set to {actual_size}")
|
||||
}
|
||||
|
||||
file_cache_disk_size = actual_size;
|
||||
state.filecache = Some(file_cache);
|
||||
}
|
||||
|
||||
if let Some(name) = &args.cgroup {
|
||||
// Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
|
||||
// now, and then set limits later.
|
||||
info!("initializing cgroup");
|
||||
|
||||
let cgroup =
|
||||
CgroupWatcher::new(name.clone()).context("failed to create cgroup manager")?;
|
||||
|
||||
let init_value = cgroup::MemoryHistory {
|
||||
avg_non_reclaimable: 0,
|
||||
samples_count: 0,
|
||||
samples_span: Duration::ZERO,
|
||||
};
|
||||
let (hist_tx, hist_rx) = watch::channel((Instant::now(), init_value));
|
||||
|
||||
spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
|
||||
cgroup.watch(hist_tx).await
|
||||
});
|
||||
|
||||
let threshold = state.config.cgroup_threshold(mem, file_cache_disk_size);
|
||||
info!(threshold, "set initial cgroup threshold",);
|
||||
|
||||
state.cgroup = Some(CgroupState {
|
||||
watcher: hist_rx,
|
||||
threshold,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
/// Attempt to downscale filecache
|
||||
/// Attempt to downscale filecache + cgroup
|
||||
#[tracing::instrument(skip_all, fields(?target))]
|
||||
pub async fn try_downscale(&mut self, target: Resources) -> anyhow::Result<(bool, String)> {
|
||||
// Nothing to adjust
|
||||
if self.filecache.is_none() {
|
||||
info!("no action needed for downscale (no file cache enabled)");
|
||||
return Ok((true, "monitor is not managing file cache".to_string()));
|
||||
if self.cgroup.is_none() && self.filecache.is_none() {
|
||||
info!("no action needed for downscale (no cgroup or file cache enabled)");
|
||||
return Ok((
|
||||
true,
|
||||
"monitor is not managing cgroup or file cache".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let requested_mem = target.mem;
|
||||
@@ -195,14 +238,57 @@ impl Runner {
|
||||
.as_ref()
|
||||
.map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory))
|
||||
.unwrap_or(0);
|
||||
if let Some(cgroup) = &self.cgroup {
|
||||
let (last_time, last_history) = *cgroup.watcher.borrow();
|
||||
|
||||
// The downscaling has been approved. Downscale the file cache.
|
||||
// NB: The ordering of these conditions is intentional. During startup, we should deny
|
||||
// downscaling until we have enough information to determine that it's safe to do so
|
||||
// (i.e. enough samples have come in). But if it's been a while and we *still* haven't
|
||||
// received any information, we should *fail* instead of just denying downscaling.
|
||||
//
|
||||
// `last_time` is set to `Instant::now()` on startup, so checking `last_time.elapsed()`
|
||||
// serves double-duty: it trips if we haven't received *any* metrics for long enough,
|
||||
// OR if we haven't received metrics *recently enough*.
|
||||
//
|
||||
// TODO: make the duration here configurable.
|
||||
if last_time.elapsed() > Duration::from_secs(5) {
|
||||
bail!("haven't gotten cgroup memory stats recently enough to determine downscaling information");
|
||||
} else if last_history.samples_count <= 1 {
|
||||
let status = "haven't received enough cgroup memory stats yet";
|
||||
info!(status, "discontinuing downscale");
|
||||
return Ok((false, status.to_owned()));
|
||||
}
|
||||
|
||||
let new_threshold = self
|
||||
.config
|
||||
.cgroup_threshold(usable_system_memory, expected_file_cache_size);
|
||||
|
||||
let current = last_history.avg_non_reclaimable;
|
||||
|
||||
if new_threshold < current + self.config.cgroup_downscale_threshold_buffer_bytes {
|
||||
let status = format!(
|
||||
"{}: {} MiB (new threshold) < {} (current usage) + {} (downscale buffer)",
|
||||
"calculated memory threshold too low",
|
||||
bytes_to_mebibytes(new_threshold),
|
||||
bytes_to_mebibytes(current),
|
||||
bytes_to_mebibytes(self.config.cgroup_downscale_threshold_buffer_bytes)
|
||||
);
|
||||
|
||||
info!(status, "discontinuing downscale");
|
||||
|
||||
return Ok((false, status));
|
||||
}
|
||||
}
|
||||
|
||||
// The downscaling has been approved. Downscale the file cache, then the cgroup.
|
||||
let mut status = vec![];
|
||||
let mut file_cache_disk_size = 0;
|
||||
if let Some(file_cache) = &mut self.filecache {
|
||||
let actual_usage = file_cache
|
||||
.set_file_cache_size(expected_file_cache_size)
|
||||
.await
|
||||
.context("failed to set file cache size")?;
|
||||
file_cache_disk_size = actual_usage;
|
||||
let message = format!(
|
||||
"set file cache size to {} MiB",
|
||||
bytes_to_mebibytes(actual_usage),
|
||||
@@ -211,6 +297,22 @@ impl Runner {
|
||||
status.push(message);
|
||||
}
|
||||
|
||||
if let Some(cgroup) = &mut self.cgroup {
|
||||
let new_threshold = self
|
||||
.config
|
||||
.cgroup_threshold(usable_system_memory, file_cache_disk_size);
|
||||
|
||||
let message = format!(
|
||||
"set cgroup memory threshold from {} MiB to {} MiB, of new total {} MiB",
|
||||
bytes_to_mebibytes(cgroup.threshold),
|
||||
bytes_to_mebibytes(new_threshold),
|
||||
bytes_to_mebibytes(usable_system_memory)
|
||||
);
|
||||
cgroup.threshold = new_threshold;
|
||||
info!("downscale: {message}");
|
||||
status.push(message);
|
||||
}
|
||||
|
||||
// TODO: make this status thing less jank
|
||||
let status = status.join("; ");
|
||||
Ok((true, status))
|
||||
@@ -219,14 +321,15 @@ impl Runner {
|
||||
/// Handle new resources
|
||||
#[tracing::instrument(skip_all, fields(?resources))]
|
||||
pub async fn handle_upscale(&mut self, resources: Resources) -> anyhow::Result<()> {
|
||||
if self.filecache.is_none() {
|
||||
info!("no action needed for upscale (file cache is disabled)");
|
||||
if self.filecache.is_none() && self.cgroup.is_none() {
|
||||
info!("no action needed for upscale (no cgroup or file cache enabled)");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let new_mem = resources.mem;
|
||||
let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes);
|
||||
|
||||
let mut file_cache_disk_size = 0;
|
||||
if let Some(file_cache) = &mut self.filecache {
|
||||
let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
|
||||
info!(
|
||||
@@ -239,6 +342,7 @@ impl Runner {
|
||||
.set_file_cache_size(expected_usage)
|
||||
.await
|
||||
.context("failed to set file cache size")?;
|
||||
file_cache_disk_size = actual_usage;
|
||||
|
||||
if actual_usage != expected_usage {
|
||||
warn!(
|
||||
@@ -249,6 +353,20 @@ impl Runner {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(cgroup) = &mut self.cgroup {
|
||||
let new_threshold = self
|
||||
.config
|
||||
.cgroup_threshold(usable_system_memory, file_cache_disk_size);
|
||||
|
||||
info!(
|
||||
"set cgroup memory threshold from {} MiB to {} MiB of new total {} MiB",
|
||||
bytes_to_mebibytes(cgroup.threshold),
|
||||
bytes_to_mebibytes(new_threshold),
|
||||
bytes_to_mebibytes(usable_system_memory)
|
||||
);
|
||||
cgroup.threshold = new_threshold;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -295,57 +413,10 @@ impl Runner {
|
||||
}
|
||||
}
|
||||
|
||||
/// Calculate the desired size of the VM, based on the CPU and memory usage right now.
|
||||
pub fn calculate_raw_target(&mut self) -> Resources {
|
||||
self.system.refresh_specifics(
|
||||
sysinfo::RefreshKind::new().with_memory(sysinfo::MemoryRefreshKind::new().with_ram()),
|
||||
);
|
||||
|
||||
// For CPU:
|
||||
//
|
||||
// We use the 1 minute load average to measure "current" CPU usage. Target # of
|
||||
// CPUs is at the point where:
|
||||
//
|
||||
// (CPUs) * (LoadAverageFractionTarget) == (load average).
|
||||
let load_avg_1min = System::load_average().one;
|
||||
let goal_cpus = load_avg_1min / self.config.load_average_fraction_target;
|
||||
|
||||
// For Memory:
|
||||
//
|
||||
// Target point is where (Mem) * (MemoryUsageFractionTarget) == (Mem Usage)
|
||||
let used_memory = self.system.used_memory();
|
||||
let goal_memory_bytes: u64 =
|
||||
(self.system.used_memory() as f64 / self.config.memory_usage_fraction_target) as u64;
|
||||
|
||||
debug!("load avg: {load_avg_1min} used memory: {used_memory}");
|
||||
Resources {
|
||||
cpu: goal_cpus,
|
||||
mem: goal_memory_bytes,
|
||||
}
|
||||
}
|
||||
|
||||
/// To avoid overly fine-grained requests to the agent, round up the request to a
|
||||
/// multiple of the CPU and memory size of one a Compute Unit.
|
||||
///
|
||||
/// We still track CPU and memory separately though. The autoscaler agent will combine
|
||||
/// the CPU and memory requests to a single "# of Compute Units" measure.
|
||||
fn quantize_resources(&self, res: Resources) -> Resources {
|
||||
Resources {
|
||||
cpu: (res.cpu / self.config.cpu_quantum).ceil() * self.config.cpu_quantum,
|
||||
mem: ((res.mem as f64 / self.config.mem_quantum as f64).ceil()
|
||||
* self.config.mem_quantum as f64) as u64,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: don't propagate errors, probably just warn!?
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn run(&mut self) -> anyhow::Result<()> {
|
||||
info!("starting dispatcher");
|
||||
|
||||
let mut ticker = tokio::time::interval(self.config.poll_interval);
|
||||
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
// ticker.reset_immediately(); // FIXME: enable this once updating to tokio >= 1.30.0
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
signal = self.kill.recv() => {
|
||||
@@ -355,39 +426,25 @@ impl Runner {
|
||||
}
|
||||
}
|
||||
|
||||
// Time to re-evaluate the scaling target
|
||||
_ = ticker.tick() => {
|
||||
let now = Instant::now();
|
||||
// New memory stats from the cgroup, *may* need to request upscaling, if we've
|
||||
// exceeded the threshold
|
||||
result = self.cgroup.as_mut().unwrap().watcher.changed(), if self.cgroup.is_some() => {
|
||||
result.context("failed to receive from cgroup memory stats watcher")?;
|
||||
|
||||
// Calculate the desired resources based on current usage
|
||||
let target_now = self.calculate_raw_target();
|
||||
let cgroup = self.cgroup.as_ref().unwrap();
|
||||
|
||||
// Round it up to the nearest CU sizes, to avoid overly fine-grained
|
||||
// requests.
|
||||
let quantized_target_now = self.quantize_resources(target_now);
|
||||
let (_time, cgroup_mem_stat) = *cgroup.watcher.borrow();
|
||||
|
||||
// Smoothen using sliding windows.
|
||||
self.cpu_window.add_sample(quantized_target_now.cpu, now);
|
||||
self.cpu_window.trim(now - self.config.sliding_window_length);
|
||||
self.mem_window.add_sample(quantized_target_now.mem, now);
|
||||
self.mem_window.trim(now - self.config.sliding_window_length);
|
||||
// If we haven't exceeded the threshold, then we're all ok
|
||||
if cgroup_mem_stat.avg_non_reclaimable < cgroup.threshold {
|
||||
continue;
|
||||
}
|
||||
|
||||
let sliding_target = Resources {
|
||||
cpu: *self.cpu_window.get_max(),
|
||||
mem: *self.mem_window.get_max(),
|
||||
};
|
||||
|
||||
// If no change, we're all ok.
|
||||
//
|
||||
// XXX: If the agent doesn't perform the scaling, should we retry after a while though?
|
||||
if let Some((last_request_target, last_request_at)) = self.last_scale_request {
|
||||
if last_request_target == sliding_target {
|
||||
continue;
|
||||
}
|
||||
|
||||
// If it's been less than 1 second since the last time we requested
|
||||
// scaling, don't send a request to avoid spamming the agent.
|
||||
let elapsed = now.duration_since(last_request_at);
|
||||
// Otherwise, we generally want upscaling. But, if it's been less than 1 second
|
||||
// since the last time we requested upscaling, ignore the event, to avoid
|
||||
// spamming the agent.
|
||||
if let Some(t) = self.last_upscale_request_at {
|
||||
let elapsed = t.elapsed();
|
||||
if elapsed < Duration::from_secs(1) {
|
||||
// *Ideally* we'd like to log here that we're ignoring the fact the
|
||||
// memory stats are too high, but in practice this can result in
|
||||
@@ -396,28 +453,20 @@ impl Runner {
|
||||
// See https://github.com/neondatabase/neon/issues/5865 for more.
|
||||
continue;
|
||||
}
|
||||
|
||||
info!(
|
||||
old_target_cpu = last_request_target.cpu,
|
||||
old_target_mem = last_request_target.mem,
|
||||
target_cpu = sliding_target.cpu,
|
||||
target_mem = sliding_target.mem,
|
||||
"scaling target changed, requesting scaling",
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
target_cpu = sliding_target.cpu,
|
||||
target_mem = sliding_target.mem,
|
||||
"no previous scaling request, requesting initial scale",
|
||||
);
|
||||
}
|
||||
|
||||
self.last_scale_request = Some((sliding_target, now));
|
||||
self.last_upscale_request_at = Some(Instant::now());
|
||||
|
||||
info!(
|
||||
avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable),
|
||||
threshold = bytes_to_mebibytes(cgroup.threshold),
|
||||
"cgroup memory stats are high enough to upscale, requesting upscale",
|
||||
);
|
||||
|
||||
self.counter += 2; // Increment, preserving parity (i.e. keep the
|
||||
// counter odd). See the field comment for more.
|
||||
self.dispatcher
|
||||
.send(OutboundMsg::new(OutboundMsgKind::ScaleRequest {target: sliding_target}, self.counter))
|
||||
.send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter))
|
||||
.await
|
||||
.context("failed to send message")?;
|
||||
},
|
||||
|
||||
@@ -1,72 +0,0 @@
|
||||
use std::collections::VecDeque;
|
||||
|
||||
/// Maintain a sliding window for calculating Max over a period of time.
|
||||
///
|
||||
/// The window maintains a queue of samples. Each sample consists of a
|
||||
/// "value" and the timestamp that it was measured at.
|
||||
///
|
||||
/// The queue is ordered by time, newest samples are at the front.
|
||||
/// When a new sample is added, we delete any older samples in the
|
||||
/// queue with a lower value, because they cannot affect the max
|
||||
/// anymore. This means that the queue is always also ordered by value,
|
||||
/// with the greatest value at the back:
|
||||
///
|
||||
/// front back
|
||||
///
|
||||
/// #
|
||||
/// # #
|
||||
/// # # #
|
||||
/// # # #
|
||||
///
|
||||
///
|
||||
/// V: Value
|
||||
/// T: Time unit
|
||||
#[derive(Debug)]
|
||||
pub struct SlidingMax<V, T> {
|
||||
samples: VecDeque<(V, T)>,
|
||||
}
|
||||
|
||||
impl<V: std::cmp::PartialOrd, T: std::cmp::PartialOrd> SlidingMax<V, T> {
|
||||
pub fn new(initial_val: V, initial_time: T) -> SlidingMax<V, T> {
|
||||
SlidingMax {
|
||||
samples: VecDeque::from([(initial_val, initial_time)]),
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a new sample to the window.
|
||||
///
|
||||
/// We assume that the time is >= the time of any existing sample
|
||||
/// in the queue, although we don't check it, and the code still
|
||||
/// works without e.g. panicking if you violate that. It just
|
||||
/// might not produce the correct result, until the disordered
|
||||
/// samples have fallen off the window.
|
||||
pub fn add_sample(&mut self, sample: V, time: T) {
|
||||
while let Some((v, _t)) = self.samples.front() {
|
||||
if sample < *v {
|
||||
break;
|
||||
} else {
|
||||
self.samples.pop_front();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
self.samples.push_front((sample, time))
|
||||
}
|
||||
|
||||
/// Remove samples older than 'threshold' from the window
|
||||
pub fn trim(&mut self, threshold: T) {
|
||||
while self.samples.len() >= 2 {
|
||||
let (_v, t) = self.samples.back().unwrap();
|
||||
|
||||
if *t < threshold {
|
||||
self.samples.pop_back();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the current max over the window
|
||||
pub fn get_max(&self) -> &V {
|
||||
&self.samples.back().unwrap().0
|
||||
}
|
||||
}
|
||||
@@ -145,14 +145,6 @@ impl ReconstructTimeMetrics {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_materialized_cache_hits_direct_total",
|
||||
"Number of cache hits from materialized page cache without redo",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) struct ReconstructDataTimeMetrics {
|
||||
singular: Histogram,
|
||||
vectored: Histogram,
|
||||
@@ -182,14 +174,6 @@ pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<ReconstructDataTimeMetrics> =
|
||||
}
|
||||
});
|
||||
|
||||
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_materialized_cache_hits_total",
|
||||
"Number of cache hits from materialized page cache",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) struct GetVectoredLatency {
|
||||
map: EnumMap<TaskKind, Option<Histogram>>,
|
||||
}
|
||||
@@ -298,12 +282,8 @@ pub(crate) static SCAN_LATENCY: Lazy<ScanLatency> = Lazy::new(|| {
|
||||
});
|
||||
|
||||
pub(crate) struct PageCacheMetricsForTaskKind {
|
||||
pub read_accesses_materialized_page: IntCounter,
|
||||
pub read_accesses_immutable: IntCounter,
|
||||
|
||||
pub read_hits_immutable: IntCounter,
|
||||
pub read_hits_materialized_page_exact: IntCounter,
|
||||
pub read_hits_materialized_page_older_lsn: IntCounter,
|
||||
}
|
||||
|
||||
pub(crate) struct PageCacheMetrics {
|
||||
@@ -336,16 +316,6 @@ pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMet
|
||||
let content_kind = <PageContentKind as enum_map::Enum>::from_usize(content_kind);
|
||||
let content_kind: &'static str = content_kind.into();
|
||||
PageCacheMetricsForTaskKind {
|
||||
read_accesses_materialized_page: {
|
||||
PAGE_CACHE_READ_ACCESSES
|
||||
.get_metric_with_label_values(&[
|
||||
task_kind,
|
||||
"materialized_page",
|
||||
content_kind,
|
||||
])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_accesses_immutable: {
|
||||
PAGE_CACHE_READ_ACCESSES
|
||||
.get_metric_with_label_values(&[task_kind, "immutable", content_kind])
|
||||
@@ -357,28 +327,6 @@ pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMet
|
||||
.get_metric_with_label_values(&[task_kind, "immutable", content_kind, "-"])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_hits_materialized_page_exact: {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&[
|
||||
task_kind,
|
||||
"materialized_page",
|
||||
content_kind,
|
||||
"exact",
|
||||
])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_hits_materialized_page_older_lsn: {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&[
|
||||
task_kind,
|
||||
"materialized_page",
|
||||
content_kind,
|
||||
"older_lsn",
|
||||
])
|
||||
.unwrap()
|
||||
},
|
||||
}
|
||||
}))
|
||||
})),
|
||||
@@ -394,7 +342,6 @@ pub(crate) struct PageCacheSizeMetrics {
|
||||
pub max_bytes: UIntGauge,
|
||||
|
||||
pub current_bytes_immutable: UIntGauge,
|
||||
pub current_bytes_materialized_page: UIntGauge,
|
||||
}
|
||||
|
||||
static PAGE_CACHE_SIZE_CURRENT_BYTES: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
@@ -420,11 +367,6 @@ pub(crate) static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> =
|
||||
.get_metric_with_label_values(&["immutable"])
|
||||
.unwrap()
|
||||
},
|
||||
current_bytes_materialized_page: {
|
||||
PAGE_CACHE_SIZE_CURRENT_BYTES
|
||||
.get_metric_with_label_values(&["materialized_page"])
|
||||
.unwrap()
|
||||
},
|
||||
});
|
||||
|
||||
pub(crate) mod page_cache_eviction_metrics {
|
||||
@@ -2918,13 +2860,11 @@ pub fn preinitialize_metrics() {
|
||||
// FIXME(4813): make it so that we have no top level metrics as this fn will easily fall out of
|
||||
// order:
|
||||
// - global metrics reside in a Lazy<PageserverMetrics>
|
||||
// - access via crate::metrics::PS_METRICS.materialized_page_cache_hit.inc()
|
||||
// - access via crate::metrics::PS_METRICS.some_metric.inc()
|
||||
// - could move the statics into TimelineMetrics::new()?
|
||||
|
||||
// counters
|
||||
[
|
||||
&MATERIALIZED_PAGE_CACHE_HIT,
|
||||
&MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
|
||||
&UNEXPECTED_ONDEMAND_DOWNLOADS,
|
||||
&WALRECEIVER_STARTED_CONNECTIONS,
|
||||
&WALRECEIVER_BROKER_UPDATES,
|
||||
|
||||
@@ -17,7 +17,6 @@
|
||||
//!
|
||||
//! Two types of pages are supported:
|
||||
//!
|
||||
//! * **Materialized pages**, filled & used by page reconstruction
|
||||
//! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
|
||||
//!
|
||||
//! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only.
|
||||
@@ -28,9 +27,6 @@
|
||||
//! Page cache maps from a cache key to a buffer slot.
|
||||
//! The cache key uniquely identifies the piece of data that is being cached.
|
||||
//!
|
||||
//! The cache key for **materialized pages** is [`TenantShardId`], [`TimelineId`], [`Key`], and [`Lsn`].
|
||||
//! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
|
||||
//!
|
||||
//! The cache key for **immutable file** pages is [`FileId`] and a block number.
|
||||
//! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
|
||||
//! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
|
||||
@@ -82,13 +78,10 @@ use std::{
|
||||
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use utils::{id::TimelineId, lsn::Lsn};
|
||||
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
|
||||
repository::Key,
|
||||
};
|
||||
|
||||
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
|
||||
@@ -139,33 +132,7 @@ pub fn next_file_id() -> FileId {
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
enum CacheKey {
|
||||
MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey,
|
||||
lsn: Lsn,
|
||||
},
|
||||
ImmutableFilePage {
|
||||
file_id: FileId,
|
||||
blkno: u32,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
|
||||
struct MaterializedPageHashKey {
|
||||
/// Why is this TenantShardId rather than TenantId?
|
||||
///
|
||||
/// Usually, the materialized value of a page@lsn is identical on any shard in the same tenant. However, this
|
||||
/// this not the case for certain internally-generated pages (e.g. relation sizes). In future, we may make this
|
||||
/// key smaller by omitting the shard, if we ensure that reads to such pages always skip the cache, or are
|
||||
/// special-cased in some other way.
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
key: Key,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Version {
|
||||
lsn: Lsn,
|
||||
slot_idx: usize,
|
||||
ImmutableFilePage { file_id: FileId, blkno: u32 },
|
||||
}
|
||||
|
||||
struct Slot {
|
||||
@@ -236,17 +203,6 @@ impl SlotInner {
|
||||
}
|
||||
|
||||
pub struct PageCache {
|
||||
/// This contains the mapping from the cache key to buffer slot that currently
|
||||
/// contains the page, if any.
|
||||
///
|
||||
/// TODO: This is protected by a single lock. If that becomes a bottleneck,
|
||||
/// this HashMap can be replaced with a more concurrent version, there are
|
||||
/// plenty of such crates around.
|
||||
///
|
||||
/// If you add support for caching different kinds of objects, each object kind
|
||||
/// can have a separate mapping map, next to this field.
|
||||
materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
|
||||
|
||||
immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
|
||||
|
||||
/// The actual buffers with their metadata.
|
||||
@@ -371,175 +327,14 @@ pub enum ReadBufResult<'a> {
|
||||
}
|
||||
|
||||
impl PageCache {
|
||||
//
|
||||
// Section 1.1: Public interface functions for looking up and memorizing materialized page
|
||||
// versions in the page cache
|
||||
//
|
||||
|
||||
/// Look up a materialized page version.
|
||||
///
|
||||
/// The 'lsn' is an upper bound, this will return the latest version of
|
||||
/// the given block, but not newer than 'lsn'. Returns the actual LSN of the
|
||||
/// returned page.
|
||||
pub async fn lookup_materialized_page(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
key: &Key,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Option<(Lsn, PageReadGuard)> {
|
||||
let Ok(permit) = self.try_get_pinned_slot_permit().await else {
|
||||
return None;
|
||||
};
|
||||
|
||||
crate::metrics::PAGE_CACHE
|
||||
.for_ctx(ctx)
|
||||
.read_accesses_materialized_page
|
||||
.inc();
|
||||
|
||||
let mut cache_key = CacheKey::MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
key: *key,
|
||||
},
|
||||
lsn,
|
||||
};
|
||||
|
||||
if let Some(guard) = self
|
||||
.try_lock_for_read(&mut cache_key, &mut Some(permit))
|
||||
.await
|
||||
{
|
||||
if let CacheKey::MaterializedPage {
|
||||
hash_key: _,
|
||||
lsn: available_lsn,
|
||||
} = cache_key
|
||||
{
|
||||
if available_lsn == lsn {
|
||||
crate::metrics::PAGE_CACHE
|
||||
.for_ctx(ctx)
|
||||
.read_hits_materialized_page_exact
|
||||
.inc();
|
||||
} else {
|
||||
crate::metrics::PAGE_CACHE
|
||||
.for_ctx(ctx)
|
||||
.read_hits_materialized_page_older_lsn
|
||||
.inc();
|
||||
}
|
||||
Some((available_lsn, guard))
|
||||
} else {
|
||||
panic!("unexpected key type in slot");
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Store an image of the given page in the cache.
|
||||
///
|
||||
pub async fn memorize_materialized_page(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
img: &[u8],
|
||||
) -> anyhow::Result<()> {
|
||||
let cache_key = CacheKey::MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
key,
|
||||
},
|
||||
lsn,
|
||||
};
|
||||
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we don't released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.write().await;
|
||||
if inner.key.as_ref() == Some(&cache_key) {
|
||||
slot.inc_usage_count();
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
debug_assert_eq!(inner.buf.len(), img.len());
|
||||
// We already had it in cache. Another thread must've put it there
|
||||
// concurrently. Check that it had the same contents that we
|
||||
// replayed.
|
||||
assert!(inner.buf == img);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
debug_assert!(permit.is_some());
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) = self
|
||||
.find_victim(permit.as_ref().unwrap())
|
||||
.await
|
||||
.context("Failed to find evict victim")?;
|
||||
|
||||
// Insert mapping for this. At this point, we may find that another
|
||||
// thread did the same thing concurrently. In that case, we evicted
|
||||
// our victim buffer unnecessarily. Put it into the free list and
|
||||
// continue with the slot that the other thread chose.
|
||||
if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
|
||||
// TODO: put to free list
|
||||
|
||||
// We now just loop back to start from beginning. This is not
|
||||
// optimal, we'll perform the lookup in the mapping again, which
|
||||
// is not really necessary because we already got
|
||||
// 'existing_slot_idx'. But this shouldn't happen often enough
|
||||
// to matter much.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Make the slot ready
|
||||
let slot = &self.slots[slot_idx];
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.set_usage_count(1);
|
||||
// Create a write guard for the slot so we go through the expected motions.
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
let mut write_guard = PageWriteGuard {
|
||||
state: PageWriteGuardState::Invalid {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
},
|
||||
};
|
||||
write_guard.copy_from_slice(img);
|
||||
let _ = write_guard.mark_valid();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Section 1.2: Public interface functions for working with immutable file pages.
|
||||
|
||||
pub async fn read_immutable_buf(
|
||||
&self,
|
||||
file_id: FileId,
|
||||
blkno: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ReadBufResult> {
|
||||
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
|
||||
|
||||
self.lock_for_read(&mut cache_key, ctx).await
|
||||
self.lock_for_read(&(CacheKey::ImmutableFilePage { file_id, blkno }), ctx)
|
||||
.await
|
||||
}
|
||||
|
||||
//
|
||||
@@ -573,19 +368,11 @@ impl PageCache {
|
||||
|
||||
/// Look up a page in the cache.
|
||||
///
|
||||
/// If the search criteria is not exact, *cache_key is updated with the key
|
||||
/// for exact key of the returned page. (For materialized pages, that means
|
||||
/// that the LSN in 'cache_key' is updated with the LSN of the returned page
|
||||
/// version.)
|
||||
///
|
||||
/// If no page is found, returns None and *cache_key is left unmodified.
|
||||
///
|
||||
async fn try_lock_for_read(
|
||||
&self,
|
||||
cache_key: &mut CacheKey,
|
||||
cache_key: &CacheKey,
|
||||
permit: &mut Option<PinnedSlotsPermit>,
|
||||
) -> Option<PageReadGuard> {
|
||||
let cache_key_orig = cache_key.clone();
|
||||
if let Some(slot_idx) = self.search_mapping(cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we released the mapping
|
||||
@@ -598,9 +385,6 @@ impl PageCache {
|
||||
_permit: inner.coalesce_readers_permit(permit.take().unwrap()),
|
||||
slot_guard: inner,
|
||||
});
|
||||
} else {
|
||||
// search_mapping might have modified the search key; restore it.
|
||||
*cache_key = cache_key_orig;
|
||||
}
|
||||
}
|
||||
None
|
||||
@@ -637,15 +421,12 @@ impl PageCache {
|
||||
///
|
||||
async fn lock_for_read(
|
||||
&self,
|
||||
cache_key: &mut CacheKey,
|
||||
cache_key: &CacheKey,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ReadBufResult> {
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
|
||||
let (read_access, hit) = match cache_key {
|
||||
CacheKey::MaterializedPage { .. } => {
|
||||
unreachable!("Materialized pages use lookup_materialized_page")
|
||||
}
|
||||
CacheKey::ImmutableFilePage { .. } => (
|
||||
&crate::metrics::PAGE_CACHE
|
||||
.for_ctx(ctx)
|
||||
@@ -717,52 +498,15 @@ impl PageCache {
|
||||
|
||||
/// Search for a page in the cache using the given search key.
|
||||
///
|
||||
/// Returns the slot index, if any. If the search criteria is not exact,
|
||||
/// *cache_key is updated with the actual key of the found page.
|
||||
/// Returns the slot index, if any.
|
||||
///
|
||||
/// NOTE: We don't hold any lock on the mapping on return, so the slot might
|
||||
/// get recycled for an unrelated page immediately after this function
|
||||
/// returns. The caller is responsible for re-checking that the slot still
|
||||
/// contains the page with the same key before using it.
|
||||
///
|
||||
fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
|
||||
fn search_mapping(&self, cache_key: &CacheKey) -> Option<usize> {
|
||||
match cache_key {
|
||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||
let map = self.materialized_page_map.read().unwrap();
|
||||
let versions = map.get(hash_key)?;
|
||||
|
||||
let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
|
||||
Ok(version_idx) => version_idx,
|
||||
Err(0) => return None,
|
||||
Err(version_idx) => version_idx - 1,
|
||||
};
|
||||
let version = &versions[version_idx];
|
||||
*lsn = version.lsn;
|
||||
Some(version.slot_idx)
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Search for a page in the cache using the given search key.
|
||||
///
|
||||
/// Like 'search_mapping, but performs an "exact" search. Used for
|
||||
/// allocating a new buffer.
|
||||
fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
|
||||
match key {
|
||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||
let map = self.materialized_page_map.read().unwrap();
|
||||
let versions = map.get(hash_key)?;
|
||||
|
||||
if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
|
||||
Some(versions[version_idx].slot_idx)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
@@ -775,27 +519,6 @@ impl PageCache {
|
||||
///
|
||||
fn remove_mapping(&self, old_key: &CacheKey) {
|
||||
match old_key {
|
||||
CacheKey::MaterializedPage {
|
||||
hash_key: old_hash_key,
|
||||
lsn: old_lsn,
|
||||
} => {
|
||||
let mut map = self.materialized_page_map.write().unwrap();
|
||||
if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
|
||||
let versions = old_entry.get_mut();
|
||||
|
||||
if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
|
||||
versions.remove(version_idx);
|
||||
self.size_metrics
|
||||
.current_bytes_materialized_page
|
||||
.sub_page_sz(1);
|
||||
if versions.is_empty() {
|
||||
old_entry.remove_entry();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
panic!("could not find old key in mapping")
|
||||
}
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
map.remove(&(*file_id, *blkno))
|
||||
@@ -812,30 +535,6 @@ impl PageCache {
|
||||
/// of the existing mapping and leaves it untouched.
|
||||
fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
|
||||
match new_key {
|
||||
CacheKey::MaterializedPage {
|
||||
hash_key: new_key,
|
||||
lsn: new_lsn,
|
||||
} => {
|
||||
let mut map = self.materialized_page_map.write().unwrap();
|
||||
let versions = map.entry(new_key.clone()).or_default();
|
||||
match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
|
||||
Ok(version_idx) => Some(versions[version_idx].slot_idx),
|
||||
Err(version_idx) => {
|
||||
versions.insert(
|
||||
version_idx,
|
||||
Version {
|
||||
lsn: *new_lsn,
|
||||
slot_idx,
|
||||
},
|
||||
);
|
||||
self.size_metrics
|
||||
.current_bytes_materialized_page
|
||||
.add_page_sz(1);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
match map.entry((*file_id, *blkno)) {
|
||||
@@ -949,7 +648,6 @@ impl PageCache {
|
||||
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
|
||||
size_metrics.max_bytes.set_page_sz(num_pages);
|
||||
size_metrics.current_bytes_immutable.set_page_sz(0);
|
||||
size_metrics.current_bytes_materialized_page.set_page_sz(0);
|
||||
|
||||
let slots = page_buffer
|
||||
.chunks_exact_mut(PAGE_SZ)
|
||||
@@ -968,7 +666,6 @@ impl PageCache {
|
||||
.collect();
|
||||
|
||||
Self {
|
||||
materialized_page_map: Default::default(),
|
||||
immutable_page_map: Default::default(),
|
||||
slots,
|
||||
next_evict_slot: AtomicUsize::new(0),
|
||||
|
||||
@@ -945,9 +945,8 @@ impl PageServerHandler {
|
||||
b"valid_until",
|
||||
)]))?
|
||||
.write_message_noflush(&BeMessage::DataRow(&[Some(
|
||||
&valid_until.as_millis().to_be_bytes(),
|
||||
)]))?
|
||||
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
valid_until.as_millis().to_string().as_bytes(),
|
||||
)]))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::collections::{BTreeMap, HashMap};
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use sysinfo::SystemExt;
|
||||
use tokio::fs;
|
||||
|
||||
use anyhow::Context;
|
||||
@@ -483,10 +484,9 @@ pub async fn init_tenant_mgr(
|
||||
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
|
||||
|
||||
// Initialize dynamic limits that depend on system resources
|
||||
let system_memory = sysinfo::System::new_with_specifics(
|
||||
sysinfo::RefreshKind::new().with_memory(sysinfo::MemoryRefreshKind::new().with_ram()),
|
||||
)
|
||||
.total_memory();
|
||||
let system_memory =
|
||||
sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory())
|
||||
.total_memory();
|
||||
let max_ephemeral_layer_bytes =
|
||||
conf.ephemeral_bytes_per_memory_kb as u64 * (system_memory / 1024);
|
||||
tracing::info!("Initialized ephemeral layer size limit to {max_ephemeral_layer_bytes}, for {system_memory} bytes of memory");
|
||||
|
||||
@@ -101,9 +101,7 @@ use crate::{
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::metrics::{
|
||||
TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
|
||||
};
|
||||
use crate::metrics::TimelineMetrics;
|
||||
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use pageserver_api::reltag::RelTag;
|
||||
@@ -120,7 +118,6 @@ use utils::{
|
||||
simple_rcu::{Rcu, RcuReadGuard},
|
||||
};
|
||||
|
||||
use crate::page_cache;
|
||||
use crate::repository::GcResult;
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::task_mgr;
|
||||
@@ -134,7 +131,7 @@ use self::layer_manager::LayerManager;
|
||||
use self::logical_size::LogicalSize;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::{config::TenantConf, storage_layer::VectoredValueReconstructState};
|
||||
use super::config::TenantConf;
|
||||
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
|
||||
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
|
||||
use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer};
|
||||
@@ -887,32 +884,11 @@ impl Timeline {
|
||||
|
||||
self.timeline_get_throttle.throttle(ctx, 1).await;
|
||||
|
||||
// Check the page cache. We will get back the most recent page with lsn <= `lsn`.
|
||||
// The cached image can be returned directly if there is no WAL between the cached image
|
||||
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
|
||||
// for redo.
|
||||
let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
|
||||
Some((cached_lsn, cached_img)) => {
|
||||
match cached_lsn.cmp(&lsn) {
|
||||
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
|
||||
Ordering::Equal => {
|
||||
MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
|
||||
return Ok(cached_img); // exact LSN match, return the image
|
||||
}
|
||||
Ordering::Greater => {
|
||||
unreachable!("the returned lsn should never be after the requested lsn")
|
||||
}
|
||||
}
|
||||
Some((cached_lsn, cached_img))
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
match self.conf.get_impl {
|
||||
GetImpl::Legacy => {
|
||||
let reconstruct_state = ValueReconstructState {
|
||||
records: Vec::new(),
|
||||
img: cached_page_img,
|
||||
img: None,
|
||||
};
|
||||
|
||||
self.get_impl(key, lsn, reconstruct_state, ctx).await
|
||||
@@ -926,13 +902,6 @@ impl Timeline {
|
||||
// entry returned above.
|
||||
let mut reconstruct_state = ValuesReconstructState::new();
|
||||
|
||||
// Only add the cached image to the reconstruct state when it exists.
|
||||
if cached_page_img.is_some() {
|
||||
let mut key_state = VectoredValueReconstructState::default();
|
||||
key_state.img = cached_page_img;
|
||||
reconstruct_state.keys.insert(key, Ok(key_state));
|
||||
}
|
||||
|
||||
let vectored_res = self
|
||||
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
|
||||
.await;
|
||||
@@ -3240,7 +3209,6 @@ impl Timeline {
|
||||
ValueReconstructResult::Continue => {
|
||||
// If we reached an earlier cached page image, we're done.
|
||||
if cont_lsn == cached_lsn + 1 {
|
||||
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
|
||||
return Ok(traversal_path);
|
||||
}
|
||||
if let Some(prev) = prev_lsn {
|
||||
@@ -3614,26 +3582,6 @@ impl Timeline {
|
||||
})
|
||||
}
|
||||
|
||||
/// # Cancel-safety
|
||||
///
|
||||
/// This method is cancellation-safe.
|
||||
async fn lookup_cached_page(
|
||||
&self,
|
||||
key: &Key,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Option<(Lsn, Bytes)> {
|
||||
let cache = page_cache::get();
|
||||
|
||||
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
|
||||
// We should look at the key to determine if it's a cacheable object
|
||||
let (lsn, read_guard) = cache
|
||||
.lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
|
||||
.await?;
|
||||
let img = Bytes::from(read_guard.to_vec());
|
||||
Some((lsn, img))
|
||||
}
|
||||
|
||||
async fn get_ready_ancestor_timeline(
|
||||
&self,
|
||||
ancestor: &Arc<Timeline>,
|
||||
@@ -5280,8 +5228,6 @@ impl Timeline {
|
||||
trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
|
||||
};
|
||||
|
||||
let last_rec_lsn = data.records.last().unwrap().0;
|
||||
|
||||
let img = match self
|
||||
.walredo_mgr
|
||||
.as_ref()
|
||||
@@ -5295,23 +5241,6 @@ impl Timeline {
|
||||
Err(e) => return Err(PageReconstructError::WalRedo(e)),
|
||||
};
|
||||
|
||||
if img.len() == page_cache::PAGE_SZ {
|
||||
let cache = page_cache::get();
|
||||
if let Err(e) = cache
|
||||
.memorize_materialized_page(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
key,
|
||||
last_rec_lsn,
|
||||
&img,
|
||||
)
|
||||
.await
|
||||
.context("Materialized page memoization failed")
|
||||
{
|
||||
return Err(PageReconstructError::from(e));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(img)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,8 +118,6 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
|
||||
"libmetrics_launch_timestamp",
|
||||
"libmetrics_build_info",
|
||||
"libmetrics_tracing_event_count_total",
|
||||
"pageserver_materialized_cache_hits_total",
|
||||
"pageserver_materialized_cache_hits_direct_total",
|
||||
"pageserver_page_cache_read_hits_total",
|
||||
"pageserver_page_cache_read_accesses_total",
|
||||
"pageserver_page_cache_size_current_bytes",
|
||||
|
||||
@@ -3446,11 +3446,12 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
self.active_safekeepers: List[int] = list(map(lambda sk: sk.id, env.safekeepers))
|
||||
# path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
|
||||
|
||||
# This lock prevents concurrent start & stop operations, keeping `self.running` consistent
|
||||
# with whether we're really running. Tests generally wouldn't try and do these concurrently,
|
||||
# but endpoints are also stopped during test teardown, which might happen concurrently with
|
||||
# destruction of objects in tests.
|
||||
self.lock = threading.Lock()
|
||||
# Semaphore is set to 1 when we start, and acquire'd back to zero when we stop
|
||||
#
|
||||
# We use a semaphore rather than a bool so that racing calls to stop() don't
|
||||
# try and stop the same process twice, as stop() is called by test teardown and
|
||||
# potentially by some __del__ chains in other threads.
|
||||
self._running = threading.Semaphore(0)
|
||||
|
||||
def http_client(
|
||||
self, auth_token: Optional[str] = None, retries: Optional[Retry] = None
|
||||
@@ -3522,15 +3523,14 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
|
||||
log.info(f"Starting postgres endpoint {self.endpoint_id}")
|
||||
|
||||
with self.lock:
|
||||
self.env.neon_cli.endpoint_start(
|
||||
self.endpoint_id,
|
||||
safekeepers=self.active_safekeepers,
|
||||
remote_ext_config=remote_ext_config,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
)
|
||||
self.running = True
|
||||
self.env.neon_cli.endpoint_start(
|
||||
self.endpoint_id,
|
||||
safekeepers=self.active_safekeepers,
|
||||
remote_ext_config=remote_ext_config,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
)
|
||||
self._running.release(1)
|
||||
|
||||
return self
|
||||
|
||||
@@ -3578,9 +3578,12 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
conf_file.write("\n".join(hba) + "\n")
|
||||
conf_file.write(data)
|
||||
|
||||
if self.running:
|
||||
if self.is_running():
|
||||
self.safe_psql("SELECT pg_reload_conf()")
|
||||
|
||||
def is_running(self):
|
||||
return self._running._value > 0
|
||||
|
||||
def reconfigure(self, pageserver_id: Optional[int] = None):
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_reconfigure(self.endpoint_id, self.tenant_id, pageserver_id)
|
||||
@@ -3629,13 +3632,12 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
Returns self.
|
||||
"""
|
||||
|
||||
with self.lock:
|
||||
if self.running:
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, check_return_code=self.check_stop_result, mode=mode
|
||||
)
|
||||
self.running = False
|
||||
running = self._running.acquire(blocking=False)
|
||||
if running:
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, check_return_code=self.check_stop_result, mode=mode
|
||||
)
|
||||
|
||||
return self
|
||||
|
||||
@@ -3645,13 +3647,13 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
Returns self.
|
||||
"""
|
||||
|
||||
with self.lock:
|
||||
running = self._running.acquire(blocking=False)
|
||||
if running:
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, True, check_return_code=self.check_stop_result, mode=mode
|
||||
)
|
||||
self.endpoint_id = None
|
||||
self.running = False
|
||||
|
||||
return self
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
import queue
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
@@ -8,11 +9,7 @@ from fixtures.neon_fixtures import DEFAULT_BRANCH_NAME, NeonEnvBuilder
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder, build_type: str):
|
||||
if build_type == "debug":
|
||||
# Disable vectored read path cross validation since it makes the test time out.
|
||||
neon_env_builder.pageserver_config_override = "validate_vectored_get=false"
|
||||
|
||||
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
cache_dir = os.path.join(env.repo_dir, "file_cache")
|
||||
@@ -33,11 +30,10 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder, build_type: s
|
||||
|
||||
cur = endpoint.connect().cursor()
|
||||
|
||||
stop = threading.Event()
|
||||
n_rows = 100000
|
||||
n_threads = 20
|
||||
n_updates_per_thread = 10000
|
||||
n_updates_per_connection = 1000
|
||||
n_total_updates = n_threads * n_updates_per_thread
|
||||
|
||||
cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
|
||||
cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")
|
||||
@@ -48,11 +44,11 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder, build_type: s
|
||||
# performed (plus the initial 1 on each row).
|
||||
#
|
||||
# Furthermore, each thread will reconnect between every 1000 updates.
|
||||
def run_updates():
|
||||
def run_updates(n_updates_performed_q: queue.Queue[int]):
|
||||
n_updates_performed = 0
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
for _ in range(n_updates_per_thread):
|
||||
while not stop.is_set():
|
||||
id = random.randint(1, n_rows)
|
||||
cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}")
|
||||
n_updates_performed += 1
|
||||
@@ -61,19 +57,28 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder, build_type: s
|
||||
conn.close()
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
n_updates_performed_q.put(n_updates_performed)
|
||||
|
||||
n_updates_performed_q: queue.Queue[int] = queue.Queue()
|
||||
threads: List[threading.Thread] = []
|
||||
for _i in range(n_threads):
|
||||
thread = threading.Thread(target=run_updates, args=(), daemon=True)
|
||||
thread = threading.Thread(target=run_updates, args=(n_updates_performed_q,), daemon=True)
|
||||
thread.start()
|
||||
threads.append(thread)
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
# unlink, this is what we're actually testing
|
||||
new_cache_dir = os.path.join(env.repo_dir, "file_cache_new")
|
||||
os.rename(cache_dir, new_cache_dir)
|
||||
|
||||
time.sleep(10)
|
||||
|
||||
stop.set()
|
||||
|
||||
n_updates_performed = 0
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
n_updates_performed += n_updates_performed_q.get()
|
||||
|
||||
assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows
|
||||
assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_rows + n_updates_performed
|
||||
|
||||
@@ -88,6 +88,9 @@ def test_ondemand_download_replica(neon_env_builder: NeonEnvBuilder, shard_count
|
||||
initial_tenant_conf=tenant_conf, initial_tenant_shard_count=shard_count
|
||||
)
|
||||
|
||||
for ps in env.pageservers:
|
||||
ps.allowed_errors.append(".*page_service.*error obtaining lsn lease.*Tenant .* not found")
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
|
||||
@@ -23,11 +23,11 @@ if TYPE_CHECKING:
|
||||
|
||||
# Run the main PostgreSQL regression tests, in src/test/regress.
|
||||
#
|
||||
@pytest.mark.timeout(600)
|
||||
@pytest.mark.parametrize("shard_count", [None, 4])
|
||||
def test_pg_regress(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
test_output_dir: Path,
|
||||
build_type: str,
|
||||
pg_bin: PgBin,
|
||||
capsys: CaptureFixture[str],
|
||||
base_dir: Path,
|
||||
@@ -43,10 +43,6 @@ def test_pg_regress(
|
||||
if shard_count is not None:
|
||||
neon_env_builder.num_pageservers = shard_count
|
||||
|
||||
if build_type == "debug":
|
||||
# Disable vectored read path cross validation since it makes the test time out.
|
||||
neon_env_builder.pageserver_config_override = "validate_vectored_get=false"
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
neon_env_builder.enable_scrub_on_exit()
|
||||
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
|
||||
|
||||
@@ -6,7 +6,6 @@ from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
def test_physical_replication(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
n_records = 100000
|
||||
with env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
endpoint_id="primary",
|
||||
@@ -22,8 +21,20 @@ def test_physical_replication(neon_simple_env: NeonEnv):
|
||||
with p_con.cursor() as p_cur:
|
||||
with secondary.connect() as s_con:
|
||||
with s_con.cursor() as s_cur:
|
||||
for pk in range(n_records):
|
||||
runtime_secs = 30
|
||||
started_at = time.time()
|
||||
pk = 0
|
||||
while True:
|
||||
pk += 1
|
||||
now = time.time()
|
||||
if now - started_at > runtime_secs:
|
||||
break
|
||||
p_cur.execute("insert into t (pk) values (%s)", (pk,))
|
||||
# an earlier version of this test was based on a fixed number of loop iterations
|
||||
# and selected for pk=(random.randrange(1, fixed number of loop iterations)).
|
||||
# => the probability of selection for a value that was never inserted changed from 99.9999% to 0% over the course of the test.
|
||||
#
|
||||
# We changed the test to where=(random.randrange(1, 2*pk)), which means the probability is now fixed to 50%.
|
||||
s_cur.execute(
|
||||
"select * from t where pk=%s", (random.randrange(1, n_records),)
|
||||
"select * from t where pk=%s", (random.randrange(1, 2 * pk),)
|
||||
)
|
||||
|
||||
@@ -6,18 +6,24 @@ from fixtures.pageserver.utils import wait_for_last_record_lsn
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
#
|
||||
# Create read-only compute nodes, anchored at historical points in time.
|
||||
#
|
||||
# This is very similar to the 'test_branch_behind' test, but instead of
|
||||
# creating branches, creates read-only nodes.
|
||||
#
|
||||
def test_readonly_node(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Create read-only compute nodes, anchored at historical points in time.
|
||||
|
||||
This is very similar to the 'test_branch_behind' test, but instead of
|
||||
creating branches, creates read-only nodes.
|
||||
"""
|
||||
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_readonly_node", "empty")
|
||||
endpoint_main = env.endpoints.create_start("test_readonly_node")
|
||||
|
||||
env.pageserver.allowed_errors.append(".*basebackup .* failed: invalid basebackup lsn.*")
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*basebackup .* failed: invalid basebackup lsn.*",
|
||||
".*page_service.*error obtaining lsn lease.*.*tried to request a page version that was garbage collected",
|
||||
]
|
||||
)
|
||||
|
||||
main_pg_conn = endpoint_main.connect()
|
||||
main_cur = main_pg_conn.cursor()
|
||||
|
||||
Reference in New Issue
Block a user