mirror of
https://github.com/neondatabase/neon.git
synced 2026-04-30 12:50:37 +00:00
Compare commits
1 Commits
sk-skip-de
...
vm-pg-stop
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bce8d4b3f5 |
@@ -820,10 +820,6 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
|
||||
-C pgxn/neon_utils \
|
||||
-s install && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
|
||||
-C pgxn/neon_test_utils \
|
||||
-s install && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
|
||||
-C pgxn/neon_rmgr \
|
||||
|
||||
@@ -381,6 +381,7 @@ impl Persistence {
|
||||
//
|
||||
// We create the child shards here, so that they will be available for increment_generation calls
|
||||
// if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn begin_shard_split(
|
||||
&self,
|
||||
old_shard_count: ShardCount,
|
||||
@@ -448,6 +449,7 @@ impl Persistence {
|
||||
|
||||
// When we finish shard splitting, we must atomically clean up the old shards
|
||||
// and insert the new shards, and clear the splitting marker.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn complete_shard_split(
|
||||
&self,
|
||||
split_tenant_id: TenantId,
|
||||
|
||||
@@ -115,6 +115,7 @@ pub fn set_build_info_metric(revision: &str, build_tag: &str) {
|
||||
// performed by the process.
|
||||
// We know the size of the block, so we can determine the I/O bytes out of it.
|
||||
// The value might be not 100% exact, but should be fine for Prometheus metrics in this case.
|
||||
#[allow(clippy::unnecessary_cast)]
|
||||
fn update_rusage_metrics() {
|
||||
let rusage_stats = get_rusage_stats();
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
#![allow(non_snake_case)]
|
||||
// bindgen creates some unsafe code with no doc comments.
|
||||
#![allow(clippy::missing_safety_doc)]
|
||||
// noted at 1.63 that in many cases there's u32 -> u32 transmutes in bindgen code.
|
||||
// noted at 1.63 that in many cases there's a u32 -> u32 transmutes in bindgen code.
|
||||
#![allow(clippy::useless_transmute)]
|
||||
// modules included with the postgres_ffi macro depend on the types of the specific version's
|
||||
// types, and trigger a too eager lint.
|
||||
|
||||
@@ -435,6 +435,7 @@ impl RemoteStorage for LocalFs {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::diverging_sub_expression)]
|
||||
async fn time_travel_recover(
|
||||
&self,
|
||||
_prefix: Option<&RemotePath>,
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
#![allow(unused)]
|
||||
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use utils::id;
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
fs::{self, File},
|
||||
io,
|
||||
io::{self, Write},
|
||||
};
|
||||
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -161,6 +161,48 @@ pub async fn durable_rename(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes a file to the specified `final_path` in a crash safe fasion, using [`std::fs`].
|
||||
///
|
||||
/// The file is first written to the specified `tmp_path`, and in a second
|
||||
/// step, the `tmp_path` is renamed to the `final_path`. Intermediary fsync
|
||||
/// and atomic rename guarantee that, if we crash at any point, there will never
|
||||
/// be a partially written file at `final_path` (but maybe at `tmp_path`).
|
||||
///
|
||||
/// Callers are responsible for serializing calls of this function for a given `final_path`.
|
||||
/// If they don't, there may be an error due to conflicting `tmp_path`, or there will
|
||||
/// be no error and the content of `final_path` will be the "winner" caller's `content`.
|
||||
/// I.e., the atomticity guarantees still hold.
|
||||
pub fn overwrite(
|
||||
final_path: &Utf8Path,
|
||||
tmp_path: &Utf8Path,
|
||||
content: &[u8],
|
||||
) -> std::io::Result<()> {
|
||||
let Some(final_path_parent) = final_path.parent() else {
|
||||
return Err(std::io::Error::from_raw_os_error(
|
||||
nix::errno::Errno::EINVAL as i32,
|
||||
));
|
||||
};
|
||||
std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?;
|
||||
let mut file = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
// Use `create_new` so that, if we race with ourselves or something else,
|
||||
// we bail out instead of causing damage.
|
||||
.create_new(true)
|
||||
.open(tmp_path)?;
|
||||
file.write_all(content)?;
|
||||
file.sync_all()?;
|
||||
drop(file); // don't keep the fd open for longer than we have to
|
||||
|
||||
std::fs::rename(tmp_path, final_path)?;
|
||||
|
||||
let final_parent_dirfd = std::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(final_path_parent)?;
|
||||
|
||||
final_parent_dirfd.sync_all()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
|
||||
@@ -234,7 +234,7 @@ impl DeletionHeader {
|
||||
let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
|
||||
let header_path = conf.deletion_header_path();
|
||||
let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
|
||||
VirtualFile::crashsafe_overwrite(&header_path, &temp_path, header_bytes)
|
||||
VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes)
|
||||
.await
|
||||
.maybe_fatal_err("save deletion header")?;
|
||||
|
||||
@@ -325,7 +325,8 @@ impl DeletionList {
|
||||
let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
|
||||
|
||||
let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
|
||||
VirtualFile::crashsafe_overwrite(&path, &temp_path, bytes)
|
||||
|
||||
VirtualFile::crashsafe_overwrite(path, temp_path, bytes)
|
||||
.await
|
||||
.maybe_fatal_err("save deletion list")
|
||||
.map_err(Into::into)
|
||||
@@ -834,6 +835,7 @@ mod test {
|
||||
}
|
||||
|
||||
impl ControlPlaneGenerationsApi for MockControlPlane {
|
||||
#[allow(clippy::diverging_sub_expression)] // False positive via async_trait
|
||||
async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
@@ -351,6 +351,7 @@ pub enum IterationOutcome<U> {
|
||||
Finished(IterationOutcomeFinished<U>),
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct IterationOutcomeFinished<U> {
|
||||
/// The actual usage observed before we started the iteration.
|
||||
@@ -365,6 +366,7 @@ pub struct IterationOutcomeFinished<U> {
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
#[allow(dead_code)]
|
||||
struct AssumedUsage<U> {
|
||||
/// The expected value for `after`, after phase 2.
|
||||
projected_after: U,
|
||||
@@ -372,12 +374,14 @@ struct AssumedUsage<U> {
|
||||
failed: LayerCount,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, Serialize)]
|
||||
struct PlannedUsage<U> {
|
||||
respecting_tenant_min_resident_size: U,
|
||||
fallback_to_global_lru: Option<U>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, Default, Serialize)]
|
||||
struct LayerCount {
|
||||
file_sizes: u64,
|
||||
@@ -561,6 +565,7 @@ pub(crate) struct EvictionSecondaryLayer {
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum EvictionLayer {
|
||||
Attached(Layer),
|
||||
#[allow(dead_code)]
|
||||
Secondary(EvictionSecondaryLayer),
|
||||
}
|
||||
|
||||
@@ -1100,6 +1105,7 @@ mod filesystem_level_usage {
|
||||
use super::DiskUsageEvictionTaskConfig;
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
#[allow(dead_code)]
|
||||
pub struct Usage<'a> {
|
||||
config: &'a DiskUsageEvictionTaskConfig,
|
||||
|
||||
|
||||
@@ -2214,7 +2214,7 @@ pub fn make_router(
|
||||
)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
|
||||
|r| api_handler(r, timeline_collect_keyspace),
|
||||
|r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
|
||||
)
|
||||
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
|
||||
.any(handler_404))
|
||||
|
||||
@@ -30,6 +30,10 @@
|
||||
//! only a single tenant or timeline.
|
||||
//!
|
||||
|
||||
// Clippy 1.60 incorrectly complains about the tokio::task_local!() macro.
|
||||
// Silence it. See https://github.com/rust-lang/rust-clippy/issues/9224.
|
||||
#![allow(clippy::declare_interior_mutable_const)]
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
@@ -308,6 +312,7 @@ struct MutableTaskState {
|
||||
}
|
||||
|
||||
struct PageServerTask {
|
||||
#[allow(dead_code)] // unused currently
|
||||
task_id: PageserverTaskId,
|
||||
|
||||
kind: TaskKind,
|
||||
|
||||
@@ -28,7 +28,6 @@ use remote_storage::GenericRemoteStorage;
|
||||
use std::fmt;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::watch;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -2878,17 +2877,10 @@ impl Tenant {
|
||||
|
||||
let tenant_shard_id = *tenant_shard_id;
|
||||
let config_path = config_path.to_owned();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
Handle::current().block_on(async move {
|
||||
let conf_content = conf_content.into_bytes();
|
||||
VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("write tenant {tenant_shard_id} config to {config_path}")
|
||||
})
|
||||
})
|
||||
})
|
||||
.await??;
|
||||
let conf_content = conf_content.into_bytes();
|
||||
VirtualFile::crashsafe_overwrite(config_path.clone(), temp_path, conf_content)
|
||||
.await
|
||||
.with_context(|| format!("write tenant {tenant_shard_id} config to {config_path}"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2915,17 +2907,12 @@ impl Tenant {
|
||||
|
||||
let tenant_shard_id = *tenant_shard_id;
|
||||
let target_config_path = target_config_path.to_owned();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
Handle::current().block_on(async move {
|
||||
let conf_content = conf_content.into_bytes();
|
||||
VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("write tenant {tenant_shard_id} config to {target_config_path}")
|
||||
})
|
||||
})
|
||||
})
|
||||
.await??;
|
||||
let conf_content = conf_content.into_bytes();
|
||||
VirtualFile::crashsafe_overwrite(target_config_path.clone(), temp_path, conf_content)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("write tenant {tenant_shard_id} config to {target_config_path}")
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -4373,6 +4360,7 @@ mod tests {
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut lsn = start_lsn;
|
||||
#[allow(non_snake_case)]
|
||||
{
|
||||
let writer = tline.writer().await;
|
||||
// Create a relation on the timeline
|
||||
|
||||
@@ -36,6 +36,7 @@ use crate::{
|
||||
pub const VALUE_SZ: usize = 5;
|
||||
pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub const PAGE_SZ: usize = 8192;
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
|
||||
@@ -279,7 +279,7 @@ pub async fn save_metadata(
|
||||
let path = conf.metadata_path(tenant_shard_id, timeline_id);
|
||||
let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
|
||||
let metadata_bytes = data.to_bytes().context("serialize metadata")?;
|
||||
VirtualFile::crashsafe_overwrite(&path, &temp_path, metadata_bytes)
|
||||
VirtualFile::crashsafe_overwrite(path, temp_path, metadata_bytes)
|
||||
.await
|
||||
.context("write metadata")?;
|
||||
Ok(())
|
||||
|
||||
@@ -484,14 +484,9 @@ impl<'a> TenantDownloader<'a> {
|
||||
let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
|
||||
let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
|
||||
let heatmap_path_bg = heatmap_path.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
tokio::runtime::Handle::current().block_on(async move {
|
||||
VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await
|
||||
})
|
||||
})
|
||||
.await
|
||||
.expect("Blocking task is never aborted")
|
||||
.maybe_fatal_err(&context_msg)?;
|
||||
VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
|
||||
.await
|
||||
.maybe_fatal_err(&context_msg)?;
|
||||
|
||||
tracing::debug!("Wrote local heatmap to {}", heatmap_path);
|
||||
|
||||
|
||||
@@ -196,13 +196,13 @@ impl Timeline {
|
||||
ControlFlow::Continue(()) => (),
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, Default)]
|
||||
struct EvictionStats {
|
||||
candidates: usize,
|
||||
evicted: usize,
|
||||
errors: usize,
|
||||
not_evictable: usize,
|
||||
#[allow(dead_code)]
|
||||
skipped_for_shutdown: usize,
|
||||
}
|
||||
|
||||
|
||||
@@ -19,14 +19,13 @@ use once_cell::sync::OnceCell;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::fs::{self, File};
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
|
||||
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tokio::time::Instant;
|
||||
use utils::fs_ext;
|
||||
|
||||
pub use pageserver_api::models::virtual_file as api;
|
||||
pub(crate) mod io_engine;
|
||||
@@ -404,47 +403,34 @@ impl VirtualFile {
|
||||
Ok(vfile)
|
||||
}
|
||||
|
||||
/// Writes a file to the specified `final_path` in a crash safe fasion
|
||||
/// Async version of [`::utils::crashsafe::overwrite`].
|
||||
///
|
||||
/// The file is first written to the specified tmp_path, and in a second
|
||||
/// step, the tmp path is renamed to the final path. As renames are
|
||||
/// atomic, a crash during the write operation will never leave behind a
|
||||
/// partially written file.
|
||||
pub async fn crashsafe_overwrite<B: BoundedBuf>(
|
||||
final_path: &Utf8Path,
|
||||
tmp_path: &Utf8Path,
|
||||
/// # NB:
|
||||
///
|
||||
/// Doesn't actually use the [`VirtualFile`] file descriptor cache, but,
|
||||
/// it did at an earlier time.
|
||||
/// And it will use this module's [`io_engine`] in the near future, so, leaving it here.
|
||||
pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
|
||||
final_path: Utf8PathBuf,
|
||||
tmp_path: Utf8PathBuf,
|
||||
content: B,
|
||||
) -> std::io::Result<()> {
|
||||
let Some(final_path_parent) = final_path.parent() else {
|
||||
return Err(std::io::Error::from_raw_os_error(
|
||||
nix::errno::Errno::EINVAL as i32,
|
||||
));
|
||||
};
|
||||
std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?;
|
||||
let mut file = Self::open_with_options(
|
||||
tmp_path,
|
||||
OpenOptions::new()
|
||||
.write(true)
|
||||
// Use `create_new` so that, if we race with ourselves or something else,
|
||||
// we bail out instead of causing damage.
|
||||
.create_new(true),
|
||||
)
|
||||
.await?;
|
||||
let (_content, res) = file.write_all(content).await;
|
||||
res?;
|
||||
file.sync_all().await?;
|
||||
drop(file); // before the rename, that's important!
|
||||
// renames are atomic
|
||||
std::fs::rename(tmp_path, final_path)?;
|
||||
// Only open final path parent dirfd now, so that this operation only
|
||||
// ever holds one VirtualFile fd at a time. That's important because
|
||||
// the current `find_victim_slot` impl might pick the same slot for both
|
||||
// VirtualFile., and it eventually does a blocking write lock instead of
|
||||
// try_lock.
|
||||
let final_parent_dirfd =
|
||||
Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?;
|
||||
final_parent_dirfd.sync_all().await?;
|
||||
Ok(())
|
||||
// TODO: use tokio_epoll_uring if configured as `io_engine`.
|
||||
// See https://github.com/neondatabase/neon/issues/6663
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let slice_storage;
|
||||
let content_len = content.bytes_init();
|
||||
let content = if content.bytes_init() > 0 {
|
||||
slice_storage = Some(content.slice(0..content_len));
|
||||
slice_storage.as_deref().expect("just set it to Some()")
|
||||
} else {
|
||||
&[]
|
||||
};
|
||||
utils::crashsafe::overwrite(&final_path, &tmp_path, content)
|
||||
})
|
||||
.await
|
||||
.expect("blocking task is never aborted")
|
||||
}
|
||||
|
||||
/// Call File::sync_all() on the underlying File.
|
||||
@@ -1337,7 +1323,7 @@ mod tests {
|
||||
let path = testdir.join("myfile");
|
||||
let tmp_path = testdir.join("myfile.tmp");
|
||||
|
||||
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
|
||||
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
|
||||
@@ -1346,7 +1332,7 @@ mod tests {
|
||||
assert!(!tmp_path.exists());
|
||||
drop(file);
|
||||
|
||||
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec())
|
||||
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
|
||||
@@ -1368,7 +1354,7 @@ mod tests {
|
||||
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
|
||||
assert!(tmp_path.exists());
|
||||
|
||||
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
|
||||
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -7,24 +7,6 @@ AS 'MODULE_PATHNAME', 'test_consume_xids'
|
||||
LANGUAGE C STRICT
|
||||
PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION test_consume_cpu(seconds int)
|
||||
RETURNS VOID
|
||||
AS 'MODULE_PATHNAME', 'test_consume_cpu'
|
||||
LANGUAGE C STRICT
|
||||
PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION test_consume_memory(megabytes int)
|
||||
RETURNS VOID
|
||||
AS 'MODULE_PATHNAME', 'test_consume_memory'
|
||||
LANGUAGE C STRICT
|
||||
PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION test_release_memory(megabytes int DEFAULT NULL)
|
||||
RETURNS VOID
|
||||
AS 'MODULE_PATHNAME', 'test_release_memory'
|
||||
LANGUAGE C
|
||||
PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION clear_buffer_cache()
|
||||
RETURNS VOID
|
||||
AS 'MODULE_PATHNAME', 'clear_buffer_cache'
|
||||
|
||||
@@ -3,4 +3,3 @@ comment = 'helpers for neon testing and debugging'
|
||||
default_version = '1.0'
|
||||
module_pathname = '$libdir/neon_test_utils'
|
||||
relocatable = true
|
||||
trusted = true
|
||||
|
||||
@@ -21,12 +21,10 @@
|
||||
#include "miscadmin.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "storage/fd.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/pg_lsn.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/varlena.h"
|
||||
#include "utils/wait_event.h"
|
||||
#include "../neon/pagestore_client.h"
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
@@ -34,9 +32,6 @@ PG_MODULE_MAGIC;
|
||||
extern void _PG_init(void);
|
||||
|
||||
PG_FUNCTION_INFO_V1(test_consume_xids);
|
||||
PG_FUNCTION_INFO_V1(test_consume_cpu);
|
||||
PG_FUNCTION_INFO_V1(test_consume_memory);
|
||||
PG_FUNCTION_INFO_V1(test_release_memory);
|
||||
PG_FUNCTION_INFO_V1(clear_buffer_cache);
|
||||
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn);
|
||||
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
|
||||
@@ -102,119 +97,6 @@ test_consume_xids(PG_FUNCTION_ARGS)
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* test_consume_cpu(seconds int). Keeps one CPU busy for the given number of seconds.
|
||||
*/
|
||||
Datum
|
||||
test_consume_cpu(PG_FUNCTION_ARGS)
|
||||
{
|
||||
int32 seconds = PG_GETARG_INT32(0);
|
||||
TimestampTz start;
|
||||
uint64 total_iterations = 0;
|
||||
|
||||
start = GetCurrentTimestamp();
|
||||
|
||||
for (;;)
|
||||
{
|
||||
TimestampTz elapsed;
|
||||
|
||||
elapsed = GetCurrentTimestamp() - start;
|
||||
if (elapsed > (TimestampTz) seconds * USECS_PER_SEC)
|
||||
break;
|
||||
|
||||
/* keep spinning */
|
||||
for (int i = 0; i < 1000000; i++)
|
||||
total_iterations++;
|
||||
elog(DEBUG2, "test_consume_cpu(): %lu iterations in total", total_iterations);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
static MemoryContext consume_cxt = NULL;
|
||||
static slist_head consumed_memory_chunks;
|
||||
static int64 num_memory_chunks;
|
||||
|
||||
/*
|
||||
* test_consume_memory(megabytes int).
|
||||
*
|
||||
* Consume given amount of memory. The allocation is made in TopMemoryContext,
|
||||
* so it outlives the function, until you call test_release_memory to
|
||||
* explicitly release it, or close the session.
|
||||
*/
|
||||
Datum
|
||||
test_consume_memory(PG_FUNCTION_ARGS)
|
||||
{
|
||||
int32 megabytes = PG_GETARG_INT32(0);
|
||||
|
||||
/*
|
||||
* Consume the memory in a new memory context, so that it's convenient to
|
||||
* release and to display it separately in a possible memory context dump.
|
||||
*/
|
||||
if (consume_cxt == NULL)
|
||||
consume_cxt = AllocSetContextCreate(TopMemoryContext,
|
||||
"test_consume_memory",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
|
||||
for (int32 i = 0; i < megabytes; i++)
|
||||
{
|
||||
char *p;
|
||||
|
||||
p = MemoryContextAllocZero(consume_cxt, 1024 * 1024);
|
||||
|
||||
/* touch the memory, so that it's really allocated by the kernel */
|
||||
for (int j = 0; j < 1024 * 1024; j += 1024)
|
||||
p[j] = j % 0xFF;
|
||||
|
||||
slist_push_head(&consumed_memory_chunks, (slist_node *) p);
|
||||
num_memory_chunks++;
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/*
|
||||
* test_release_memory(megabytes int). NULL releases all
|
||||
*/
|
||||
Datum
|
||||
test_release_memory(PG_FUNCTION_ARGS)
|
||||
{
|
||||
TimestampTz start;
|
||||
|
||||
if (PG_ARGISNULL(0))
|
||||
{
|
||||
if (consume_cxt)
|
||||
{
|
||||
MemoryContextDelete(consume_cxt);
|
||||
consume_cxt = NULL;
|
||||
num_memory_chunks = 0;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
int32 chunks_to_release = PG_GETARG_INT32(0);
|
||||
|
||||
if (chunks_to_release > num_memory_chunks)
|
||||
{
|
||||
elog(WARNING, "only %lu MB is consumed, releasing it all", num_memory_chunks);
|
||||
chunks_to_release = num_memory_chunks;
|
||||
}
|
||||
|
||||
for (int32 i = 0; i < chunks_to_release; i++)
|
||||
{
|
||||
slist_node *chunk = slist_pop_head_node(&consumed_memory_chunks);
|
||||
|
||||
pfree(chunk);
|
||||
num_memory_chunks--;
|
||||
}
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/*
|
||||
* Flush the buffer cache, evicting all pages that are not currently pinned.
|
||||
*/
|
||||
|
||||
@@ -122,24 +122,25 @@ where
|
||||
|
||||
error!(error = ?err, "could not connect to compute node");
|
||||
|
||||
let node_info = if !node_info.cached() {
|
||||
// If we just recieved this from cplane and dodn't get it from cache, we shouldn't retry.
|
||||
// Do not need to retrieve a new node_info, just return the old one.
|
||||
if !err.should_retry(num_retries) {
|
||||
return Err(err.into());
|
||||
}
|
||||
node_info
|
||||
} else {
|
||||
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
ctx.latency_timer.cache_miss();
|
||||
let old_node_info = invalidate_cache(node_info);
|
||||
let mut node_info = wake_compute(&mut num_retries, ctx, user_info).await?;
|
||||
node_info.reuse_settings(old_node_info);
|
||||
let node_info =
|
||||
if err.get_error_kind() == crate::error::ErrorKind::Postgres || !node_info.cached() {
|
||||
// If the error is Postgres, that means that we managed to connect to the compute node, but there was an error.
|
||||
// Do not need to retrieve a new node_info, just return the old one.
|
||||
if !err.should_retry(num_retries) {
|
||||
return Err(err.into());
|
||||
}
|
||||
node_info
|
||||
} else {
|
||||
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
ctx.latency_timer.cache_miss();
|
||||
let old_node_info = invalidate_cache(node_info);
|
||||
let mut node_info = wake_compute(&mut num_retries, ctx, user_info).await?;
|
||||
node_info.reuse_settings(old_node_info);
|
||||
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
node_info
|
||||
};
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
node_info
|
||||
};
|
||||
|
||||
// now that we have a new node, try connect to it repeatedly.
|
||||
// this can error for a few reasons, for instance:
|
||||
|
||||
@@ -375,6 +375,8 @@ enum ConnectAction {
|
||||
Connect,
|
||||
Retry,
|
||||
Fail,
|
||||
RetryPg,
|
||||
FailPg,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -464,6 +466,14 @@ impl ConnectMechanism for TestConnectMechanism {
|
||||
retryable: false,
|
||||
kind: ErrorKind::Compute,
|
||||
}),
|
||||
ConnectAction::FailPg => Err(TestConnectError {
|
||||
retryable: false,
|
||||
kind: ErrorKind::Postgres,
|
||||
}),
|
||||
ConnectAction::RetryPg => Err(TestConnectError {
|
||||
retryable: true,
|
||||
kind: ErrorKind::Postgres,
|
||||
}),
|
||||
x => panic!("expecting action {:?}, connect is called instead", x),
|
||||
}
|
||||
}
|
||||
@@ -562,6 +572,32 @@ async fn connect_to_compute_retry() {
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_retry_pg() {
|
||||
let _ = env_logger::try_init();
|
||||
use ConnectAction::*;
|
||||
let mut ctx = RequestMonitoring::test();
|
||||
let mechanism = TestConnectMechanism::new(vec![Wake, RetryPg, Connect]);
|
||||
let user_info = helper_create_connect_info(&mechanism);
|
||||
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_fail_pg() {
|
||||
let _ = env_logger::try_init();
|
||||
use ConnectAction::*;
|
||||
let mut ctx = RequestMonitoring::test();
|
||||
let mechanism = TestConnectMechanism::new(vec![Wake, FailPg]);
|
||||
let user_info = helper_create_connect_info(&mechanism);
|
||||
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
|
||||
.await
|
||||
.unwrap_err();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
/// Test that we don't retry if the error is not retryable.
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_non_retry_1() {
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
#![allow(unused)]
|
||||
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use hex::FromHex;
|
||||
use pageserver::tenant::Tenant;
|
||||
use reqwest::{header, Client, StatusCode, Url};
|
||||
use serde::Deserialize;
|
||||
use tokio::sync::Semaphore;
|
||||
@@ -286,7 +290,7 @@ impl CloudAdminApiClient {
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
continue;
|
||||
}
|
||||
_status => {
|
||||
status => {
|
||||
return Err(Error::new(
|
||||
"List active projects".to_string(),
|
||||
ErrorKind::ResponseStatus(response.status()),
|
||||
|
||||
@@ -116,9 +116,6 @@ pub struct SharedState {
|
||||
/// when tli is inactive instead of having this flag.
|
||||
active: bool,
|
||||
last_removed_segno: XLogSegNo,
|
||||
/// True if local and remote deletion has been done, allows to skip visiting
|
||||
/// s3 on retries.
|
||||
fully_deleted: bool,
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
@@ -158,7 +155,6 @@ impl SharedState {
|
||||
wal_backup_active: false,
|
||||
active: false,
|
||||
last_removed_segno: 0,
|
||||
fully_deleted: false,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -178,7 +174,6 @@ impl SharedState {
|
||||
wal_backup_active: false,
|
||||
active: false,
|
||||
last_removed_segno: 0,
|
||||
fully_deleted: false,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -487,10 +482,6 @@ impl Timeline {
|
||||
shared_state: &mut MutexGuard<'_, SharedState>,
|
||||
only_local: bool,
|
||||
) -> Result<(bool, bool)> {
|
||||
if shared_state.fully_deleted {
|
||||
return Ok((false, false));
|
||||
};
|
||||
|
||||
let was_active = shared_state.active;
|
||||
self.cancel(shared_state);
|
||||
|
||||
@@ -505,9 +496,6 @@ impl Timeline {
|
||||
wal_backup::delete_timeline(&self.ttid).await?;
|
||||
}
|
||||
let dir_existed = delete_dir(&self.timeline_dir).await?;
|
||||
if !only_local {
|
||||
shared_state.fully_deleted = true;
|
||||
}
|
||||
Ok((dir_existed, was_active))
|
||||
}
|
||||
|
||||
|
||||
@@ -1,28 +0,0 @@
|
||||
-- Test the test utils in pgxn/neon_test_utils. We don't test that
|
||||
-- these actually consume resources like they should - that would be
|
||||
-- tricky - but at least we check that they don't crash.
|
||||
CREATE EXTENSION neon_test_utils;
|
||||
select test_consume_cpu(1);
|
||||
test_consume_cpu
|
||||
------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select test_consume_memory(20); -- Allocate 20 MB
|
||||
test_consume_memory
|
||||
---------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select test_release_memory(5); -- Release 5 MB
|
||||
test_release_memory
|
||||
---------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select test_release_memory(); -- Release the remaining 15 MB
|
||||
test_release_memory
|
||||
---------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
@@ -7,5 +7,4 @@
|
||||
test: neon-cid
|
||||
test: neon-rel-truncate
|
||||
test: neon-clog
|
||||
test: neon-test-utils
|
||||
test: neon-vacuum-full
|
||||
|
||||
@@ -1,11 +0,0 @@
|
||||
-- Test the test utils in pgxn/neon_test_utils. We don't test that
|
||||
-- these actually consume resources like they should - that would be
|
||||
-- tricky - but at least we check that they don't crash.
|
||||
|
||||
CREATE EXTENSION neon_test_utils;
|
||||
|
||||
select test_consume_cpu(1);
|
||||
|
||||
select test_consume_memory(20); -- Allocate 20 MB
|
||||
select test_release_memory(5); -- Release 5 MB
|
||||
select test_release_memory(); -- Release the remaining 15 MB
|
||||
@@ -18,7 +18,7 @@ commands:
|
||||
sysvInitAction: respawn
|
||||
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter.yml'
|
||||
shutdownHook: |
|
||||
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
|
||||
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m immediate --wait -t 10'
|
||||
files:
|
||||
- filename: pgbouncer.ini
|
||||
content: |
|
||||
|
||||
Reference in New Issue
Block a user