Compare commits

..

2 Commits

Author SHA1 Message Date
Bojan Serafimov
b1de46c18d wip 2023-11-01 20:50:20 -04:00
Bojan Serafimov
88064d8c1d wip 2023-11-01 17:13:56 -04:00
25 changed files with 474 additions and 420 deletions

View File

@@ -283,6 +283,7 @@ fn main() -> Result<()> {
.expect("--vm-monitor-addr should always be set because it has a default arg");
let file_cache_connstr = matches.get_one::<String>("filecache-connstr");
let cgroup = matches.get_one::<String>("cgroup");
let file_cache_on_disk = matches.get_flag("file-cache-on-disk");
// Only make a runtime if we need to.
// Note: it seems like you can make a runtime in an inner scope and
@@ -309,6 +310,7 @@ fn main() -> Result<()> {
cgroup: cgroup.cloned(),
pgconnstr: file_cache_connstr.cloned(),
addr: vm_monitor_addr.clone(),
file_cache_on_disk,
})),
token.clone(),
))
@@ -480,8 +482,6 @@ fn cli() -> clap::Command {
.value_name("FILECACHE_CONNSTR"),
)
.arg(
// DEPRECATED, NO LONGER DOES ANYTHING.
// See https://github.com/neondatabase/cloud/issues/7516
Arg::new("file-cache-on-disk")
.long("file-cache-on-disk")
.action(clap::ArgAction::SetTrue),

View File

@@ -1,7 +1,4 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex, MutexGuard,
};
use std::sync::{Arc, Mutex, MutexGuard};
use tokio::sync::Semaphore;
/// Custom design like [`tokio::sync::OnceCell`] but using [`OwnedSemaphorePermit`] instead of
@@ -13,7 +10,6 @@ use tokio::sync::Semaphore;
/// [`OwnedSemaphorePermit`]: tokio::sync::OwnedSemaphorePermit
pub struct OnceCell<T> {
inner: Mutex<Inner<T>>,
initializers: AtomicUsize,
}
impl<T> Default for OnceCell<T> {
@@ -21,7 +17,6 @@ impl<T> Default for OnceCell<T> {
fn default() -> Self {
Self {
inner: Default::default(),
initializers: AtomicUsize::new(0),
}
}
}
@@ -54,7 +49,6 @@ impl<T> OnceCell<T> {
init_semaphore: Arc::new(sem),
value: Some(value),
}),
initializers: AtomicUsize::new(0),
}
}
@@ -66,8 +60,8 @@ impl<T> OnceCell<T> {
/// Initialization is panic-safe and cancellation-safe.
pub async fn get_or_init<F, Fut, E>(&self, factory: F) -> Result<Guard<'_, T>, E>
where
F: FnOnce(InitPermit) -> Fut,
Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
{
let sem = {
let guard = self.inner.lock().unwrap();
@@ -77,61 +71,29 @@ impl<T> OnceCell<T> {
guard.init_semaphore.clone()
};
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
sem.acquire_owned().await
};
let permit = sem.acquire_owned().await;
if permit.is_err() {
let guard = self.inner.lock().unwrap();
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(Guard(guard));
} else {
// now we try
let value = factory().await?;
match permit {
Ok(permit) => {
let permit = InitPermit(permit);
let (value, _permit) = factory(permit).await?;
let guard = self.inner.lock().unwrap();
Ok(Self::set0(value, guard))
}
Err(_closed) => {
let guard = self.inner.lock().unwrap();
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(Guard(guard));
}
let mut guard = self.inner.lock().unwrap();
assert!(
guard.value.is_none(),
"we won permit, must not be initialized"
);
guard.value = Some(value);
guard.init_semaphore.close();
Ok(Guard(guard))
}
}
/// Assuming a permit is held after previous call to [`Guard::take_and_deinit`], it can be used
/// to complete initializing the inner value.
///
/// # Panics
///
/// If the inner has already been initialized.
pub fn set(&self, value: T, _permit: InitPermit) -> Guard<'_, T> {
let guard = self.inner.lock().unwrap();
// cannot assert that this permit is for self.inner.semaphore, but we can assert it cannot
// give more permits right now.
if guard.init_semaphore.try_acquire().is_ok() {
drop(guard);
panic!("permit is of wrong origin");
}
Self::set0(value, guard)
}
fn set0(value: T, mut guard: std::sync::MutexGuard<'_, Inner<T>>) -> Guard<'_, T> {
if guard.value.is_some() {
drop(guard);
unreachable!("we won permit, must not be initialized");
}
guard.value = Some(value);
guard.init_semaphore.close();
Guard(guard)
}
/// Returns a guard to an existing initialized value, if any.
pub fn get(&self) -> Option<Guard<'_, T>> {
let guard = self.inner.lock().unwrap();
@@ -141,28 +103,6 @@ impl<T> OnceCell<T> {
None
}
}
/// Return the number of [`Self::get_or_init`] calls waiting for initialization to complete.
pub fn initializer_count(&self) -> usize {
self.initializers.load(Ordering::Relaxed)
}
}
/// DropGuard counter for queued tasks waiting to initialize, mainly accessible for the
/// initializing task for example at the end of initialization.
struct CountWaitingInitializers<'a, T>(&'a OnceCell<T>);
impl<'a, T> CountWaitingInitializers<'a, T> {
fn start(target: &'a OnceCell<T>) -> Self {
target.initializers.fetch_add(1, Ordering::Relaxed);
CountWaitingInitializers(target)
}
}
impl<'a, T> Drop for CountWaitingInitializers<'a, T> {
fn drop(&mut self) {
self.0.initializers.fetch_sub(1, Ordering::Relaxed);
}
}
/// Uninteresting guard object to allow short-lived access to inspect or clone the held,
@@ -195,7 +135,7 @@ impl<'a, T> Guard<'a, T> {
///
/// The permit will be on a semaphore part of the new internal value, and any following
/// [`OnceCell::get_or_init`] will wait on it to complete.
pub fn take_and_deinit(&mut self) -> (T, InitPermit) {
pub fn take_and_deinit(&mut self) -> (T, tokio::sync::OwnedSemaphorePermit) {
let mut swapped = Inner::default();
let permit = swapped
.init_semaphore
@@ -205,14 +145,11 @@ impl<'a, T> Guard<'a, T> {
std::mem::swap(&mut *self.0, &mut swapped);
swapped
.value
.map(|v| (v, InitPermit(permit)))
.map(|v| (v, permit))
.expect("guard is not created unless value has been initialized")
}
}
/// Type held by OnceCell (de)initializing task.
pub struct InitPermit(tokio::sync::OwnedSemaphorePermit);
#[cfg(test)]
mod tests {
use super::*;
@@ -248,11 +185,11 @@ mod tests {
barrier.wait().await;
let won = {
let g = cell
.get_or_init(|permit| {
.get_or_init(|| {
counters.factory_got_to_run.fetch_add(1, Ordering::Relaxed);
async {
counters.future_polled.fetch_add(1, Ordering::Relaxed);
Ok::<_, Infallible>((i, permit))
Ok::<_, Infallible>(i)
}
})
.await
@@ -306,7 +243,7 @@ mod tests {
deinitialization_started.wait().await;
let started_at = tokio::time::Instant::now();
cell.get_or_init(|permit| async { Ok::<_, Infallible>((reinit, permit)) })
cell.get_or_init(|| async { Ok::<_, Infallible>(reinit) })
.await
.unwrap();
@@ -321,32 +258,18 @@ mod tests {
assert_eq!(*cell.get().unwrap(), reinit);
}
#[test]
fn reinit_with_deinit_permit() {
let cell = Arc::new(OnceCell::new(42));
let (mol, permit) = cell.get().unwrap().take_and_deinit();
cell.set(5, permit);
assert_eq!(*cell.get().unwrap(), 5);
let (five, permit) = cell.get().unwrap().take_and_deinit();
assert_eq!(5, five);
cell.set(mol, permit);
assert_eq!(*cell.get().unwrap(), 42);
}
#[tokio::test]
async fn initialization_attemptable_until_ok() {
let cell = OnceCell::default();
for _ in 0..10 {
cell.get_or_init(|_permit| async { Err("whatever error") })
cell.get_or_init(|| async { Err("whatever error") })
.await
.unwrap_err();
}
let g = cell
.get_or_init(|permit| async { Ok::<_, Infallible>(("finally success", permit)) })
.get_or_init(|| async { Ok::<_, Infallible>("finally success") })
.await
.unwrap();
assert_eq!(*g, "finally success");
@@ -358,11 +281,11 @@ mod tests {
let barrier = tokio::sync::Barrier::new(2);
let initializer = cell.get_or_init(|permit| async {
let initializer = cell.get_or_init(|| async {
barrier.wait().await;
futures::future::pending::<()>().await;
Ok::<_, Infallible>(("never reached", permit))
Ok::<_, Infallible>("never reached")
});
tokio::select! {
@@ -375,7 +298,7 @@ mod tests {
assert!(cell.get().is_none());
let g = cell
.get_or_init(|permit| async { Ok::<_, Infallible>(("now initialized", permit)) })
.get_or_init(|| async { Ok::<_, Infallible>("now initialized") })
.await
.unwrap();
assert_eq!(*g, "now initialized");

View File

@@ -21,6 +21,11 @@ pub struct FileCacheState {
#[derive(Debug)]
pub struct FileCacheConfig {
/// Whether the file cache is *actually* stored in memory (e.g. by writing to
/// a tmpfs or shmem file). If true, the size of the file cache will be counted against the
/// memory available for the cgroup.
pub(crate) in_memory: bool,
/// The size of the file cache, in terms of the size of the resource it consumes
/// (currently: only memory)
///
@@ -54,9 +59,22 @@ pub struct FileCacheConfig {
spread_factor: f64,
}
impl Default for FileCacheConfig {
fn default() -> Self {
impl FileCacheConfig {
pub fn default_in_memory() -> Self {
Self {
in_memory: true,
// 75 %
resource_multiplier: 0.75,
// 640 MiB; (512 + 128)
min_remaining_after_cache: NonZeroU64::new(640 * MiB).unwrap(),
// ensure any increase in file cache size is split 90-10 with 10% to other memory
spread_factor: 0.1,
}
}
pub fn default_on_disk() -> Self {
Self {
in_memory: false,
resource_multiplier: 0.75,
// 256 MiB - lower than when in memory because overcommitting is safe; if we don't have
// memory, the kernel will just evict from its page cache, rather than e.g. killing
@@ -65,9 +83,7 @@ impl Default for FileCacheConfig {
spread_factor: 0.1,
}
}
}
impl FileCacheConfig {
/// Make sure fields of the config are consistent.
pub fn validate(&self) -> anyhow::Result<()> {
// Single field validity

View File

@@ -39,6 +39,16 @@ pub struct Args {
#[arg(short, long)]
pub pgconnstr: Option<String>,
/// Flag to signal that the Postgres file cache is on disk (i.e. not in memory aside from the
/// kernel's page cache), and therefore should not count against available memory.
//
// NB: Ideally this flag would directly refer to whether the file cache is in memory (rather
// than a roundabout way, via whether it's on disk), but in order to be backwards compatible
// during the switch away from an in-memory file cache, we had to default to the previous
// behavior.
#[arg(long)]
pub file_cache_on_disk: bool,
/// The address we should listen on for connection requests. For the
/// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369.
#[arg(short, long)]

View File

@@ -156,7 +156,10 @@ impl Runner {
// memory limits.
if let Some(connstr) = &args.pgconnstr {
info!("initializing file cache");
let config = FileCacheConfig::default();
let config = match args.file_cache_on_disk {
true => FileCacheConfig::default_on_disk(),
false => FileCacheConfig::default_in_memory(),
};
let mut file_cache = FileCacheState::new(connstr, config, token.clone())
.await
@@ -184,7 +187,10 @@ impl Runner {
info!("file cache size actually got set to {actual_size}")
}
file_cache_disk_size = actual_size;
if args.file_cache_on_disk {
file_cache_disk_size = actual_size;
}
state.filecache = Some(file_cache);
}
@@ -233,11 +239,17 @@ impl Runner {
let requested_mem = target.mem;
let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes);
let expected_file_cache_size = self
let (expected_file_cache_size, expected_file_cache_disk_size) = self
.filecache
.as_ref()
.map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory))
.unwrap_or(0);
.map(|file_cache| {
let size = file_cache.config.calculate_cache_size(usable_system_memory);
match file_cache.config.in_memory {
true => (size, 0),
false => (size, size),
}
})
.unwrap_or((0, 0));
if let Some(cgroup) = &self.cgroup {
let (last_time, last_history) = *cgroup.watcher.borrow();
@@ -261,7 +273,7 @@ impl Runner {
let new_threshold = self
.config
.cgroup_threshold(usable_system_memory, expected_file_cache_size);
.cgroup_threshold(usable_system_memory, expected_file_cache_disk_size);
let current = last_history.avg_non_reclaimable;
@@ -288,10 +300,13 @@ impl Runner {
.set_file_cache_size(expected_file_cache_size)
.await
.context("failed to set file cache size")?;
file_cache_disk_size = actual_usage;
if !file_cache.config.in_memory {
file_cache_disk_size = actual_usage;
}
let message = format!(
"set file cache size to {} MiB",
"set file cache size to {} MiB (in memory = {})",
bytes_to_mebibytes(actual_usage),
file_cache.config.in_memory,
);
info!("downscale: {message}");
status.push(message);
@@ -342,7 +357,9 @@ impl Runner {
.set_file_cache_size(expected_usage)
.await
.context("failed to set file cache size")?;
file_cache_disk_size = actual_usage;
if !file_cache.config.in_memory {
file_cache_disk_size = actual_usage;
}
if actual_usage != expected_usage {
warn!(

View File

@@ -88,6 +88,10 @@ criterion.workspace = true
hex-literal.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }
[[bench]]
name = "bench_writes"
harness = false
[[bench]]
name = "bench_layer_map"
harness = false

View File

@@ -10,3 +10,7 @@ To run a specific file:
To run a specific function:
`cargo bench --bench bench_layer_map -- real_map_uniform_queries`
To add a new benchmark:
1. Create new file containing `criterion_main!`
2. Add it to `Cargo.toml`

View File

@@ -0,0 +1,76 @@
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pageserver::{tenant::storage_layer::InMemoryLayer, config::PageServerConf, context::{RequestContext, DownloadBehavior}, task_mgr::TaskKind, repository::Key, virtual_file};
use pageserver::repository::Value;
use utils::{id::{TimelineId, TenantId}, lsn::Lsn};
fn bench_writes(c: &mut Criterion) {
// Boilerplate
// TODO this setup can be avoided if I reuse TenantHarness but it's difficult
// because it's only compiled for tests, and it's hacky because tbh we
// shouldn't need this many inputs for a function that just writes bytes
// from memory to disk. Performance-critical functions should be
// self-contained (almost like they're separate libraries) and all the
// monolithic pageserver machinery should live outside.
virtual_file::init(10);
let repo_dir = Utf8PathBuf::from(&"/home/bojan/tmp/repo_dir");
let conf = PageServerConf::dummy_conf(repo_dir);
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
let timeline_id = TimelineId::generate();
let tenant_id = TenantId::generate();
let start_lsn = Lsn(0);
let ctx = RequestContext::new(TaskKind::LayerFlushTask, DownloadBehavior::Error);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
fn test_img(s: &str) -> Bytes {
let mut buf = BytesMut::new();
buf.extend_from_slice(s.as_bytes());
buf.resize(64, 0);
buf.freeze()
}
// Make the InMemoryLayer that will be flushed
let layer = rt.block_on(async {
let l = InMemoryLayer::create(&conf, timeline_id, tenant_id, start_lsn).await.unwrap();
let mut lsn = Lsn(0x10);
let mut key = Key::from_hex("012222222233333333444444445500000000").unwrap();
let mut blknum = 0;
for _ in 0..100 {
key.field6 = blknum;
let val = Value::Image(test_img(&format!("{} at {}", blknum, lsn)));
l.put_value(key, lsn, &val, &ctx).await.unwrap();
lsn = Lsn(lsn.0 + 0x10);
blknum += 1;
}
l
});
rt.block_on(async {
layer.write_to_disk_bench(&ctx).await.unwrap();
});
let mut group = c.benchmark_group("g1");
group.bench_function("f1", |b| {
b.iter(|| {
// TODO
});
});
group.bench_function("f2", |b| {
b.iter(|| {
// TODO
});
});
group.finish();
}
criterion_group!(group_1, bench_writes);
criterion_main!(group_1);

View File

@@ -10,7 +10,6 @@ use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::remote_timeline_client::remote_timeline_path;
use crate::virtual_file::MaybeFatalIo;
use crate::virtual_file::VirtualFile;
use anyhow::Context;
use camino::Utf8PathBuf;
@@ -272,9 +271,7 @@ impl DeletionHeader {
let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
VirtualFile::crashsafe_overwrite(&header_path, &temp_path, &header_bytes)
.await
.maybe_fatal_err("save deletion header")?;
Ok(())
.map_err(Into::into)
}
}
@@ -363,7 +360,6 @@ impl DeletionList {
let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
VirtualFile::crashsafe_overwrite(&path, &temp_path, &bytes)
.await
.maybe_fatal_err("save deletion list")
.map_err(Into::into)
}
}

View File

@@ -34,8 +34,6 @@ use crate::deletion_queue::TEMP_SUFFIX;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::storage_layer::LayerFileName;
use crate::virtual_file::on_fatal_io_error;
use crate::virtual_file::MaybeFatalIo;
// The number of keys in a DeletionList before we will proactively persist it
// (without reaching a flush deadline). This aims to deliver objects of the order
@@ -197,7 +195,7 @@ impl ListWriter {
debug!("Deletion header {header_path} not found, first start?");
Ok(None)
} else {
on_fatal_io_error(&e, "reading deletion header");
Err(anyhow::anyhow!(e))
}
}
}
@@ -218,9 +216,16 @@ impl ListWriter {
self.pending.sequence = validated_sequence + 1;
let deletion_directory = self.conf.deletion_prefix();
let mut dir = tokio::fs::read_dir(&deletion_directory)
.await
.fatal_err("read deletion directory");
let mut dir = match tokio::fs::read_dir(&deletion_directory).await {
Ok(d) => d,
Err(e) => {
warn!("Failed to open deletion list directory {deletion_directory}: {e:#}");
// Give up: if we can't read the deletion list directory, we probably can't
// write lists into it later, so the queue won't work.
return Err(e.into());
}
};
let list_name_pattern =
Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();
@@ -228,7 +233,7 @@ impl ListWriter {
let temp_extension = format!(".{TEMP_SUFFIX}");
let header_path = self.conf.deletion_header_path();
let mut seqs: Vec<u64> = Vec::new();
while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") {
while let Some(dentry) = dir.next_entry().await? {
let file_name = dentry.file_name();
let dentry_str = file_name.to_string_lossy();
@@ -241,9 +246,11 @@ impl ListWriter {
info!("Cleaning up temporary file {dentry_str}");
let absolute_path =
deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
tokio::fs::remove_file(&absolute_path)
.await
.fatal_err("delete temp file");
if let Err(e) = tokio::fs::remove_file(&absolute_path).await {
// Non-fatal error: we will just leave the file behind but not
// try and load it.
warn!("Failed to clean up temporary file {absolute_path}: {e:#}");
}
continue;
}
@@ -283,9 +290,7 @@ impl ListWriter {
for s in seqs {
let list_path = self.conf.deletion_list_path(s);
let list_bytes = tokio::fs::read(&list_path)
.await
.fatal_err("read deletion list");
let list_bytes = tokio::fs::read(&list_path).await?;
let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
Ok(l) => l,

View File

@@ -28,7 +28,6 @@ use crate::config::PageServerConf;
use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::control_plane_client::RetryForeverError;
use crate::metrics;
use crate::virtual_file::MaybeFatalIo;
use super::deleter::DeleterMessage;
use super::DeletionHeader;
@@ -288,9 +287,16 @@ where
async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) {
for list_path in list_paths {
debug!("Removing deletion list {list_path}");
tokio::fs::remove_file(&list_path)
.await
.fatal_err("remove deletion list");
if let Err(e) = tokio::fs::remove_file(&list_path).await {
// Unexpected: we should have permissions and nothing else should
// be touching these files. We will leave the file behind. Subsequent
// pageservers will try and load it again: hopefully whatever storage
// issue (probably permissions) has been fixed by then.
tracing::error!("Failed to delete {list_path}: {e:#}");
metrics::DELETION_QUEUE.unexpected_errors.inc();
break;
}
}
}

View File

@@ -3340,7 +3340,6 @@ fn run_initdb(
.args(["-D", initdb_target_dir.as_ref()])
.args(["-U", &conf.superuser])
.args(["-E", "utf8"])
.args(["-l", "logical"])
.arg("--no-instructions")
// This is only used for a temporary installation that is deleted shortly after,
// so no need to fsync it

View File

@@ -4,6 +4,7 @@ pub mod delta_layer;
mod filename;
mod image_layer;
mod inmemory_layer;
mod inmemory_layer_raw;
mod layer;
mod layer_desc;

View File

@@ -367,4 +367,61 @@ impl InMemoryLayer {
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?;
Ok(delta_layer)
}
/// Write this frozen in-memory layer to disk.
///
/// Returns a new delta layer with all the same data as this in-memory layer
pub async fn write_to_disk_bench(
&self,
ctx: &RequestContext,
) -> Result<()> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. There's one exception
// though: another thread might have grabbed a reference to this layer
// in `get_layer_for_write' just before the checkpointer called
// `freeze`, and then `write_to_disk` on it. When the thread gets the
// lock, it will see that it's not writeable anymore and retry, but it
// would have to wait until we release it. That race condition is very
// rare though, so we just accept the potential latency hit for now.
let inner = self.inner.read().await;
let end_lsn = *self.end_lsn.get().unwrap();
let mut delta_layer_writer = DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_id,
Key::MIN,
self.start_lsn..end_lsn,
)
.await?;
let mut buf = Vec::new();
let cursor = inner.file.block_cursor();
let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
keys.sort_by_key(|k| k.0);
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
for (key, vec_map) in keys.iter() {
let key = **key;
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
let will_init = Value::des(&buf)?.will_init();
delta_layer_writer
.put_value_bytes(key, *lsn, &buf, will_init)
.await?;
}
}
// MAX is used here because we identify L0 layers by full key range
// TODO XXX do this
// let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?;
Ok(())
}
}

View File

@@ -0,0 +1,23 @@
pub struct InMemoryLayerRaw {
}
impl InMemoryLayerRaw {
pub async fn new() -> Self {
Self {
}
}
pub async fn put_value(
&self,
key: Key,
lsn: Lsn,
val: &Value,
ctx: &RequestContext,
) -> Result<()> {
Ok(())
}
}

View File

@@ -337,39 +337,31 @@ enum ResidentOrWantedEvicted {
}
impl ResidentOrWantedEvicted {
fn get_and_upgrade(&mut self) -> Option<(Arc<DownloadedLayer>, bool)> {
fn get(&self) -> Option<Arc<DownloadedLayer>> {
match self {
ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)),
ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
Some(strong) => {
LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
*self = ResidentOrWantedEvicted::Resident(strong.clone());
Some((strong, true))
Some(strong)
}
None => None,
},
}
}
/// When eviction is first requested, drop down to holding a [`Weak`].
///
/// Returns `Some` if this was the first time eviction was requested. Care should be taken to
/// drop the possibly last strong reference outside of the mutex of
/// heavier_once_cell::OnceCell.
fn downgrade(&mut self) -> Option<Arc<DownloadedLayer>> {
/// Returns `true` if this was the first time eviction was requested.
fn downgrade(&mut self) -> bool {
match self {
ResidentOrWantedEvicted::Resident(strong) => {
let weak = Arc::downgrade(strong);
let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
std::mem::swap(self, &mut temp);
match temp {
ResidentOrWantedEvicted::Resident(strong) => Some(strong),
ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"),
}
*self = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
// returning the weak is not useful, because the drop could had already ran with
// the replacement above, and that will take care of cleaning the Option we are in
true
}
ResidentOrWantedEvicted::WantedEvicted(..) => None,
ResidentOrWantedEvicted::WantedEvicted(..) => false,
}
}
}
@@ -411,10 +403,6 @@ struct LayerInner {
version: AtomicUsize,
/// Allow subscribing to when the layer actually gets evicted.
///
/// If in future we need to implement "wait until layer instances are gone and done", carrying
/// this over to the gc spawn_blocking from LayerInner::drop will do the trick, and adding a
/// method for "wait_gc" which will wait to this being closed.
status: tokio::sync::broadcast::Sender<Status>,
/// Counter for exponential backoff with the download
@@ -565,8 +553,6 @@ impl LayerInner {
}
}
/// Cancellation safe, however dropping the future and calling this method again might result
/// in a new attempt to evict OR join the previously started attempt.
pub(crate) async fn evict_and_wait(
&self,
_: &RemoteTimelineClient,
@@ -577,22 +563,20 @@ impl LayerInner {
let mut rx = self.status.subscribe();
let strong = {
match self.inner.get() {
Some(mut either) => {
self.wanted_evicted.store(true, Ordering::Relaxed);
either.downgrade()
}
None => return Err(EvictionError::NotFound),
}
};
let res =
self.wanted_evicted
.compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
if strong.is_some() {
// drop the DownloadedLayer outside of the holding the guard
drop(strong);
if res.is_ok() {
LAYER_IMPL_METRICS.inc_started_evictions();
}
if self.get().is_none() {
// it was not evictable in the first place
// our store to the wanted_evicted does not matter; it will be reset by next download
return Err(EvictionError::NotFound);
}
match rx.recv().await {
Ok(Status::Evicted) => Ok(()),
Ok(Status::Downloaded) => Err(EvictionError::Downloaded),
@@ -606,8 +590,7 @@ impl LayerInner {
//
// use however late (compared to the initial expressing of wanted) as the
// "outcome" now
LAYER_IMPL_METRICS.inc_broadcast_lagged();
match self.inner.get() {
match self.get() {
Some(_) => Err(EvictionError::Downloaded),
None => Ok(()),
}
@@ -615,17 +598,15 @@ impl LayerInner {
}
}
/// Cancellation safe.
#[tracing::instrument(skip_all, fields(layer=%self))]
/// Should be cancellation safe, but cancellation is troublesome together with the spawned
/// download.
async fn get_or_maybe_download(
self: &Arc<Self>,
allow_download: bool,
ctx: Option<&RequestContext>,
) -> Result<Arc<DownloadedLayer>, DownloadError> {
let mut init_permit = None;
loop {
let download = move |permit| async move {
let download = move || async move {
// disable any scheduled but not yet running eviction deletions for this
let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
@@ -646,11 +627,7 @@ impl LayerInner {
.await
.map_err(DownloadError::PreStatFailed)?;
let permit = if let Some(reason) = needs_download {
if let NeedsDownload::NotFile(ft) = reason {
return Err(DownloadError::NotFile(ft));
}
if let Some(reason) = needs_download {
// only reset this after we've decided we really need to download. otherwise it'd
// be impossible to mark cancelled downloads for eviction, like one could imagine
// we would like to do for prefetching which was not needed.
@@ -660,6 +637,8 @@ impl LayerInner {
return Err(DownloadError::NoRemoteStorage);
}
tracing::debug!(%reason, "downloading layer");
if let Some(ctx) = ctx {
self.check_expected_download(ctx)?;
}
@@ -670,16 +649,12 @@ impl LayerInner {
return Err(DownloadError::DownloadRequired);
}
tracing::info!(%reason, "downloading on-demand");
self.spawn_download_and_wait(timeline, permit).await?
self.spawn_download_and_wait(timeline).await?;
} else {
// the file is present locally, probably by a previous but cancelled call to
// get_or_maybe_download. alternatively we might be running without remote storage.
LAYER_IMPL_METRICS.inc_init_needed_no_download();
permit
};
}
let res = Arc::new(DownloadedLayer {
owner: Arc::downgrade(self),
@@ -692,60 +667,19 @@ impl LayerInner {
LayerResidenceEventReason::ResidenceChange,
);
let waiters = self.inner.initializer_count();
if waiters > 0 {
tracing::info!(waiters, "completing the on-demand download for other tasks");
}
Ok((ResidentOrWantedEvicted::Resident(res), permit))
Ok(ResidentOrWantedEvicted::Resident(res))
};
if let Some(init_permit) = init_permit.take() {
// use the already held initialization permit because it is impossible to hit the
// below paths anymore essentially limiting the max loop iterations to 2.
let (value, init_permit) = download(init_permit).await?;
let mut guard = self.inner.set(value, init_permit);
let (strong, _upgraded) = guard
.get_and_upgrade()
.expect("init creates strong reference, we held the init permit");
let locked = self.inner.get_or_init(download).await?;
if let Some(strong) = Self::get_or_apply_evictedness(Some(locked), &self.wanted_evicted)
{
return Ok(strong);
}
let (weak, permit) = {
let mut locked = self.inner.get_or_init(download).await?;
if let Some((strong, upgraded)) = locked.get_and_upgrade() {
if upgraded {
// when upgraded back, the Arc<DownloadedLayer> is still available, but
// previously a `evict_and_wait` was received.
self.wanted_evicted.store(false, Ordering::Relaxed);
// error out any `evict_and_wait`
drop(self.status.send(Status::Downloaded));
LAYER_IMPL_METRICS
.inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
}
return Ok(strong);
} else {
// path to here: the evict_blocking is stuck on spawn_blocking queue.
//
// reset the contents, deactivating the eviction and causing a
// EvictionCancelled::LostToDownload or EvictionCancelled::VersionCheckFailed.
locked.take_and_deinit()
}
};
// unlock first, then drop the weak, but because upgrade failed, we
// know it cannot be a problem.
assert!(
matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)),
"unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug"
);
init_permit = Some(permit);
// the situation in which we might need to retry is that our init was ready
// immediatedly, but the DownloadedLayer had been dropped BUT failed to complete
// Self::evict_blocking
LAYER_IMPL_METRICS.inc_retried_get_or_maybe_download();
}
}
@@ -757,8 +691,8 @@ impl LayerInner {
match b {
Download => Ok(()),
Warn | Error => {
tracing::info!(
"unexpectedly on-demand downloading for task kind {:?}",
tracing::warn!(
"unexpectedly on-demand downloading remote layer {self} for task kind {:?}",
ctx.task_kind()
);
crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
@@ -780,17 +714,14 @@ impl LayerInner {
async fn spawn_download_and_wait(
self: &Arc<Self>,
timeline: Arc<Timeline>,
permit: heavier_once_cell::InitPermit,
) -> Result<heavier_once_cell::InitPermit, DownloadError> {
) -> Result<(), DownloadError> {
let task_name = format!("download layer {}", self);
let (tx, rx) = tokio::sync::oneshot::channel();
// this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot
// block tenant::mgr::remove_tenant_from_memory.
let this: Arc<Self> = self.clone();
crate::task_mgr::spawn(
&tokio::runtime::Handle::current(),
crate::task_mgr::TaskKind::RemoteDownloadTask,
@@ -799,7 +730,6 @@ impl LayerInner {
&task_name,
false,
async move {
let client = timeline
.remote_client
.as_ref()
@@ -821,9 +751,9 @@ impl LayerInner {
}
};
if let Err(res) = tx.send((result, permit)) {
if let Err(res) = tx.send(result) {
match res {
(Ok(()), _) => {
Ok(()) => {
// our caller is cancellation safe so this is fine; if someone
// else requests the layer, they'll find it already downloaded
// or redownload.
@@ -834,7 +764,7 @@ impl LayerInner {
tracing::info!("layer file download completed after requester had cancelled");
LAYER_IMPL_METRICS.inc_download_completed_without_requester();
},
(Err(e), _) => {
Err(e) => {
// our caller is cancellation safe, but we might be racing with
// another attempt to initialize. before we have cancellation
// token support: these attempts should converge regardless of
@@ -850,7 +780,7 @@ impl LayerInner {
.in_current_span(),
);
match rx.await {
Ok((Ok(()), permit)) => {
Ok(Ok(())) => {
if let Some(reason) = self
.needs_download()
.await
@@ -861,12 +791,10 @@ impl LayerInner {
}
self.consecutive_failures.store(0, Ordering::Relaxed);
tracing::info!("on-demand download successful");
Ok(permit)
Ok(())
}
Ok((Err(e), _permit)) => {
// FIXME: this should be with the spawned task and be cancellation sensitive
Ok(Err(e)) => {
let consecutive_failures =
self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
@@ -884,6 +812,33 @@ impl LayerInner {
}
}
/// Access the current state without waiting for the file to be downloaded.
///
/// Requires that we've initialized to state which is respective to the
/// actual residency state.
fn get(&self) -> Option<Arc<DownloadedLayer>> {
let locked = self.inner.get();
Self::get_or_apply_evictedness(locked, &self.wanted_evicted)
}
fn get_or_apply_evictedness(
guard: Option<heavier_once_cell::Guard<'_, ResidentOrWantedEvicted>>,
wanted_evicted: &AtomicBool,
) -> Option<Arc<DownloadedLayer>> {
if let Some(mut x) = guard {
if let Some(won) = x.get() {
// there are no guarantees that we will always get to observe a concurrent call
// to evict
if wanted_evicted.load(Ordering::Acquire) {
x.downgrade();
}
return Some(won);
}
}
None
}
async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
match tokio::fs::metadata(&self.path).await {
Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
@@ -903,7 +858,7 @@ impl LayerInner {
fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> {
// in future, this should include sha2-256 validation of the file.
if !m.is_file() {
Err(NeedsDownload::NotFile(m.file_type()))
Err(NeedsDownload::NotFile)
} else if m.len() != self.desc.file_size {
Err(NeedsDownload::WrongSize {
actual: m.len(),
@@ -917,9 +872,7 @@ impl LayerInner {
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.desc.filename().file_name();
// this is not accurate: we could have the file locally but there was a cancellation
// and now we are not in sync, or we are currently downloading it.
let remote = self.inner.get().is_none();
let remote = self.get().is_none();
let access_stats = self.access_stats.as_api_model(reset);
@@ -1054,14 +1007,11 @@ impl LayerInner {
Ok(())
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
tracing::error!(
layer_size = %self.desc.file_size,
"failed to evict layer from disk, it was already gone (metrics will be inaccurate)"
);
tracing::info!("failed to evict file from disk, it was already gone");
Err(EvictionCancelled::FileNotFound)
}
Err(e) => {
tracing::error!("failed to evict file from disk: {e:#}");
tracing::warn!("failed to evict file from disk: {e:#}");
Err(EvictionCancelled::RemoveFailed)
}
};
@@ -1105,8 +1055,6 @@ enum DownloadError {
ContextAndConfigReallyDeniesDownloads,
#[error("downloading is really required but not allowed by this method")]
DownloadRequired,
#[error("layer path exists, but it is not a file: {0:?}")]
NotFile(std::fs::FileType),
/// Why no error here? Because it will be reported by page_service. We should had also done
/// retries already.
#[error("downloading evicted layer file failed")]
@@ -1122,7 +1070,7 @@ enum DownloadError {
#[derive(Debug, PartialEq)]
pub(crate) enum NeedsDownload {
NotFound,
NotFile(std::fs::FileType),
NotFile,
WrongSize { actual: u64, expected: u64 },
}
@@ -1130,7 +1078,7 @@ impl std::fmt::Display for NeedsDownload {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NeedsDownload::NotFound => write!(f, "file was not found"),
NeedsDownload::NotFile(ft) => write!(f, "path is not a file; {ft:?}"),
NeedsDownload::NotFile => write!(f, "path is not a file"),
NeedsDownload::WrongSize { actual, expected } => {
write!(f, "file size mismatch {actual} vs. {expected}")
}
@@ -1141,8 +1089,6 @@ impl std::fmt::Display for NeedsDownload {
/// Existence of `DownloadedLayer` means that we have the file locally, and can later evict it.
pub(crate) struct DownloadedLayer {
owner: Weak<LayerInner>,
// Use tokio OnceCell as we do not need to deinitialize this, it'll just get dropped with the
// DownloadedLayer
kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
version: usize,
}
@@ -1186,6 +1132,7 @@ impl DownloadedLayer {
"these are the same, just avoiding the upgrade"
);
// there is nothing async here, but it should be async
let res = if owner.desc.is_delta {
let summary = Some(delta_layer::Summary::expected(
owner.desc.tenant_id,
@@ -1284,8 +1231,6 @@ impl std::fmt::Debug for ResidentLayer {
impl ResidentLayer {
/// Release the eviction guard, converting back into a plain [`Layer`].
///
/// You can access the [`Layer`] also by using `as_ref`.
pub(crate) fn drop_eviction_guard(self) -> Layer {
self.into()
}
@@ -1341,7 +1286,7 @@ impl AsRef<Layer> for ResidentLayer {
}
}
/// Drop the eviction guard.
/// Allow slimming down if we don't want the `2*usize` with eviction candidates?
impl From<ResidentLayer> for Layer {
fn from(value: ResidentLayer) -> Self {
value.owner
@@ -1511,13 +1456,6 @@ impl LayerImplMetrics {
.unwrap()
.inc();
}
fn inc_broadcast_lagged(&self) {
self.rare_counters
.get_metric_with_label_values(&["broadcast_lagged"])
.unwrap()
.inc();
}
}
enum EvictionCancelled {
@@ -1529,8 +1467,6 @@ enum EvictionCancelled {
AlreadyReinitialized,
/// Not evicted because of a pending reinitialization
LostToDownload,
/// After eviction, there was a new layer access which cancelled the eviction.
UpgradedBackOnAccess,
}
impl EvictionCancelled {
@@ -1543,7 +1479,6 @@ impl EvictionCancelled {
EvictionCancelled::RemoveFailed => "remove_failed",
EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
EvictionCancelled::LostToDownload => "lost_to_download",
EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access",
}
}
}

View File

@@ -19,7 +19,6 @@ use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
use utils::fs_ext;
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
@@ -174,78 +173,37 @@ impl OpenFiles {
}
}
/// Identify error types that should alwways terminate the process. Other
/// error types may be elegible for retry.
pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
use nix::errno::Errno::*;
match e.raw_os_error().map(nix::errno::from_i32) {
Some(EIO) => {
// Terminate on EIO because we no longer trust the device to store
// data safely, or to uphold persistence guarantees on fsync.
true
}
Some(EROFS) => {
// Terminate on EROFS because a filesystem is usually remounted
// readonly when it has experienced some critical issue, so the same
// logic as EIO applies.
true
}
Some(EACCES) => {
// Terminate on EACCESS because we should always have permissions
// for our own data dir: if we don't, then we can't do our job and
// need administrative intervention to fix permissions. Terminating
// is the best way to make sure we stop cleanly rather than going
// into infinite retry loops, and will make it clear to the outside
// world that we need help.
true
}
_ => {
// Treat all other local file I/O errors are retryable. This includes:
// - ENOSPC: we stay up and wait for eviction to free some space
// - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
// - WriteZero, Interrupted: these are used internally VirtualFile
false
}
}
#[derive(Debug, thiserror::Error)]
pub enum CrashsafeOverwriteError {
#[error("final path has no parent dir")]
FinalPathHasNoParentDir,
#[error("remove tempfile")]
RemovePreviousTempfile(#[source] std::io::Error),
#[error("create tempfile")]
CreateTempfile(#[source] std::io::Error),
#[error("write tempfile")]
WriteContents(#[source] std::io::Error),
#[error("sync tempfile")]
SyncTempfile(#[source] std::io::Error),
#[error("rename tempfile to final path")]
RenameTempfileToFinalPath(#[source] std::io::Error),
#[error("open final path parent dir")]
OpenFinalPathParentDir(#[source] std::io::Error),
#[error("sync final path parent dir")]
SyncFinalPathParentDir(#[source] std::io::Error),
}
/// Call this when the local filesystem gives us an error with an external
/// cause: this includes EIO, EROFS, and EACCESS: all these indicate either
/// bad storage or bad configuration, and we can't fix that from inside
/// a running process.
pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
tracing::error!("Fatal I/O error: {e}: {context})");
std::process::abort();
}
pub(crate) trait MaybeFatalIo<T> {
fn maybe_fatal_err(self, context: &str) -> std::io::Result<T>;
fn fatal_err(self, context: &str) -> T;
}
impl<T> MaybeFatalIo<T> for std::io::Result<T> {
/// Terminate the process if the result is an error of a fatal type, else pass it through
///
/// This is appropriate for writes, where we typically want to die on EIO/ACCES etc, but
/// not on ENOSPC.
fn maybe_fatal_err(self, context: &str) -> std::io::Result<T> {
if let Err(e) = &self {
if is_fatal_io_error(e) {
on_fatal_io_error(e, context);
}
}
self
}
/// Terminate the process on any I/O error.
///
/// This is appropriate for reads on files that we know exist: they should always work.
fn fatal_err(self, context: &str) -> T {
impl CrashsafeOverwriteError {
/// Returns true iff the new contents are durably stored.
pub fn are_new_contents_durable(&self) -> bool {
match self {
Ok(v) => v,
Err(e) => {
on_fatal_io_error(&e, context);
}
Self::FinalPathHasNoParentDir => false,
Self::RemovePreviousTempfile(_) => false,
Self::CreateTempfile(_) => false,
Self::WriteContents(_) => false,
Self::SyncTempfile(_) => false,
Self::RenameTempfileToFinalPath(_) => false,
Self::OpenFinalPathParentDir(_) => false,
Self::SyncFinalPathParentDir(_) => true,
}
}
}
@@ -326,13 +284,15 @@ impl VirtualFile {
final_path: &Utf8Path,
tmp_path: &Utf8Path,
content: &[u8],
) -> std::io::Result<()> {
) -> Result<(), CrashsafeOverwriteError> {
let Some(final_path_parent) = final_path.parent() else {
return Err(std::io::Error::from_raw_os_error(
nix::errno::Errno::EINVAL as i32,
));
return Err(CrashsafeOverwriteError::FinalPathHasNoParentDir);
};
std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?;
match std::fs::remove_file(tmp_path) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => return Err(CrashsafeOverwriteError::RemovePreviousTempfile(e)),
}
let mut file = Self::open_with_options(
tmp_path,
OpenOptions::new()
@@ -341,20 +301,31 @@ impl VirtualFile {
// we bail out instead of causing damage.
.create_new(true),
)
.await?;
file.write_all(content).await?;
file.sync_all().await?;
.await
.map_err(CrashsafeOverwriteError::CreateTempfile)?;
file.write_all(content)
.await
.map_err(CrashsafeOverwriteError::WriteContents)?;
file.sync_all()
.await
.map_err(CrashsafeOverwriteError::SyncTempfile)?;
drop(file); // before the rename, that's important!
// renames are atomic
std::fs::rename(tmp_path, final_path)?;
std::fs::rename(tmp_path, final_path)
.map_err(CrashsafeOverwriteError::RenameTempfileToFinalPath)?;
// Only open final path parent dirfd now, so that this operation only
// ever holds one VirtualFile fd at a time. That's important because
// the current `find_victim_slot` impl might pick the same slot for both
// VirtualFile., and it eventually does a blocking write lock instead of
// try_lock.
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?;
final_parent_dirfd.sync_all().await?;
Self::open_with_options(final_path_parent, OpenOptions::new().read(true))
.await
.map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?;
final_parent_dirfd
.sync_all()
.await
.map_err(CrashsafeOverwriteError::SyncFinalPathParentDir)?;
Ok(())
}

View File

@@ -13,7 +13,6 @@ pub struct ConsoleError {
#[derive(Deserialize)]
pub struct GetRoleSecret {
pub role_secret: Box<str>,
pub allowed_ips: Option<Vec<Box<str>>>,
}
// Manually implement debug to omit sensitive info.
@@ -188,31 +187,4 @@ mod tests {
Ok(())
}
#[test]
fn parse_wake_compute() -> anyhow::Result<()> {
let json = json!({
"address": "0.0.0.0",
"aux": dummy_aux(),
});
let _: WakeCompute = serde_json::from_str(&json.to_string())?;
Ok(())
}
#[test]
fn parse_get_role_secret() -> anyhow::Result<()> {
// Empty `allowed_ips` field.
let json = json!({
"role_secret": "secret",
});
let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
// Empty `allowed_ips` field.
let json = json!({
"role_secret": "secret",
"allowed_ips": ["8.8.8.8"],
});
let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
Ok(())
}
}

View File

@@ -81,6 +81,7 @@ FALLBACK_DURATION = {
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-100000-100-0]": 0.55,
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-10000000-1-0]": 12.189,
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-10000000-1-4]": 13.899,
"test_runner/performance/test_startup.py::test_startup": 890.114,
"test_runner/performance/test_startup.py::test_startup_simple": 2.51,
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[neon_off-10-5-5]": 527.245,
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[neon_on-10-5-5]": 583.46,

View File

@@ -1,10 +1,8 @@
from contextlib import closing
from fixtures.benchmark_fixture import MetricReport
from fixtures.compare_fixtures import NeonCompare, PgCompare
from fixtures.pageserver.utils import wait_tenant_status_404
from fixtures.pg_version import PgVersion
from fixtures.types import Lsn
#
@@ -20,8 +18,6 @@ from fixtures.types import Lsn
def test_bulk_insert(neon_with_baseline: PgCompare):
env = neon_with_baseline
start_lsn = Lsn(env.pg.safe_psql("SELECT pg_current_wal_lsn()")[0][0])
with closing(env.pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table huge (i int, j int);")
@@ -35,13 +31,6 @@ def test_bulk_insert(neon_with_baseline: PgCompare):
env.report_peak_memory_use()
env.report_size()
# Report amount of wal written. Useful for comparing vanilla wal format vs
# neon wal format, measuring neon write amplification, etc.
end_lsn = Lsn(env.pg.safe_psql("SELECT pg_current_wal_lsn()")[0][0])
wal_written_bytes = end_lsn - start_lsn
wal_written_mb = round(wal_written_bytes / (1024 * 1024))
env.zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
# When testing neon, also check how long it takes the pageserver to reingest the
# wal from safekeepers. If this number is close to total runtime, then the pageserver
# is the bottleneck.

View File

@@ -1,3 +1,6 @@
from contextlib import closing
import pytest
import requests
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.neon_fixtures import NeonEnvBuilder
@@ -78,3 +81,49 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
# Imitate optimizations that console would do for the second start
endpoint.respec(skip_pg_catalog_updates=True)
# This test sometimes runs for longer than the global 5 minute timeout.
@pytest.mark.timeout(900)
def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
# Start
env.neon_cli.create_branch("test_startup")
with zenbenchmark.record_duration("startup_time"):
endpoint = env.endpoints.create_start("test_startup")
endpoint.safe_psql("select 1;")
# Restart
endpoint.stop_and_destroy()
with zenbenchmark.record_duration("restart_time"):
endpoint.create_start("test_startup")
endpoint.safe_psql("select 1;")
# Fill up
num_rows = 1000000 # 30 MB
num_tables = 100
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
for i in range(num_tables):
cur.execute(f"create table t_{i} (i integer);")
cur.execute(f"insert into t_{i} values (generate_series(1,{num_rows}));")
# Read
with zenbenchmark.record_duration("read_time"):
endpoint.safe_psql("select * from t_0;")
# Read again
with zenbenchmark.record_duration("second_read_time"):
endpoint.safe_psql("select * from t_0;")
# Restart
endpoint.stop_and_destroy()
with zenbenchmark.record_duration("restart_with_data"):
endpoint.create_start("test_startup")
endpoint.safe_psql("select 1;")
# Read
with zenbenchmark.record_duration("read_after_restart"):
endpoint.safe_psql("select * from t_0;")

View File

@@ -1,5 +1,5 @@
{
"postgres-v16": "825a713a4fc833a0924bf22ad34681de97c155a0",
"postgres-v15": "e45d092e4d2b09057671190a1c9d9292293cd6b7",
"postgres-v14": "1224330eee058ed6013840e2a6dace5af82150ac"
"postgres-v16": "550ffa6495a5dc62fccc3a8b449386633758680b",
"postgres-v15": "ab67ab96355d61e9d0218630be4aa7db53bf83e7",
"postgres-v14": "6669a672ee14ab2c09d44c4552f9a13fad3afc10"
}