refactor: replace LD_PRELOAD with a baked-in solution

This commit is contained in:
Christian Schwarz
2023-03-30 15:28:58 +02:00
parent d5b8f123ec
commit 140ef67dd8
12 changed files with 267 additions and 284 deletions

View File

@@ -240,7 +240,7 @@ jobs:
- name: Run cargo build
run: |
${cov_prefix} mold -run cargo build $CARGO_FLAGS
${cov_prefix} mold -run cargo build $CARGO_FLAGS --bins --tests
- name: Run cargo test
run: |
@@ -260,16 +260,6 @@ jobs:
cp "$SRC" "$DST"
done
cdylibs=$(
${cov_prefix} cargo metadata $CARGO_FEATURES --format-version=1 --no-deps |
jq -r '.packages[].targets[] | select(.kind | index("cdylib")) | .name'
)
for lib in $cdylibs; do
SRC=target/$BUILD_TYPE/lib$lib.so
DST=/tmp/neon/bin/lib$lib.so
cp "$SRC" "$DST"
done
# Install test executables and write list of all binaries (for code coverage)
if [[ $BUILD_TYPE == "debug" ]]; then
# Keep bloated coverage data files away from the rest of the artifact

14
Cargo.lock generated
View File

@@ -3733,19 +3733,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "statvfs_ldpreload"
version = "0.1.0"
dependencies = [
"anyhow",
"libc",
"regex",
"serde",
"serde_json",
"walkdir",
"workspace_hack",
]
[[package]]
name = "storage_broker"
version = "0.1.0"
@@ -4548,6 +4535,7 @@ dependencies = [
"once_cell",
"pin-project-lite",
"rand",
"regex",
"routerify",
"sentry",
"serde",

View File

@@ -219,13 +219,7 @@ fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
let mut filled_cmd = cmd.env_clear().env("RUST_BACKTRACE", backtrace_setting);
// Pass through these environment variables to the command
for var in [
"LLVM_PROFILE_FILE",
"FAILPOINTS",
"RUST_LOG",
"LD_PRELOAD",
"NEON_STATVFS_LDPRELOAD_CONFIG",
] {
for var in ["LLVM_PROFILE_FILE", "FAILPOINTS", "RUST_LOG"] {
if let Some(val) = std::env::var_os(var) {
filled_cmd = filled_cmd.env(var, val);
}

View File

@@ -1,17 +0,0 @@
[package]
name = "statvfs_ldpreload"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[lib]
crate-type = ["cdylib"]
[dependencies]
anyhow.workspace = true
libc.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
walkdir.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -1,155 +0,0 @@
use std::path::PathBuf;
use anyhow::Context;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type")]
enum UsedBytesSource {
Fixed {
value: u64,
},
WalkDir {
path: PathBuf,
// only count files whose names match this regex
name_filter: Option<String>,
},
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[allow(clippy::upper_case_acronyms)]
enum MockedError {
EIO,
}
impl From<MockedError> for libc::c_int {
fn from(e: MockedError) -> Self {
match e {
MockedError::EIO => libc::EIO,
}
}
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type")]
enum Mock {
Success {
blocksize: u64,
total_blocks: u64,
used: UsedBytesSource,
},
Failure {
mocked_error: MockedError,
},
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct Config {
magic: String,
mock: Mock,
}
static INVOCATION_NUMBER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
#[derive(serde::Serialize)]
struct Status<'a> {
config: &'a Config,
invocation_number: usize,
}
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
#[no_mangle]
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn fstatvfs(_fd: libc::c_int, buf: *mut libc::statvfs64) -> libc::c_int {
use std::mem::MaybeUninit;
// the intended behavior for this mock is provided in an environment variable
let config = std::env::var("NEON_STATVFS_LDPRELOAD_CONFIG").unwrap_or_else(|_| {
panic!("NEON_STATVFS_LDPRELOAD_CONFIG not set");
});
let config: Config = serde_json::from_str(&config).unwrap();
// print a message to stderr, so that the test can ensure LD_PRELOAD is working
let invocation_number = INVOCATION_NUMBER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let status = Status {
config: &config,
invocation_number,
};
let status_line = format!(
"statvfs_ldpreload status: {}",
serde_json::to_string(&status).unwrap()
);
// Do a single write() system call to the stderr file descriptor.
// The kernel should serialize it with other writes to stderr,
// as long as the write doesn't cross page boundary.
eprintln!("{}", status_line);
// mock the statvfs call
match config.mock {
Mock::Success {
blocksize,
total_blocks,
used,
} => {
let used_bytes = used.get().unwrap();
// round it up to the nearest block multiple
let used_blocks = (used_bytes + (blocksize - 1)) / blocksize;
if used_blocks > total_blocks {
panic!("mocking error: used_blocks > total_blocks: {used_blocks} > {total_blocks}");
}
let avail_blocks = total_blocks - used_blocks;
// SAFETY: for the purposes of mocking, zeroed values for the fields which we
// don't set below are fine.
let mut ret = unsafe { MaybeUninit::<libc::statvfs64>::zeroed().assume_init() };
ret.f_bsize = blocksize;
ret.f_frsize = blocksize;
ret.f_blocks = total_blocks;
ret.f_bfree = avail_blocks;
ret.f_bavail = avail_blocks;
// SAFETY: the cfg! for this function ensures that the buffer has size of libc::statvfs64
unsafe {
buf.write(ret);
}
0
}
Mock::Failure { mocked_error } => {
// SAFETY: we mock the libc, we're allowed to set errno
unsafe { libc::__errno_location().write(mocked_error.into()) };
-1
}
}
}
impl UsedBytesSource {
fn get(&self) -> anyhow::Result<u64> {
match self {
UsedBytesSource::Fixed { value } => Ok(*value),
UsedBytesSource::WalkDir { path, name_filter } => {
let mut total = 0;
let filter_compiled = name_filter.as_ref().map(|n| regex::Regex::new(n).unwrap());
for entry in walkdir::WalkDir::new(path) {
let entry = entry?;
if !entry.file_type().is_file() {
continue;
}
if !filter_compiled
.as_ref()
.map(|filter| filter.is_match(entry.file_name().to_str().unwrap()))
.unwrap_or(true)
{
continue;
}
total += entry
.metadata()
.with_context(|| format!("get metadata of {:?}", entry.path()))?
.len();
}
Ok(total)
}
}
}
}

View File

@@ -19,6 +19,7 @@ jsonwebtoken.workspace = true
nix.workspace = true
once_cell.workspace = true
pin-project-lite.workspace = true
regex.workspace = true
routerify.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -52,6 +52,7 @@ pub mod history_buffer;
pub mod measured_stream;
pub mod serde_percent;
pub mod serde_regex;
/// use with fail::cfg("$name", "return(2000)")
#[macro_export]

View File

@@ -0,0 +1,60 @@
//! A `serde::Deserialize` type for regexes.
use std::ops::Deref;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct Regex(
#[serde(
deserialize_with = "deserialize_regex",
serialize_with = "serialize_regex"
)]
regex::Regex,
);
fn deserialize_regex<'de, D>(deserializer: D) -> Result<regex::Regex, D::Error>
where
D: serde::de::Deserializer<'de>,
{
let s: String = serde::de::Deserialize::deserialize(deserializer)?;
let re = regex::Regex::new(&s).map_err(serde::de::Error::custom)?;
Ok(re)
}
fn serialize_regex<S>(re: &regex::Regex, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
serializer.collect_str(re.as_str())
}
impl Deref for Regex {
type Target = regex::Regex;
fn deref(&self) -> &regex::Regex {
&self.0
}
}
impl PartialEq for Regex {
fn eq(&self, other: &Regex) -> bool {
// comparing the automatons would be quite complicated
self.as_str() == other.as_str()
}
}
impl Eq for Regex {}
#[cfg(test)]
mod tests {
#[test]
fn roundtrip() {
let input = r#""foo.*bar""#;
let re: super::Regex = serde_json::from_str(input).unwrap();
assert!(re.is_match("foo123bar"));
assert!(!re.is_match("foo"));
let output = serde_json::to_string(&re).unwrap();
assert_eq!(output, input);
}
}

View File

@@ -42,15 +42,14 @@
use std::{
collections::HashMap,
path::Path,
sync::Arc,
time::{Duration, SystemTime},
};
use anyhow::Context;
use nix::dir::Dir;
use remote_storage::GenericRemoteStorage;
use serde::{Deserialize, Serialize};
use sync_wrapper::SyncWrapper;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn, Instrument};
@@ -68,6 +67,8 @@ pub struct DiskUsageEvictionTaskConfig {
pub min_avail_bytes: u64,
#[serde(with = "humantime_serde")]
pub period: Duration,
#[cfg(feature = "testing")]
pub mock_statvfs: Option<crate::statvfs::mock::Config>,
}
#[derive(Default)]
@@ -86,16 +87,6 @@ pub fn launch_disk_usage_global_eviction_task(
return Ok(());
};
let tenants_dir_fd = {
let tenants_path = conf.tenants_path();
nix::dir::Dir::open(
&tenants_path,
nix::fcntl::OFlag::O_DIRECTORY,
nix::sys::stat::Mode::empty(),
)
.with_context(|| format!("open tenants_path {tenants_path:?}"))?
};
info!("launching disk usage based eviction task");
task_mgr::spawn(
@@ -110,7 +101,7 @@ pub fn launch_disk_usage_global_eviction_task(
&state,
task_config,
storage,
tenants_dir_fd,
&conf.tenants_path(),
task_mgr::shutdown_token(),
)
.await;
@@ -127,17 +118,9 @@ async fn disk_usage_eviction_task(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
storage: GenericRemoteStorage,
tenants_dir_fd: Dir,
tenants_dir: &Path,
cancel: CancellationToken,
) {
// nix::dir::Dir is Send but not Sync.
// One would think that that is sufficient, but rustc complains that the &tenants_dir_fd
// that we pass to disk_usage_eviction_iteration below will outlive the .await;
// The reason is that the &tenants_dir_fd is not sync because of stdlib-enforced axiom
// T: Sync <=> &T: Send
// The solution is to use SyncWrapper, which, by owning the tenants_dir_fd, can impl Sync.
let mut tenants_dir_fd = SyncWrapper::new(tenants_dir_fd);
use crate::tenant::tasks::random_init_delay;
{
if random_init_delay(task_config.period, &cancel)
@@ -159,7 +142,7 @@ async fn disk_usage_eviction_task(
state,
task_config,
&storage,
&mut tenants_dir_fd,
tenants_dir,
&cancel,
)
.await;
@@ -195,10 +178,10 @@ async fn disk_usage_eviction_task_iteration(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
storage: &GenericRemoteStorage,
tenants_dir_fd: &mut SyncWrapper<Dir>,
tenants_dir: &Path,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let usage_pre = filesystem_level_usage::get(tenants_dir_fd, task_config)
let usage_pre = filesystem_level_usage::get(tenants_dir, task_config)
.context("get filesystem-level disk usage before evictions")?;
let res = disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, cancel).await;
match res {
@@ -210,7 +193,7 @@ async fn disk_usage_eviction_task_iteration(
}
IterationOutcome::Finished(outcome) => {
// Verify with statvfs whether we made any real progress
let after = filesystem_level_usage::get(tenants_dir_fd, task_config)
let after = filesystem_level_usage::get(tenants_dir, task_config)
// It's quite unlikely to hit the error here. Keep the code simple and bail out.
.context("get filesystem-level disk usage after evictions")?;
@@ -624,12 +607,11 @@ impl std::ops::Deref for TimelineKey {
}
mod filesystem_level_usage {
use std::path::Path;
use anyhow::Context;
use nix::{
dir::Dir,
sys::statvfs::{self, Statvfs},
};
use sync_wrapper::SyncWrapper;
use crate::statvfs::Statvfs;
use super::DiskUsageEvictionTaskConfig;
@@ -669,10 +651,10 @@ mod filesystem_level_usage {
}
pub fn get<'a>(
tenants_dir_fd: &mut SyncWrapper<Dir>,
tenants_dir: &Path,
config: &'a DiskUsageEvictionTaskConfig,
) -> anyhow::Result<Usage<'a>> {
let stat: Statvfs = statvfs::fstatvfs(tenants_dir_fd.get_mut())
let stat = Statvfs::get(tenants_dir, config.mock_statvfs.as_ref())
.context("statvfs failed, presumably directory got unlinked")?;
// https://unix.stackexchange.com/a/703650

View File

@@ -13,6 +13,7 @@ pub mod page_cache;
pub mod page_service;
pub mod pgdatadir_mapping;
pub mod repository;
pub(crate) mod statvfs;
pub mod task_mgr;
pub mod tenant;
pub mod trace;

148
pageserver/src/statvfs.rs Normal file
View File

@@ -0,0 +1,148 @@
//! Wrapper around nix::sys::statvfs::Statvfs that allows for mocking.
use std::path::Path;
pub enum Statvfs {
Real(nix::sys::statvfs::Statvfs),
Mock(mock::Statvfs),
}
impl Statvfs {
pub fn get(tenants_dir: &Path, mocked: Option<&mock::Config>) -> nix::Result<Self> {
if let Some(mocked) = mocked {
Ok(Statvfs::Mock(mock::get(tenants_dir, mocked)?))
} else {
Ok(Statvfs::Real(nix::sys::statvfs::statvfs(tenants_dir)?))
}
}
pub fn blocks(&self) -> u64 {
match self {
Statvfs::Real(stat) => stat.blocks(),
Statvfs::Mock(stat) => stat.blocks,
}
}
pub fn blocks_available(&self) -> u64 {
match self {
Statvfs::Real(stat) => stat.blocks_available(),
Statvfs::Mock(stat) => stat.blocks_available,
}
}
pub fn fragment_size(&self) -> u64 {
match self {
Statvfs::Real(stat) => stat.fragment_size(),
Statvfs::Mock(stat) => stat.fragment_size,
}
}
pub fn block_size(&self) -> u64 {
match self {
Statvfs::Real(stat) => stat.block_size(),
Statvfs::Mock(stat) => stat.block_size,
}
}
}
pub mod mock {
use anyhow::Context;
use regex::Regex;
use std::path::Path;
use tracing::log::info;
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Config {
magic: String,
behavior: Behavior,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type")]
pub enum Behavior {
Success {
blocksize: u64,
total_blocks: u64,
name_filter: Option<utils::serde_regex::Regex>,
},
Failure {
mocked_error: MockedError,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[allow(clippy::upper_case_acronyms)]
pub enum MockedError {
EIO,
}
impl From<MockedError> for nix::Error {
fn from(e: MockedError) -> Self {
match e {
MockedError::EIO => nix::Error::EIO,
}
}
}
pub fn get(tenants_dir: &Path, config: &Config) -> nix::Result<Statvfs> {
info!("running mocked statvfs, magic: {}", config.magic);
match config.behavior {
Behavior::Success {
blocksize,
total_blocks,
ref name_filter,
} => {
let used_bytes = walk_dir_disk_usage(tenants_dir, name_filter.as_deref()).unwrap();
// round it up to the nearest block multiple
let used_blocks = (used_bytes + (blocksize - 1)) / blocksize;
if used_blocks > total_blocks {
panic!(
"mocking error: used_blocks > total_blocks: {used_blocks} > {total_blocks}"
);
}
let avail_blocks = total_blocks - used_blocks;
Ok(Statvfs {
blocks: total_blocks,
blocks_available: avail_blocks,
fragment_size: blocksize,
block_size: blocksize,
})
}
Behavior::Failure { mocked_error } => Err(mocked_error.into()),
}
}
fn walk_dir_disk_usage(path: &Path, name_filter: Option<&Regex>) -> anyhow::Result<u64> {
let mut total = 0;
for entry in walkdir::WalkDir::new(path) {
let entry = entry?;
if !entry.file_type().is_file() {
continue;
}
if !name_filter
.as_ref()
.map(|filter| filter.is_match(entry.file_name().to_str().unwrap()))
.unwrap_or(true)
{
continue;
}
total += entry
.metadata()
.with_context(|| format!("get metadata of {:?}", entry.path()))?
.len();
}
Ok(total)
}
pub struct Statvfs {
pub blocks: u64,
pub blocks_available: u64,
pub fragment_size: u64,
pub block_size: u64,
}
}

View File

@@ -1,13 +1,12 @@
import json
import shutil
import sys
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Tuple
from typing import Dict, Tuple
import pytest
import toml
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
LocalFsStorage,
@@ -96,27 +95,32 @@ class EvictionEnv:
with self.neon_env.postgres.create_start("main", tenant_id=tenant_id, lsn=lsn) as pg:
self.pg_bin.run(["pgbench", "-S", pg.connstr()])
def pageserver_start_with_mocked_statvfs(self, config_override: str, mock: Dict[str, Any]):
def pageserver_start_with_disk_usage_eviction(
self, period, max_usage_pct, min_avail_bytes, mock_behavior
):
magic = str(uuid.uuid4())
preload_lib_path: Path = self.neon_env.neon_binpath / "libstatvfs_ldpreload.so"
assert preload_lib_path.is_file(), "libstatvfs_ldpreload.so must be built"
disk_usage_config = {
"period": period,
"max_usage_pct": max_usage_pct,
"min_avail_bytes": min_avail_bytes,
"mock_statvfs": {
"behavior": mock_behavior,
"magic": magic,
},
}
enc = toml.TomlEncoder()
self.neon_env.pageserver.start(
overrides=("--pageserver-config-override=" + config_override,),
extra_env_vars={
"LD_PRELOAD": str(preload_lib_path.absolute()),
"NEON_STATVFS_LDPRELOAD_CONFIG": json.dumps(
{
"magic": magic,
"mock": mock,
}
),
},
overrides=(
"--pageserver-config-override=disk_usage_based_eviction="
+ enc.dump_inline_table(disk_usage_config).replace("\n", " "),
),
)
def statvfs_called():
assert self.neon_env.pageserver.log_contains(".*statvfs_ldpreload status:.*" + magic)
assert self.neon_env.pageserver.log_contains(".*running mocked statvfs.*" + magic)
wait_until(10, 1, statvfs_called)
@@ -431,18 +435,17 @@ def poor_mans_du(
return (total_on_disk, largest_layer, smallest_layer or 0)
@pytest.mark.skipif(
not sys.platform.startswith("linux"), reason="LD_PRELOAD hack currently only supported on Linux"
)
def test_statvfs_error_handling(eviction_env: EvictionEnv):
"""
We should log an error that statvfs fails.
"""
env = eviction_env
env.neon_env.pageserver.stop()
env.pageserver_start_with_mocked_statvfs(
'disk_usage_based_eviction={ period = "1s", max_usage_pct = 90, min_avail_bytes = 0 }',
{
env.pageserver_start_with_disk_usage_eviction(
period="1s",
max_usage_pct=90,
min_avail_bytes=0,
mock_behavior={
"type": "Failure",
"mocked_error": "EIO",
},
@@ -452,9 +455,6 @@ def test_statvfs_error_handling(eviction_env: EvictionEnv):
env.neon_env.pageserver.allowed_errors.append(".*statvfs failed.*EIO")
@pytest.mark.skipif(
not sys.platform.startswith("linux"), reason="LD_PRELOAD hack currently only supported on Linux"
)
def test_statvfs_pressure_usage(eviction_env: EvictionEnv):
"""
If statvfs data shows 100% usage, the eviction task will drive it down to
@@ -469,18 +469,15 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv):
blocksize = 512
total_blocks = (total_size + (blocksize - 1)) // blocksize
env.pageserver_start_with_mocked_statvfs(
'disk_usage_based_eviction={ period = "1s", max_usage_pct = 33, min_avail_bytes = 0 }',
{
env.pageserver_start_with_disk_usage_eviction(
period="1s",
max_usage_pct=33,
min_avail_bytes=0,
mock_behavior={
"type": "Success",
"blocksize": blocksize,
"total_blocks": total_blocks,
"used": {
"type": "WalkDir",
"path": str((env.neon_env.repo_dir / "tenants").absolute()),
# timelines_du() only reports layer files, do the same here
"name_filter": ".*__.*",
},
"name_filter": ".*__.*",
},
)
@@ -494,9 +491,6 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv):
assert post_eviction_total_size <= 0.33 * total_size, "we requested max 33% usage"
@pytest.mark.skipif(
not sys.platform.startswith("linux"), reason="LD_PRELOAD hack currently only supported on Linux"
)
def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):
"""
If statvfs data shows 100% usage, the eviction task will drive it down to
@@ -513,19 +507,15 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):
min_avail_bytes = total_size // 3
env.pageserver_start_with_mocked_statvfs(
'disk_usage_based_eviction={ period = "1s", max_usage_pct = 100, min_avail_bytes = %s }'
% (min_avail_bytes),
{
env.pageserver_start_with_disk_usage_eviction(
period="1s",
max_usage_pct=100,
min_avail_bytes=min_avail_bytes,
mock_behavior={
"type": "Success",
"blocksize": blocksize,
"total_blocks": total_blocks,
"used": {
"type": "WalkDir",
"path": str((env.neon_env.repo_dir / "tenants").absolute()),
# timelines_du() only reports layer files, do the same here
"name_filter": ".*__.*",
},
"name_filter": ".*__.*",
},
)