Compare commits


1 Commit

Author: Arseny Sher
SHA1: bce8d4b3f5
Date: 2024-02-14 19:50:04 +03:00

Stop postgres in immediate mode in VM image.

PR #6712 might add a delay to graceful shutdown without a change to the
compute -> sk protocol to force a sync of the control file on the safekeeper
to persist commit_lsn, so use 'immediate' mode instead; the 'fast' mode isn't
useful for neon anyway.
28 changed files with 172 additions and 293 deletions

View File

@@ -820,10 +820,6 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
-C pgxn/neon_utils \
-s install && \
make -j $(getconf _NPROCESSORS_ONLN) \
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
-C pgxn/neon_test_utils \
-s install && \
make -j $(getconf _NPROCESSORS_ONLN) \
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
-C pgxn/neon_rmgr \

View File

@@ -381,6 +381,7 @@ impl Persistence {
//
// We create the child shards here, so that they will be available for increment_generation calls
// if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
#[allow(dead_code)]
pub(crate) async fn begin_shard_split(
&self,
old_shard_count: ShardCount,
@@ -448,6 +449,7 @@ impl Persistence {
// When we finish shard splitting, we must atomically clean up the old shards
// and insert the new shards, and clear the splitting marker.
#[allow(dead_code)]
pub(crate) async fn complete_shard_split(
&self,
split_tenant_id: TenantId,
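
The two comments in this hunk describe the persistence protocol for a shard split: child shard rows are written up front so that generation bumps can find them even if a pageserver restarts mid-split, and the final swap from old to new shards happens in one atomic step. A minimal, self-contained Rust sketch of that invariant follows; it is illustrative only, since the real Persistence methods take more parameters than this diff shows.

// Illustrative model only -- not the storage controller's real API.
#[derive(Default)]
struct SplitState {
    current_shards: Vec<u32>,
    pending_children: Vec<u32>,
    splitting: bool,
}

impl SplitState {
    // Step 1: persist the child shards and mark the split as in progress, so a
    // restarting pageserver can already be granted generations for them.
    fn begin_shard_split(&mut self, children: Vec<u32>) {
        self.pending_children = children;
        self.splitting = true;
    }

    // Step 2: one atomic transition that drops the old shards, keeps the new
    // ones, and clears the splitting marker.
    fn complete_shard_split(&mut self) {
        self.current_shards = std::mem::take(&mut self.pending_children);
        self.splitting = false;
    }
}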

View File

@@ -115,6 +115,7 @@ pub fn set_build_info_metric(revision: &str, build_tag: &str) {
// performed by the process.
// We know the size of a block, so we can derive the I/O bytes from the block counts.
// The value might not be 100% exact, but should be fine for Prometheus metrics in this case.
#[allow(clippy::unnecessary_cast)]
fn update_rusage_metrics() {
let rusage_stats = get_rusage_stats();
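
A hedged sketch of the arithmetic that comment describes, assuming the usual Linux convention that getrusage's ru_inblock/ru_oublock counters are in 512-byte blocks (the function and constant names here are illustrative, not the pageserver's):

// Assumption: Linux reports ru_inblock/ru_oublock in 512-byte units.
const BYTES_PER_BLOCK: i64 = 512;

fn rusage_io_bytes(ru_inblock: i64, ru_oublock: i64) -> (i64, i64) {
    // (bytes read, bytes written) derived from the block counters.
    (ru_inblock * BYTES_PER_BLOCK, ru_oublock * BYTES_PER_BLOCK)
}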

View File

@@ -3,7 +3,7 @@
#![allow(non_snake_case)]
// bindgen creates some unsafe code with no doc comments.
#![allow(clippy::missing_safety_doc)]
// noted at 1.63 that in many cases there's u32 -> u32 transmutes in bindgen code.
// noted at 1.63 that in many cases there's a u32 -> u32 transmutes in bindgen code.
#![allow(clippy::useless_transmute)]
// modules included with the postgres_ffi macro depend on the types of the specific version's
// types, and trigger a too eager lint.

View File

@@ -435,6 +435,7 @@ impl RemoteStorage for LocalFs {
Ok(())
}
#[allow(clippy::diverging_sub_expression)]
async fn time_travel_recover(
&self,
_prefix: Option<&RemotePath>,

View File

@@ -1,3 +1,5 @@
#![allow(unused)]
use criterion::{criterion_group, criterion_main, Criterion};
use utils::id;

View File

@@ -1,7 +1,7 @@
use std::{
borrow::Cow,
fs::{self, File},
io,
io::{self, Write},
};
use camino::{Utf8Path, Utf8PathBuf};
@@ -161,6 +161,48 @@ pub async fn durable_rename(
Ok(())
}
/// Writes a file to the specified `final_path` in a crash-safe fashion, using [`std::fs`].
///
/// The file is first written to the specified `tmp_path`, and in a second
/// step, the `tmp_path` is renamed to the `final_path`. Intermediary fsync
/// and atomic rename guarantee that, if we crash at any point, there will never
/// be a partially written file at `final_path` (but maybe at `tmp_path`).
///
/// Callers are responsible for serializing calls to this function for a given `final_path`.
/// If they don't, there may be an error due to a conflicting `tmp_path`, or there will
/// be no error and the content of `final_path` will be the "winning" caller's `content`.
/// I.e., the atomicity guarantees still hold.
pub fn overwrite(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
content: &[u8],
) -> std::io::Result<()> {
let Some(final_path_parent) = final_path.parent() else {
return Err(std::io::Error::from_raw_os_error(
nix::errno::Errno::EINVAL as i32,
));
};
std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?;
let mut file = std::fs::OpenOptions::new()
.write(true)
// Use `create_new` so that, if we race with ourselves or something else,
// we bail out instead of causing damage.
.create_new(true)
.open(tmp_path)?;
file.write_all(content)?;
file.sync_all()?;
drop(file); // don't keep the fd open for longer than we have to
std::fs::rename(tmp_path, final_path)?;
let final_parent_dirfd = std::fs::OpenOptions::new()
.read(true)
.open(final_path_parent)?;
final_parent_dirfd.sync_all()?;
Ok(())
}
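
A small usage sketch for the helper above; the paths are placeholders for illustration, not real neon paths. After the call returns, `final_path` holds either its previous contents or the new ones in full, never a torn write:

// Hypothetical caller; the parent directory is assumed to exist already.
fn save_marker(bytes: &[u8]) -> std::io::Result<()> {
    let final_path = camino::Utf8PathBuf::from("/tmp/demo/marker.json");
    let tmp_path = camino::Utf8PathBuf::from("/tmp/demo/marker.json.___temp");
    utils::crashsafe::overwrite(&final_path, &tmp_path, bytes)
}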
#[cfg(test)]
mod tests {

View File

@@ -234,7 +234,7 @@ impl DeletionHeader {
let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
let header_path = conf.deletion_header_path();
let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
VirtualFile::crashsafe_overwrite(&header_path, &temp_path, header_bytes)
VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes)
.await
.maybe_fatal_err("save deletion header")?;
@@ -325,7 +325,8 @@ impl DeletionList {
let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
VirtualFile::crashsafe_overwrite(&path, &temp_path, bytes)
VirtualFile::crashsafe_overwrite(path, temp_path, bytes)
.await
.maybe_fatal_err("save deletion list")
.map_err(Into::into)
@@ -834,6 +835,7 @@ mod test {
}
impl ControlPlaneGenerationsApi for MockControlPlane {
#[allow(clippy::diverging_sub_expression)] // False positive via async_trait
async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
unimplemented!()
}

View File

@@ -351,6 +351,7 @@ pub enum IterationOutcome<U> {
Finished(IterationOutcomeFinished<U>),
}
#[allow(dead_code)]
#[derive(Debug, Serialize)]
pub struct IterationOutcomeFinished<U> {
/// The actual usage observed before we started the iteration.
@@ -365,6 +366,7 @@ pub struct IterationOutcomeFinished<U> {
}
#[derive(Debug, Serialize)]
#[allow(dead_code)]
struct AssumedUsage<U> {
/// The expected value for `after`, after phase 2.
projected_after: U,
@@ -372,12 +374,14 @@ struct AssumedUsage<U> {
failed: LayerCount,
}
#[allow(dead_code)]
#[derive(Debug, Serialize)]
struct PlannedUsage<U> {
respecting_tenant_min_resident_size: U,
fallback_to_global_lru: Option<U>,
}
#[allow(dead_code)]
#[derive(Debug, Default, Serialize)]
struct LayerCount {
file_sizes: u64,
@@ -561,6 +565,7 @@ pub(crate) struct EvictionSecondaryLayer {
#[derive(Clone)]
pub(crate) enum EvictionLayer {
Attached(Layer),
#[allow(dead_code)]
Secondary(EvictionSecondaryLayer),
}
@@ -1100,6 +1105,7 @@ mod filesystem_level_usage {
use super::DiskUsageEvictionTaskConfig;
#[derive(Debug, Clone, Copy)]
#[allow(dead_code)]
pub struct Usage<'a> {
config: &'a DiskUsageEvictionTaskConfig,

View File

@@ -2214,7 +2214,7 @@ pub fn make_router(
)
.get(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
|r| api_handler(r, timeline_collect_keyspace),
|r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.any(handler_404))

View File

@@ -30,6 +30,10 @@
//! only a single tenant or timeline.
//!
// Clippy 1.60 incorrectly complains about the tokio::task_local!() macro.
// Silence it. See https://github.com/rust-lang/rust-clippy/issues/9224.
#![allow(clippy::declare_interior_mutable_const)]
use std::collections::HashMap;
use std::fmt;
use std::future::Future;
@@ -308,6 +312,7 @@ struct MutableTaskState {
}
struct PageServerTask {
#[allow(dead_code)] // unused currently
task_id: PageserverTaskId,
kind: TaskKind,

View File

@@ -28,7 +28,6 @@ use remote_storage::GenericRemoteStorage;
use std::fmt;
use storage_broker::BrokerClientChannel;
use tokio::io::BufReader;
use tokio::runtime::Handle;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -2878,17 +2877,10 @@ impl Tenant {
let tenant_shard_id = *tenant_shard_id;
let config_path = config_path.to_owned();
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
.await
.with_context(|| {
format!("write tenant {tenant_shard_id} config to {config_path}")
})
})
})
.await??;
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(config_path.clone(), temp_path, conf_content)
.await
.with_context(|| format!("write tenant {tenant_shard_id} config to {config_path}"))?;
Ok(())
}
@@ -2915,17 +2907,12 @@ impl Tenant {
let tenant_shard_id = *tenant_shard_id;
let target_config_path = target_config_path.to_owned();
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
.await
.with_context(|| {
format!("write tenant {tenant_shard_id} config to {target_config_path}")
})
})
})
.await??;
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(target_config_path.clone(), temp_path, conf_content)
.await
.with_context(|| {
format!("write tenant {tenant_shard_id} config to {target_config_path}")
})?;
Ok(())
}
@@ -4373,6 +4360,7 @@ mod tests {
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut lsn = start_lsn;
#[allow(non_snake_case)]
{
let writer = tline.writer().await;
// Create a relation on the timeline

View File

@@ -36,6 +36,7 @@ use crate::{
pub const VALUE_SZ: usize = 5;
pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;
#[allow(dead_code)]
pub const PAGE_SZ: usize = 8192;
#[derive(Clone, Copy, Debug)]

View File

@@ -279,7 +279,7 @@ pub async fn save_metadata(
let path = conf.metadata_path(tenant_shard_id, timeline_id);
let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
let metadata_bytes = data.to_bytes().context("serialize metadata")?;
VirtualFile::crashsafe_overwrite(&path, &temp_path, metadata_bytes)
VirtualFile::crashsafe_overwrite(path, temp_path, metadata_bytes)
.await
.context("write metadata")?;
Ok(())

View File

@@ -484,14 +484,9 @@ impl<'a> TenantDownloader<'a> {
let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
let heatmap_path_bg = heatmap_path.clone();
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await
})
})
.await
.expect("Blocking task is never aborted")
.maybe_fatal_err(&context_msg)?;
VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
.await
.maybe_fatal_err(&context_msg)?;
tracing::debug!("Wrote local heatmap to {}", heatmap_path);

View File

@@ -196,13 +196,13 @@ impl Timeline {
ControlFlow::Continue(()) => (),
}
#[allow(dead_code)]
#[derive(Debug, Default)]
struct EvictionStats {
candidates: usize,
evicted: usize,
errors: usize,
not_evictable: usize,
#[allow(dead_code)]
skipped_for_shutdown: usize,
}

View File

@@ -19,14 +19,13 @@ use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use std::fs::{self, File};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use utils::fs_ext;
pub use pageserver_api::models::virtual_file as api;
pub(crate) mod io_engine;
@@ -404,47 +403,34 @@ impl VirtualFile {
Ok(vfile)
}
/// Writes a file to the specified `final_path` in a crash safe fasion
/// Async version of [`::utils::crashsafe::overwrite`].
///
/// The file is first written to the specified tmp_path, and in a second
/// step, the tmp path is renamed to the final path. As renames are
/// atomic, a crash during the write operation will never leave behind a
/// partially written file.
pub async fn crashsafe_overwrite<B: BoundedBuf>(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
/// # NB:
///
/// Doesn't actually use the [`VirtualFile`] file descriptor cache, though
/// it did at an earlier time.
/// It will use this module's [`io_engine`] in the near future, so it stays in this module.
pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
final_path: Utf8PathBuf,
tmp_path: Utf8PathBuf,
content: B,
) -> std::io::Result<()> {
let Some(final_path_parent) = final_path.parent() else {
return Err(std::io::Error::from_raw_os_error(
nix::errno::Errno::EINVAL as i32,
));
};
std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?;
let mut file = Self::open_with_options(
tmp_path,
OpenOptions::new()
.write(true)
// Use `create_new` so that, if we race with ourselves or something else,
// we bail out instead of causing damage.
.create_new(true),
)
.await?;
let (_content, res) = file.write_all(content).await;
res?;
file.sync_all().await?;
drop(file); // before the rename, that's important!
// renames are atomic
std::fs::rename(tmp_path, final_path)?;
// Only open final path parent dirfd now, so that this operation only
// ever holds one VirtualFile fd at a time. That's important because
// the current `find_victim_slot` impl might pick the same slot for both
// VirtualFiles, and it eventually does a blocking write lock instead of
// try_lock.
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?;
final_parent_dirfd.sync_all().await?;
Ok(())
// TODO: use tokio_epoll_uring if configured as `io_engine`.
// See https://github.com/neondatabase/neon/issues/6663
tokio::task::spawn_blocking(move || {
let slice_storage;
let content_len = content.bytes_init();
let content = if content.bytes_init() > 0 {
slice_storage = Some(content.slice(0..content_len));
slice_storage.as_deref().expect("just set it to Some()")
} else {
&[]
};
utils::crashsafe::overwrite(&final_path, &tmp_path, content)
})
.await
.expect("blocking task is never aborted")
}
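
The signature change above (owned `Utf8PathBuf`s and a `Send` buffer instead of borrowed paths) follows from `tokio::task::spawn_blocking`, which only accepts `'static` closures, so everything the closure touches must be moved in. A stripped-down illustration of the same ownership pattern, outside the real `VirtualFile` API:

// Not the VirtualFile API -- just the spawn_blocking ownership pattern it relies on.
async fn write_in_blocking_task(
    path: std::path::PathBuf,
    content: Vec<u8>,
) -> std::io::Result<()> {
    tokio::task::spawn_blocking(move || std::fs::write(&path, content))
        .await
        .expect("blocking task is never aborted")
}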
/// Call File::sync_all() on the underlying File.
@@ -1337,7 +1323,7 @@ mod tests {
let path = testdir.join("myfile");
let tmp_path = testdir.join("myfile.tmp");
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
@@ -1346,7 +1332,7 @@ mod tests {
assert!(!tmp_path.exists());
drop(file);
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec())
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
@@ -1368,7 +1354,7 @@ mod tests {
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
assert!(tmp_path.exists());
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();

View File

@@ -7,24 +7,6 @@ AS 'MODULE_PATHNAME', 'test_consume_xids'
LANGUAGE C STRICT
PARALLEL UNSAFE;
CREATE FUNCTION test_consume_cpu(seconds int)
RETURNS VOID
AS 'MODULE_PATHNAME', 'test_consume_cpu'
LANGUAGE C STRICT
PARALLEL UNSAFE;
CREATE FUNCTION test_consume_memory(megabytes int)
RETURNS VOID
AS 'MODULE_PATHNAME', 'test_consume_memory'
LANGUAGE C STRICT
PARALLEL UNSAFE;
CREATE FUNCTION test_release_memory(megabytes int DEFAULT NULL)
RETURNS VOID
AS 'MODULE_PATHNAME', 'test_release_memory'
LANGUAGE C
PARALLEL UNSAFE;
CREATE FUNCTION clear_buffer_cache()
RETURNS VOID
AS 'MODULE_PATHNAME', 'clear_buffer_cache'

View File

@@ -3,4 +3,3 @@ comment = 'helpers for neon testing and debugging'
default_version = '1.0'
module_pathname = '$libdir/neon_test_utils'
relocatable = true
trusted = true

View File

@@ -21,12 +21,10 @@
#include "miscadmin.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/varlena.h"
#include "utils/wait_event.h"
#include "../neon/pagestore_client.h"
PG_MODULE_MAGIC;
@@ -34,9 +32,6 @@ PG_MODULE_MAGIC;
extern void _PG_init(void);
PG_FUNCTION_INFO_V1(test_consume_xids);
PG_FUNCTION_INFO_V1(test_consume_cpu);
PG_FUNCTION_INFO_V1(test_consume_memory);
PG_FUNCTION_INFO_V1(test_release_memory);
PG_FUNCTION_INFO_V1(clear_buffer_cache);
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn);
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
@@ -102,119 +97,6 @@ test_consume_xids(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
/*
* test_consume_cpu(seconds int). Keeps one CPU busy for the given number of seconds.
*/
Datum
test_consume_cpu(PG_FUNCTION_ARGS)
{
int32 seconds = PG_GETARG_INT32(0);
TimestampTz start;
uint64 total_iterations = 0;
start = GetCurrentTimestamp();
for (;;)
{
TimestampTz elapsed;
elapsed = GetCurrentTimestamp() - start;
if (elapsed > (TimestampTz) seconds * USECS_PER_SEC)
break;
/* keep spinning */
for (int i = 0; i < 1000000; i++)
total_iterations++;
elog(DEBUG2, "test_consume_cpu(): %lu iterations in total", total_iterations);
CHECK_FOR_INTERRUPTS();
}
PG_RETURN_VOID();
}
static MemoryContext consume_cxt = NULL;
static slist_head consumed_memory_chunks;
static int64 num_memory_chunks;
/*
* test_consume_memory(megabytes int).
*
* Consume given amount of memory. The allocation is made in TopMemoryContext,
* so it outlives the function, until you call test_release_memory to
* explicitly release it, or close the session.
*/
Datum
test_consume_memory(PG_FUNCTION_ARGS)
{
int32 megabytes = PG_GETARG_INT32(0);
/*
* Consume the memory in a new memory context, so that it's convenient to
* release and to display it separately in a possible memory context dump.
*/
if (consume_cxt == NULL)
consume_cxt = AllocSetContextCreate(TopMemoryContext,
"test_consume_memory",
ALLOCSET_DEFAULT_SIZES);
for (int32 i = 0; i < megabytes; i++)
{
char *p;
p = MemoryContextAllocZero(consume_cxt, 1024 * 1024);
/* touch the memory, so that it's really allocated by the kernel */
for (int j = 0; j < 1024 * 1024; j += 1024)
p[j] = j % 0xFF;
slist_push_head(&consumed_memory_chunks, (slist_node *) p);
num_memory_chunks++;
}
PG_RETURN_VOID();
}
/*
* test_release_memory(megabytes int). NULL releases all
*/
Datum
test_release_memory(PG_FUNCTION_ARGS)
{
TimestampTz start;
if (PG_ARGISNULL(0))
{
if (consume_cxt)
{
MemoryContextDelete(consume_cxt);
consume_cxt = NULL;
num_memory_chunks = 0;
}
}
else
{
int32 chunks_to_release = PG_GETARG_INT32(0);
if (chunks_to_release > num_memory_chunks)
{
elog(WARNING, "only %lu MB is consumed, releasing it all", num_memory_chunks);
chunks_to_release = num_memory_chunks;
}
for (int32 i = 0; i < chunks_to_release; i++)
{
slist_node *chunk = slist_pop_head_node(&consumed_memory_chunks);
pfree(chunk);
num_memory_chunks--;
}
}
PG_RETURN_VOID();
}
/*
* Flush the buffer cache, evicting all pages that are not currently pinned.
*/

View File

@@ -122,24 +122,25 @@ where
error!(error = ?err, "could not connect to compute node");
let node_info = if !node_info.cached() {
// If we just received this from cplane and didn't get it from cache, we shouldn't retry.
// Do not need to retrieve a new node_info, just return the old one.
if !err.should_retry(num_retries) {
return Err(err.into());
}
node_info
} else {
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
info!("compute node's state has likely changed; requesting a wake-up");
ctx.latency_timer.cache_miss();
let old_node_info = invalidate_cache(node_info);
let mut node_info = wake_compute(&mut num_retries, ctx, user_info).await?;
node_info.reuse_settings(old_node_info);
let node_info =
if err.get_error_kind() == crate::error::ErrorKind::Postgres || !node_info.cached() {
// If the error is Postgres, that means that we managed to connect to the compute node, but there was an error.
// Do not need to retrieve a new node_info, just return the old one.
if !err.should_retry(num_retries) {
return Err(err.into());
}
node_info
} else {
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
info!("compute node's state has likely changed; requesting a wake-up");
ctx.latency_timer.cache_miss();
let old_node_info = invalidate_cache(node_info);
let mut node_info = wake_compute(&mut num_retries, ctx, user_info).await?;
node_info.reuse_settings(old_node_info);
mechanism.update_connect_config(&mut node_info.config);
node_info
};
mechanism.update_connect_config(&mut node_info.config);
node_info
};
// now that we have a new node, try connect to it repeatedly.
// this can error for a few reasons, for instance:
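
Restating the new branch condition in this hunk as a tiny decision helper (names are illustrative, not the proxy's real API): a Postgres-level error means the compute node was reachable, so waking a new one would not help, and a node_info that did not come from the cache cannot be stale, so invalidating the cache is pointless; in both cases the existing node_info is retried.

// Illustrative only; ErrorKind mirrors crate::error::ErrorKind from the diff.
#[derive(PartialEq, Clone, Copy)]
enum ErrorKind {
    Postgres,
    Compute,
}

fn reuse_existing_node_info(error_kind: ErrorKind, node_info_was_cached: bool) -> bool {
    // Postgres-level errors mean the compute was reachable; an uncached node_info
    // cannot be stale. In both cases retry with the same node_info instead of
    // invalidating the cache and waking a new compute.
    error_kind == ErrorKind::Postgres || !node_info_was_cached
}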

View File

@@ -375,6 +375,8 @@ enum ConnectAction {
Connect,
Retry,
Fail,
RetryPg,
FailPg,
}
#[derive(Clone)]
@@ -464,6 +466,14 @@ impl ConnectMechanism for TestConnectMechanism {
retryable: false,
kind: ErrorKind::Compute,
}),
ConnectAction::FailPg => Err(TestConnectError {
retryable: false,
kind: ErrorKind::Postgres,
}),
ConnectAction::RetryPg => Err(TestConnectError {
retryable: true,
kind: ErrorKind::Postgres,
}),
x => panic!("expecting action {:?}, connect is called instead", x),
}
}
@@ -562,6 +572,32 @@ async fn connect_to_compute_retry() {
mechanism.verify();
}
#[tokio::test]
async fn connect_to_compute_retry_pg() {
let _ = env_logger::try_init();
use ConnectAction::*;
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, RetryPg, Connect]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
.await
.unwrap();
mechanism.verify();
}
#[tokio::test]
async fn connect_to_compute_fail_pg() {
let _ = env_logger::try_init();
use ConnectAction::*;
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, FailPg]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
.await
.unwrap_err();
mechanism.verify();
}
/// Test that we don't retry if the error is not retryable.
#[tokio::test]
async fn connect_to_compute_non_retry_1() {

View File

@@ -1,7 +1,11 @@
#![allow(unused)]
use std::str::FromStr;
use std::time::Duration;
use chrono::{DateTime, Utc};
use hex::FromHex;
use pageserver::tenant::Tenant;
use reqwest::{header, Client, StatusCode, Url};
use serde::Deserialize;
use tokio::sync::Semaphore;
@@ -286,7 +290,7 @@ impl CloudAdminApiClient {
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
}
_status => {
status => {
return Err(Error::new(
"List active projects".to_string(),
ErrorKind::ResponseStatus(response.status()),

View File

@@ -116,9 +116,6 @@ pub struct SharedState {
/// when tli is inactive instead of having this flag.
active: bool,
last_removed_segno: XLogSegNo,
/// True if local and remote deletion has been done, allows to skip visiting
/// s3 on retries.
fully_deleted: bool,
}
impl SharedState {
@@ -158,7 +155,6 @@ impl SharedState {
wal_backup_active: false,
active: false,
last_removed_segno: 0,
fully_deleted: false,
})
}
@@ -178,7 +174,6 @@ impl SharedState {
wal_backup_active: false,
active: false,
last_removed_segno: 0,
fully_deleted: false,
})
}
@@ -487,10 +482,6 @@ impl Timeline {
shared_state: &mut MutexGuard<'_, SharedState>,
only_local: bool,
) -> Result<(bool, bool)> {
if shared_state.fully_deleted {
return Ok((false, false));
};
let was_active = shared_state.active;
self.cancel(shared_state);
@@ -505,9 +496,6 @@ impl Timeline {
wal_backup::delete_timeline(&self.ttid).await?;
}
let dir_existed = delete_dir(&self.timeline_dir).await?;
if !only_local {
shared_state.fully_deleted = true;
}
Ok((dir_existed, was_active))
}

View File

@@ -1,28 +0,0 @@
-- Test the test utils in pgxn/neon_test_utils. We don't test that
-- these actually consume resources like they should - that would be
-- tricky - but at least we check that they don't crash.
CREATE EXTENSION neon_test_utils;
select test_consume_cpu(1);
test_consume_cpu
------------------
(1 row)
select test_consume_memory(20); -- Allocate 20 MB
test_consume_memory
---------------------
(1 row)
select test_release_memory(5); -- Release 5 MB
test_release_memory
---------------------
(1 row)
select test_release_memory(); -- Release the remaining 15 MB
test_release_memory
---------------------
(1 row)

View File

@@ -7,5 +7,4 @@
test: neon-cid
test: neon-rel-truncate
test: neon-clog
test: neon-test-utils
test: neon-vacuum-full

View File

@@ -1,11 +0,0 @@
-- Test the test utils in pgxn/neon_test_utils. We don't test that
-- these actually consume resources like they should - that would be
-- tricky - but at least we check that they don't crash.
CREATE EXTENSION neon_test_utils;
select test_consume_cpu(1);
select test_consume_memory(20); -- Allocate 20 MB
select test_release_memory(5); -- Release 5 MB
select test_release_memory(); -- Release the remaining 15 MB

View File

@@ -18,7 +18,7 @@ commands:
sysvInitAction: respawn
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter.yml'
shutdownHook: |
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m immediate --wait -t 10'
files:
- filename: pgbouncer.ini
content: |