Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-09 05:30:37 +00:00

Compare commits: gc_feedbac...delete_tim
35 commits (SHA1): 69e92372c8, c8b35324f6, bc4f6db0ad, d0b2befe02, 0251c7c15c, 9ab4e5ae54, 70c2717239, f0bcfd61e3, cbe3e11923, 7b6ed51319, a9720349bb, 83457a848b, b4cf3b18fa, 6b1adf6e5e, 4591698cd4, 81e8c10069, 6bd3951141, 511978be6f, 1e94bbf249, a0f63b8264, c948ee3975, 3784166180, 8c1962d40c, d3ce8eae4e, 3b1141b344, 17c92c885c, 43b397da10, 00673cf900, ba7c97b61c, a0b34e8c49, fdc1c12fb0, 0322e2720f, 4f64be4a98, e7514cc15e, 6415dc791c
4  .github/pull_request_template.md (vendored)
@@ -1,6 +1,6 @@
## Describe your changes
## Problem

## Issue ticket number and link
## Summary of changes

## Checklist before requesting a review
22  .github/workflows/benchmarking.yml (vendored)
@@ -16,12 +16,12 @@ on:
|
||||
workflow_dispatch: # adds ability to run this manually
|
||||
inputs:
|
||||
region_id:
|
||||
description: 'Use a particular region. If not set the default region will be used'
|
||||
description: 'Project region id. If not set, the default region will be used'
|
||||
required: false
|
||||
default: 'aws-us-east-2'
|
||||
save_perf_report:
|
||||
type: boolean
|
||||
description: 'Publish perf report or not. If not set, the report is published only for the main branch'
|
||||
description: 'Publish perf report. If not set, the report will be published only for the main branch'
|
||||
required: false
|
||||
|
||||
defaults:
|
||||
@@ -125,13 +125,14 @@ jobs:
|
||||
matrix='{
|
||||
"platform": [
|
||||
"neon-captest-new",
|
||||
"neon-captest-reuse"
|
||||
"neon-captest-reuse",
|
||||
"neonvm-captest-new"
|
||||
],
|
||||
"db_size": [ "10gb" ],
|
||||
"include": [
|
||||
{ "platform": "neon-captest-freetier", "db_size": "3gb" },
|
||||
{ "platform": "neon-captest-new", "db_size": "50gb" }
|
||||
]
|
||||
"include": [{ "platform": "neon-captest-freetier", "db_size": "3gb" },
|
||||
{ "platform": "neon-captest-new", "db_size": "50gb" },
|
||||
{ "platform": "neonvm-captest-freetier", "db_size": "3gb" },
|
||||
{ "platform": "neonvm-captest-new", "db_size": "50gb" }]
|
||||
}'
|
||||
|
||||
if [ "$(date +%A)" = "Saturday" ]; then
|
||||
@@ -197,7 +198,7 @@ jobs:
|
||||
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
|
||||
|
||||
- name: Create Neon Project
|
||||
if: contains(fromJson('["neon-captest-new", "neon-captest-freetier"]'), matrix.platform)
|
||||
if: contains(fromJson('["neon-captest-new", "neon-captest-freetier", "neonvm-captest-new", "neonvm-captest-freetier"]'), matrix.platform)
|
||||
id: create-neon-project
|
||||
uses: ./.github/actions/neon-project-create
|
||||
with:
|
||||
@@ -205,6 +206,7 @@ jobs:
|
||||
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
|
||||
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
compute_units: ${{ (matrix.platform == 'neon-captest-freetier' && '[0.25, 0.25]') || '[1, 1]' }}
|
||||
provisioner: ${{ (contains(matrix.platform, 'neonvm-') && 'k8s-neonvm') || 'k8s-pod' }}
|
||||
|
||||
- name: Set up Connection String
|
||||
id: set-up-connstr
|
||||
@@ -213,7 +215,7 @@ jobs:
|
||||
neon-captest-reuse)
|
||||
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR }}
|
||||
;;
|
||||
neon-captest-new | neon-captest-freetier)
|
||||
neon-captest-new | neon-captest-freetier | neonvm-captest-new | neonvm-captest-freetier)
|
||||
CONNSTR=${{ steps.create-neon-project.outputs.dsn }}
|
||||
;;
|
||||
rds-aurora)
|
||||
@@ -223,7 +225,7 @@ jobs:
|
||||
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CONNSTR }}
|
||||
;;
|
||||
*)
|
||||
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'neon-captest-new', 'neon-captest-freetier', 'rds-aurora', or 'rds-postgres'"
|
||||
echo >&2 "Unknown PLATFORM=${PLATFORM}"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
|
||||
@@ -60,6 +60,10 @@ pub mod tracing_span_assert;
|
||||
|
||||
pub mod rate_limit;
|
||||
|
||||
/// Primitive for coalescing operations into a single task which will not be cancelled by, for
/// example, an external http client closing the connection.
|
||||
pub mod shared_retryable;
|
||||
|
||||
mod failpoint_macro_helpers {
|
||||
|
||||
/// use with fail::cfg("$name", "return(2000)")
|
||||
@@ -96,6 +100,7 @@ mod failpoint_macro_helpers {
|
||||
tracing::info!("failpoint {:?}: sleep done", name);
|
||||
}
|
||||
}
|
||||
|
||||
pub use failpoint_macro_helpers::failpoint_sleep_helper;
|
||||
|
||||
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
|
||||
|
||||
657  libs/utils/src/shared_retryable.rs (new file)
@@ -0,0 +1,657 @@
use std::future::Future;
use std::sync::Arc;

/// Container with which many request handlers can come together and join a single task to
/// completion instead of racing each other and their own cancellation.
///
/// In a picture:
///
/// ```text
/// SharedRetryable::try_restart          Spawned task completes with only one concurrent attempt
///                   \                               /
/// request handler 1 ---->|--X
/// request handler 2 ---->|-------|
/// request handler 3 ---->|-------|
///                        |       |
///                        v       |
///            one spawned task    \------>/
///
/// (X = cancelled during await)
/// ```
///
/// The implementation is cancel safe. The implementation and internal structure are complicated
/// by the inability to just spawn the task, but this is needed for `pageserver` usage. Within
/// `pageserver`, the `task_mgr` must be used to spawn the future, because that will cause the
/// future to be awaited during shutdown.
///
/// The implementation exposes a fully decomposed [`SharedRetryable::try_restart`] which requires the
/// caller to do the spawning before awaiting the result. If the caller is dropped while this
/// happens, a new attempt will be required, and all concurrent awaiters will see a
/// [`RetriedTaskPanicked`] error.
///
/// There is another "family of APIs", [`SharedRetryable::attempt_spawn`], for infallible futures. It
/// is provided just for completeness, and it does not have a fully decomposed version like
/// `try_restart`.
///
/// For the `try_restart_*` family of APIs there is a concept of two-leveled results. The inner level
/// is returned by the executed future and needs to be `Clone`. Most errors are not `Clone`, so the
/// implementation advice is to log the error where it happens and to propagate no more than a label
/// as the "inner error", which is then used to build an outer error. The outer error will also have
/// to be convertible from [`RetriedTaskPanicked`] to absorb that case as well.
///
/// ## Example
///
/// A shared service value completes the infallible work once, even if called concurrently by
/// multiple cancellable tasks.
///
/// Example moved to the test `service_example`.
#[derive(Clone)]
|
||||
pub struct SharedRetryable<V> {
|
||||
inner: Arc<tokio::sync::Mutex<MaybeDone<V>>>,
|
||||
}
|
||||
|
||||
impl<V> Default for SharedRetryable<V> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
inner: Arc::new(tokio::sync::Mutex::new(MaybeDone::default())),
|
||||
}
|
||||
}
|
||||
}
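// A minimal sketch of the infallible path described in the type-level docs above:
// many concurrent, cancellable callers all receive the value of the single attempt
// that completes. Illustrative only; the helper name is hypothetical.
#[cfg(test)]
#[allow(dead_code)]
async fn attempt_spawn_sketch(shared: &SharedRetryable<u64>) -> Result<u64, RetriedTaskPanicked> {
    shared
        .attempt_spawn(|| async {
            // the expensive work runs in a single spawned task; later callers just observe the result
            42u64
        })
        .await
}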
|
||||
|
||||
/// Determine if an error is transient or permanent.
|
||||
pub trait Retryable {
|
||||
fn is_permanent(&self) -> bool {
|
||||
true
|
||||
}
|
||||
}
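// Illustrative sketch (hypothetical type): a caller's inner error usually overrides
// `is_permanent` to classify its variants, in the same way `InnerDeleteTimelineError`
// in the pageserver does elsewhere in this change.
#[cfg(test)]
#[allow(dead_code)]
#[derive(Clone, Debug)]
enum ExampleInnerError {
    TransientIo,
    InvalidInput,
}

#[cfg(test)]
impl Retryable for ExampleInnerError {
    fn is_permanent(&self) -> bool {
        // transient I/O failures may be retried later; invalid input will never succeed
        matches!(self, ExampleInnerError::InvalidInput)
    }
}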
|
||||
|
||||
pub trait MakeFuture {
|
||||
type Future: Future<Output = Self::Output> + Send + 'static;
|
||||
type Output: Send + 'static;
|
||||
|
||||
fn make_future(self) -> Self::Future;
|
||||
}
|
||||
|
||||
impl<Fun, Fut, R> MakeFuture for Fun
|
||||
where
|
||||
Fun: FnOnce() -> Fut,
|
||||
Fut: Future<Output = R> + Send + 'static,
|
||||
R: Send + 'static,
|
||||
{
|
||||
type Future = Fut;
|
||||
type Output = R;
|
||||
|
||||
fn make_future(self) -> Self::Future {
|
||||
self()
|
||||
}
|
||||
}
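// Sketch (hypothetical helper): thanks to the blanket impl above, a plain closure
// returning an async block already implements `MakeFuture`; no manual impl is needed.
#[cfg(test)]
#[allow(dead_code)]
fn closure_is_a_make_future() -> impl MakeFuture<Output = u8> {
    || async { 42u8 }
}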
|
||||
|
||||
/// Retried task panicked, was cancelled, or never spawned (see [`SharedRetryable::try_restart`]).
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct RetriedTaskPanicked;
|
||||
|
||||
impl<T, E1> SharedRetryable<Result<T, E1>>
|
||||
where
|
||||
T: Clone + std::fmt::Debug + Send + 'static,
|
||||
E1: Retryable + Clone + std::fmt::Debug + Send + 'static,
|
||||
{
|
||||
/// Restart a previously failed operation unless it already completed with a terminal result.
|
||||
///
|
||||
/// Many futures can call this function and get the terminal result from an earlier attempt,
/// start a new attempt, or join an existing one.
|
||||
///
|
||||
/// Compared to `Self::try_restart`, this method also spawns the future to run, which would
|
||||
/// otherwise have to be done manually.
|
||||
#[cfg(test)]
|
||||
pub async fn try_restart_spawn<E2>(
|
||||
&self,
|
||||
retry_with: impl MakeFuture<Output = Result<T, E1>>,
|
||||
) -> Result<T, E2>
|
||||
where
|
||||
E2: From<E1> + From<RetriedTaskPanicked> + Send + 'static,
|
||||
{
|
||||
let (recv, maybe_fut) = self.try_restart(retry_with).await;
|
||||
|
||||
if let Some(fut) = maybe_fut {
|
||||
// top level function, we must spawn, pageserver cannot use this
|
||||
tokio::spawn(fut);
|
||||
}
|
||||
|
||||
recv.await
|
||||
}
|
||||
|
||||
/// Restart a previously failed operation unless it already completed with a terminal result.
|
||||
///
|
||||
/// Many futures can call this function and get the terminal result from an earlier attempt or
|
||||
/// start a new attempt, or join an existing one.
|
||||
///
|
||||
/// If a task calling this method is cancelled before spawning the returned future, the
/// attempt is immediately deemed to have panicked, even though no panic ever
/// happened.
|
||||
///
|
||||
/// Returns one future for waiting for the result and possibly another which needs to be
|
||||
/// spawned when `Some`. Spawning has to happen before waiting is started, otherwise the first
|
||||
/// future will never make progress.
|
||||
///
|
||||
/// This complication exists because on `pageserver` we cannot use `tokio::spawn` directly
|
||||
/// at this time.
|
||||
pub async fn try_restart<E2>(
|
||||
&self,
|
||||
retry_with: impl MakeFuture<Output = Result<T, E1>>,
|
||||
) -> (
|
||||
impl Future<Output = Result<T, E2>> + Send + 'static,
|
||||
Option<impl Future<Output = ()> + Send + 'static>,
|
||||
)
|
||||
where
|
||||
E2: From<E1> + From<RetriedTaskPanicked> + Send + 'static,
|
||||
{
|
||||
use futures::future::Either;
|
||||
|
||||
match self.decide_to_retry_or_join(retry_with).await {
|
||||
Ok(terminal) => (Either::Left(async move { terminal }), None),
|
||||
Err((rx, maybe_fut)) => {
|
||||
let recv = Self::make_oneshot_alike_receiver(rx);
|
||||
|
||||
(Either::Right(recv), maybe_fut)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `Ok` if a previous attempt resulted in a terminal result. `Err` is returned
/// when an attempt can be joined and possibly needs to be spawned.
|
||||
async fn decide_to_retry_or_join<E2>(
|
||||
&self,
|
||||
retry_with: impl MakeFuture<Output = Result<T, E1>>,
|
||||
) -> Result<
|
||||
Result<T, E2>,
|
||||
(
|
||||
tokio::sync::broadcast::Receiver<Result<T, E1>>,
|
||||
Option<impl Future<Output = ()> + Send + 'static>,
|
||||
),
|
||||
>
|
||||
where
|
||||
E2: From<E1> + From<RetriedTaskPanicked>,
|
||||
{
|
||||
let mut g = self.inner.lock().await;
|
||||
|
||||
let maybe_rx = match &*g {
|
||||
MaybeDone::Done(Ok(t)) => return Ok(Ok(t.to_owned())),
|
||||
MaybeDone::Done(Err(e)) if e.is_permanent() => return Ok(Err(E2::from(e.to_owned()))),
|
||||
MaybeDone::Pending(weak) => {
|
||||
// failure to upgrade can mean only one thing: there was an unexpected
|
||||
// panic which we consider as a transient retryable error.
|
||||
weak.upgrade()
|
||||
}
|
||||
MaybeDone::Done(Err(_retryable)) => None,
|
||||
MaybeDone::NotStarted => None,
|
||||
};
|
||||
|
||||
let (strong, maybe_fut) = match maybe_rx {
|
||||
Some(strong) => (strong, None),
|
||||
None => {
|
||||
// new attempt
|
||||
// panic safety: invoke the factory before configuring the pending value
|
||||
let fut = retry_with.make_future();
|
||||
|
||||
let (strong, fut) = self.make_run_and_complete(fut, &mut g);
|
||||
(strong, Some(fut))
|
||||
}
|
||||
};
|
||||
|
||||
// important: the Arc<Receiver> is not held after unlocking
|
||||
// important: we resubscribe before lock is released to be sure to get a message which
|
||||
// is sent once receiver is dropped
|
||||
let rx = strong.resubscribe();
|
||||
drop(strong);
|
||||
Err((rx, maybe_fut))
|
||||
}
|
||||
|
||||
/// Configure a new attempt, but leave spawning it to the caller.
|
||||
///
|
||||
/// Returns an `Arc<Receiver<V>>` which is valid until the attempt completes, and the future
|
||||
/// which will need to run to completion outside the lifecycle of the caller.
|
||||
fn make_run_and_complete(
|
||||
&self,
|
||||
fut: impl Future<Output = Result<T, E1>> + Send + 'static,
|
||||
g: &mut tokio::sync::MutexGuard<'_, MaybeDone<Result<T, E1>>>,
|
||||
) -> (
|
||||
Arc<tokio::sync::broadcast::Receiver<Result<T, E1>>>,
|
||||
impl Future<Output = ()> + Send + 'static,
|
||||
) {
|
||||
#[cfg(debug_assertions)]
|
||||
match &**g {
|
||||
MaybeDone::Pending(weak) => {
|
||||
assert!(
|
||||
weak.upgrade().is_none(),
|
||||
"when starting a restart, should no longer have an upgradeable channel"
|
||||
);
|
||||
}
|
||||
MaybeDone::Done(Err(err)) => {
|
||||
assert!(
|
||||
!err.is_permanent(),
|
||||
"when restarting, the err must be transient"
|
||||
);
|
||||
}
|
||||
MaybeDone::Done(Ok(_)) => {
|
||||
panic!("unexpected restart after a completion on MaybeDone");
|
||||
}
|
||||
MaybeDone::NotStarted => {}
|
||||
}
|
||||
|
||||
self.make_run_and_complete_any(fut, g)
|
||||
}
|
||||
|
||||
/// Oneshot-alike in the sense that it is a future which is consumed by a single `await`.
|
||||
///
|
||||
/// Otherwise the caller might think it's beneficial or reasonable to poll the channel multiple
|
||||
/// times.
|
||||
async fn make_oneshot_alike_receiver<E2>(
|
||||
mut rx: tokio::sync::broadcast::Receiver<Result<T, E1>>,
|
||||
) -> Result<T, E2>
|
||||
where
|
||||
E2: From<E1> + From<RetriedTaskPanicked>,
|
||||
{
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
|
||||
match rx.recv().await {
|
||||
Ok(Ok(t)) => Ok(t),
|
||||
Ok(Err(e)) => Err(E2::from(e)),
|
||||
Err(RecvError::Closed | RecvError::Lagged(_)) => {
|
||||
// lagged doesn't mean anything with 1 send, but whatever, handle it the same
|
||||
// this case should only ever happen if a panic happened in `fut`.
|
||||
Err(E2::from(RetriedTaskPanicked))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
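// Illustrative sketch (hypothetical helper): the decomposed `try_restart` exists so a
// caller can route the attempt future through its own spawner (on the pageserver that
// is `task_mgr`) instead of `tokio::spawn`. Spawning must happen before the receiver
// is awaited, otherwise the receiver never resolves.
#[cfg(test)]
#[allow(dead_code)]
async fn try_restart_with_spawner<T, E1, E2, S>(
    shared: &SharedRetryable<Result<T, E1>>,
    retry_with: impl MakeFuture<Output = Result<T, E1>>,
    spawner: S,
) -> Result<T, E2>
where
    T: Clone + std::fmt::Debug + Send + 'static,
    E1: Retryable + Clone + std::fmt::Debug + Send + 'static,
    E2: From<E1> + From<RetriedTaskPanicked> + Send + 'static,
    S: FnOnce(futures::future::BoxFuture<'static, ()>),
{
    let (recv, maybe_fut) = shared.try_restart(retry_with).await;
    if let Some(fut) = maybe_fut {
        // hand the single attempt to the caller-provided spawner
        spawner(Box::pin(fut));
    }
    recv.await
}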
|
||||
|
||||
impl<V> SharedRetryable<V>
|
||||
where
|
||||
V: std::fmt::Debug + Clone + Send + 'static,
|
||||
{
|
||||
/// Attempt to run a spawned future once to completion.
|
||||
///
|
||||
/// Any previous attempt which panicked will be retried, but the `RetriedTaskPanicked` will be
|
||||
/// returned when the most recent attempt panicked.
|
||||
#[cfg(test)]
|
||||
pub async fn attempt_spawn(
|
||||
&self,
|
||||
attempt_with: impl MakeFuture<Output = V>,
|
||||
) -> Result<V, RetriedTaskPanicked> {
|
||||
let (rx, maybe_fut) = {
|
||||
let mut g = self.inner.lock().await;
|
||||
|
||||
let maybe_rx = match &*g {
|
||||
MaybeDone::Done(v) => return Ok(v.to_owned()),
|
||||
MaybeDone::Pending(weak) => {
|
||||
// see comment in try_restart
|
||||
weak.upgrade()
|
||||
}
|
||||
MaybeDone::NotStarted => None,
|
||||
};
|
||||
|
||||
let (strong, maybe_fut) = match maybe_rx {
|
||||
Some(strong) => (strong, None),
|
||||
None => {
|
||||
let fut = attempt_with.make_future();
|
||||
|
||||
let (strong, fut) = self.make_run_and_complete_any(fut, &mut g);
|
||||
(strong, Some(fut))
|
||||
}
|
||||
};
|
||||
|
||||
// see decide_to_retry_or_join for important notes
|
||||
let rx = strong.resubscribe();
|
||||
drop(strong);
|
||||
(rx, maybe_fut)
|
||||
};
|
||||
|
||||
if let Some(fut) = maybe_fut {
|
||||
// this is a top level function, need to spawn directly
|
||||
// from pageserver one wouldn't use this but more piecewise functions
|
||||
tokio::spawn(fut);
|
||||
}
|
||||
|
||||
let recv = Self::make_oneshot_alike_receiver_any(rx);
|
||||
|
||||
recv.await
|
||||
}
|
||||
|
||||
/// Configure a new attempt, but leave spawning it to the caller.
|
||||
///
|
||||
/// Forgetting the returned future is outside the scope of any correctness guarantees; all of
|
||||
/// the waiters will then be deadlocked, and the MaybeDone will forever be pending. Dropping
|
||||
/// and not running the future will then require a new attempt.
|
||||
///
|
||||
/// Also returns an `Arc<Receiver<V>>` which is valid until the attempt completes.
|
||||
fn make_run_and_complete_any(
|
||||
&self,
|
||||
fut: impl Future<Output = V> + Send + 'static,
|
||||
g: &mut tokio::sync::MutexGuard<'_, MaybeDone<V>>,
|
||||
) -> (
|
||||
Arc<tokio::sync::broadcast::Receiver<V>>,
|
||||
impl Future<Output = ()> + Send + 'static,
|
||||
) {
|
||||
let (tx, rx) = tokio::sync::broadcast::channel(1);
|
||||
let strong = Arc::new(rx);
|
||||
|
||||
**g = MaybeDone::Pending(Arc::downgrade(&strong));
|
||||
|
||||
let retry = {
|
||||
let strong = strong.clone();
|
||||
self.clone().run_and_complete(fut, tx, strong)
|
||||
};
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
match &**g {
|
||||
MaybeDone::Pending(weak) => {
|
||||
let rx = weak.upgrade().expect("holding the weak and strong locally");
|
||||
assert!(Arc::ptr_eq(&strong, &rx));
|
||||
}
|
||||
_ => unreachable!("MaybeDone::Pending must be set by make_run_and_complete_any"),
|
||||
}
|
||||
|
||||
(strong, retry)
|
||||
}
|
||||
|
||||
/// Run the actual attempt, and communicate the response via both:
|
||||
/// - setting the `MaybeDone::Done`
|
||||
/// - the broadcast channel
|
||||
async fn run_and_complete(
|
||||
self,
|
||||
fut: impl Future<Output = V>,
|
||||
tx: tokio::sync::broadcast::Sender<V>,
|
||||
strong: Arc<tokio::sync::broadcast::Receiver<V>>,
|
||||
) {
|
||||
let res = fut.await;
|
||||
|
||||
{
|
||||
let mut g = self.inner.lock().await;
|
||||
g.complete(&strong, res.clone());
|
||||
|
||||
// make the weak un-upgradeable by dropping the final live
// reference to it. It is the final Arc because the Arc never escapes
// the critical section in `decide_to_retry_or_join` or `attempt_spawn`.
|
||||
Arc::try_unwrap(strong).expect("expected this to be the only Arc<Receiver<V>>");
|
||||
}
|
||||
|
||||
// now no one can get the Pending(weak) value to upgrade and they only see
|
||||
// the Done(res).
|
||||
//
|
||||
// send the result value to listeners, if any
|
||||
drop(tx.send(res));
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
async fn make_oneshot_alike_receiver_any(
|
||||
mut rx: tokio::sync::broadcast::Receiver<V>,
|
||||
) -> Result<V, RetriedTaskPanicked> {
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
|
||||
match rx.recv().await {
|
||||
Ok(t) => Ok(t),
|
||||
Err(RecvError::Closed | RecvError::Lagged(_)) => {
|
||||
// lagged doesn't mean anything with 1 send, but whatever, handle it the same
|
||||
// this case should only ever happen if a panic happened in `fut`.
|
||||
Err(RetriedTaskPanicked)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// MaybeDone handles synchronization for multiple requests and the single actual task.
|
||||
///
|
||||
/// If request handlers witness `Pending` which they are able to upgrade, they are guaranteed a
|
||||
/// useful `recv().await`, where useful means a "value" or "disconnect" arrives. If the upgrade
/// fails, it means that a "disconnect" has already happened in the past.
|
||||
///
|
||||
/// On successful execution the one executing task will set this to the `Done` variant, with the
/// actual resulting value.
|
||||
#[derive(Debug, Default)]
|
||||
pub enum MaybeDone<V> {
|
||||
Pending(std::sync::Weak<tokio::sync::broadcast::Receiver<V>>),
|
||||
Done(V),
|
||||
#[default]
|
||||
NotStarted,
|
||||
}
|
||||
|
||||
impl<V: std::fmt::Debug> MaybeDone<V> {
|
||||
fn complete(&mut self, _strong: &Arc<tokio::sync::broadcast::Receiver<V>>, outcome: V) {
|
||||
#[cfg(debug_assertions)]
|
||||
match self {
|
||||
MaybeDone::Pending(weak) => {
|
||||
let same = weak
|
||||
.upgrade()
|
||||
// we don't yet have Receiver::same_channel
|
||||
.map(|rx| Arc::ptr_eq(_strong, &rx))
|
||||
.unwrap_or(false);
|
||||
assert!(same, "different channel had been replaced or dropped");
|
||||
}
|
||||
other => panic!("unexpected MaybeDone: {other:?}"),
|
||||
}
|
||||
|
||||
*self = MaybeDone::Done(outcome);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{RetriedTaskPanicked, Retryable, SharedRetryable};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum OuterError {
|
||||
AttemptPanicked,
|
||||
Unlucky,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
enum InnerError {
|
||||
Unlucky,
|
||||
}
|
||||
|
||||
impl Retryable for InnerError {
|
||||
fn is_permanent(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
impl From<InnerError> for OuterError {
|
||||
fn from(_: InnerError) -> Self {
|
||||
OuterError::Unlucky
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RetriedTaskPanicked> for OuterError {
|
||||
fn from(_: RetriedTaskPanicked) -> Self {
|
||||
OuterError::AttemptPanicked
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn restartable_until_permanent() {
|
||||
let shr = SharedRetryable::<Result<u8, InnerError>>::default();
|
||||
|
||||
let res = shr
|
||||
.try_restart_spawn(|| async move { panic!("really unlucky") })
|
||||
.await;
|
||||
|
||||
assert!(matches!(res, Err(OuterError::AttemptPanicked)));
|
||||
|
||||
let res = shr
|
||||
.try_restart_spawn(|| async move { Err(InnerError::Unlucky) })
|
||||
.await;
|
||||
|
||||
assert!(matches!(res, Err(OuterError::Unlucky)));
|
||||
|
||||
let res = shr.try_restart_spawn(|| async move { Ok(42) }).await;
|
||||
|
||||
assert!(matches!(res, Ok::<u8, OuterError>(42)));
|
||||
|
||||
let res = shr
|
||||
.try_restart_spawn(|| async move { panic!("rerun should clone Ok(42)") })
|
||||
.await;
|
||||
|
||||
assert!(matches!(res, Ok::<u8, OuterError>(42)));
|
||||
}
|
||||
|
||||
/// Demonstration of `SharedRetryable::attempt_spawn`
|
||||
#[tokio::test]
|
||||
async fn attemptable_until_no_panic() {
|
||||
let shr = SharedRetryable::<u8>::default();
|
||||
|
||||
let res = shr
|
||||
.attempt_spawn(|| async move { panic!("should not interfere") })
|
||||
.await;
|
||||
|
||||
assert!(matches!(res, Err(RetriedTaskPanicked)), "{res:?}");
|
||||
|
||||
let res = shr.attempt_spawn(|| async move { 42 }).await;
|
||||
|
||||
assert_eq!(res, Ok(42));
|
||||
|
||||
let res = shr
|
||||
.attempt_spawn(|| async move { panic!("should not be called") })
|
||||
.await;
|
||||
|
||||
assert_eq!(res, Ok(42));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cancelling_spawner_is_fine() {
|
||||
let shr = SharedRetryable::<Result<u8, InnerError>>::default();
|
||||
|
||||
let (recv1, maybe_fut) = shr
|
||||
.try_restart(|| async move { panic!("should not have been called") })
|
||||
.await;
|
||||
let should_be_spawned = maybe_fut.unwrap();
|
||||
|
||||
let (recv2, maybe_fut) = shr
|
||||
.try_restart(|| async move {
|
||||
panic!("should never be called because waiting on should_be_spawned")
|
||||
})
|
||||
.await;
|
||||
assert!(
|
||||
matches!(maybe_fut, None),
|
||||
"only the first one should had created the future"
|
||||
);
|
||||
|
||||
let mut recv1 = std::pin::pin!(recv1);
|
||||
let mut recv2 = std::pin::pin!(recv2);
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {},
|
||||
_ = &mut recv1 => unreachable!("should not have completed because should_be_spawned not spawned"),
|
||||
_ = &mut recv2 => unreachable!("should not have completed because should_be_spawned not spawned"),
|
||||
}
|
||||
|
||||
drop(should_be_spawned);
|
||||
|
||||
let res = recv1.await;
|
||||
assert!(matches!(res, Err(OuterError::AttemptPanicked)), "{res:?}");
|
||||
|
||||
let res = recv2.await;
|
||||
assert!(matches!(res, Err(OuterError::AttemptPanicked)), "{res:?}");
|
||||
|
||||
// but we can still reach a terminal state if the api is not misused or the
|
||||
// should_be_spawned winner is not cancelled
|
||||
|
||||
let recv1 = shr.try_restart_spawn::<OuterError>(|| async move { Ok(42) });
|
||||
let recv2 = shr.try_restart_spawn::<OuterError>(|| async move { Ok(43) });
|
||||
|
||||
assert_eq!(recv1.await.unwrap(), 42);
|
||||
assert_eq!(recv2.await.unwrap(), 42, "43 should never be returned");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn service_example() {
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum OneLevelError {
|
||||
TaskPanicked,
|
||||
}
|
||||
|
||||
impl Retryable for OneLevelError {
|
||||
fn is_permanent(&self) -> bool {
|
||||
// for single-level errors, this wording is weird
|
||||
!matches!(self, OneLevelError::TaskPanicked)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RetriedTaskPanicked> for OneLevelError {
|
||||
fn from(_: RetriedTaskPanicked) -> Self {
|
||||
OneLevelError::TaskPanicked
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct Service(SharedRetryable<Result<u8, OneLevelError>>);
|
||||
|
||||
impl Service {
|
||||
async fn work(
|
||||
&self,
|
||||
completions: Arc<std::sync::atomic::AtomicUsize>,
|
||||
) -> Result<u8, OneLevelError> {
|
||||
self.0
|
||||
.try_restart_spawn(|| async move {
|
||||
// give time to cancel some of the tasks
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
completions.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
Self::work_once().await
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn work_once() -> Result<u8, OneLevelError> {
|
||||
Ok(42)
|
||||
}
|
||||
}
|
||||
|
||||
let svc = Service::default();
|
||||
|
||||
let mut js = tokio::task::JoinSet::new();
|
||||
|
||||
let barrier = Arc::new(tokio::sync::Barrier::new(10 + 1));
|
||||
let completions = Arc::new(std::sync::atomic::AtomicUsize::new(0));
|
||||
|
||||
let handles = (0..10)
|
||||
.map(|_| {
|
||||
js.spawn({
|
||||
let svc = svc.clone();
|
||||
let barrier = barrier.clone();
|
||||
let completions = completions.clone();
|
||||
async move {
|
||||
// make sure all tasks are ready to start at the same time
|
||||
barrier.wait().await;
|
||||
// after successfully starting the work, any of the futures could get cancelled
|
||||
svc.work(completions).await
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
barrier.wait().await;
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
|
||||
handles[5].abort();
|
||||
|
||||
let mut cancellations = 0;
|
||||
|
||||
while let Some(res) = js.join_next().await {
|
||||
// all complete with the same result
|
||||
match res {
|
||||
Ok(res) => assert_eq!(res.unwrap(), 42),
|
||||
Err(je) => {
|
||||
// except for the one task we cancelled; cancelling it
// does not interfere with the result
|
||||
assert!(je.is_cancelled());
|
||||
cancellations += 1;
|
||||
assert_eq!(cancellations, 1, "only the 6th task was aborted");
// however, we cannot assert that we manage to cancel the 6th task every time
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// there will be at most one terminal completion
|
||||
assert_eq!(completions.load(std::sync::atomic::Ordering::Relaxed), 1);
|
||||
}
|
||||
}
|
||||
@@ -19,6 +19,7 @@ use super::models::{
|
||||
};
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::disk_usage_eviction_task;
|
||||
use crate::metrics::STORAGE_TIME_GLOBAL;
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
@@ -708,6 +709,11 @@ pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>,
|
||||
async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
|
||||
let _timer = STORAGE_TIME_GLOBAL
|
||||
.get_metric_with_label_values(&["create tenant"])
|
||||
.expect("bug")
|
||||
.start_timer();
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
|
||||
let request_data: TenantCreateRequest = json_request(&mut request).await?;
|
||||
@@ -743,6 +749,7 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
res.context("created tenant failed to become active")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
}
|
||||
|
||||
json_response(
|
||||
StatusCode::CREATED,
|
||||
TenantCreateResponse(new_tenant.tenant_id()),
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::ops::Range;
|
||||
///
|
||||
/// Represents a set of Keys, in a compact form.
|
||||
///
|
||||
#[derive(Clone, Debug, Default)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct KeySpace {
|
||||
/// Contiguous ranges of keys that belong to the key space. In key order,
|
||||
/// and with no overlap.
|
||||
@@ -61,58 +61,6 @@ impl KeySpace {
|
||||
|
||||
KeyPartitioning { parts }
|
||||
}
|
||||
|
||||
///
|
||||
/// Calculate the logical size of delta layers: the total size of all blocks covered by its key range
|
||||
///
|
||||
pub fn get_logical_size(&self, range: &Range<Key>) -> u64 {
|
||||
let mut start_key = range.start;
|
||||
let n_ranges = self.ranges.len();
|
||||
let start_index = match self.ranges.binary_search_by_key(&start_key, |r| r.start) {
|
||||
Ok(index) => index, // keyspace range starts with start_key
|
||||
Err(index) => {
|
||||
if index != 0 && self.ranges[index - 1].end > start_key {
|
||||
index - 1 // previous keyspace range overlaps with specified
|
||||
} else if index == n_ranges {
|
||||
return 0; // no intersection with specified range
|
||||
} else {
|
||||
start_key = self.ranges[index].start;
|
||||
index
|
||||
}
|
||||
}
|
||||
};
|
||||
let mut size = 0u64;
|
||||
for i in start_index..n_ranges {
|
||||
if self.ranges[i].start >= range.end {
|
||||
break;
|
||||
}
|
||||
let end_key = if self.ranges[i].end < range.end {
|
||||
self.ranges[i].end
|
||||
} else {
|
||||
range.end
|
||||
};
|
||||
let n_blocks = key_range_size(&(start_key..end_key));
|
||||
if n_blocks != u32::MAX {
|
||||
size += n_blocks as u64 * BLCKSZ as u64;
|
||||
}
|
||||
if i + 1 < n_ranges {
|
||||
start_key = self.ranges[i + 1].start;
|
||||
}
|
||||
}
|
||||
size
|
||||
}
|
||||
|
||||
///
|
||||
/// Check if key space contains overlapping range
|
||||
///
|
||||
pub fn overlaps(&self, range: &Range<Key>) -> bool {
|
||||
match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
|
||||
Ok(0) => false,
|
||||
Err(0) => false,
|
||||
Ok(index) => self.ranges[index - 1].end > range.start,
|
||||
Err(index) => self.ranges[index - 1].end > range.start,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
@@ -181,226 +129,3 @@ impl KeySpaceAccum {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// A helper object, to collect a set of keys and key ranges into a KeySpace
|
||||
/// object. Key ranges may be inserted in any order and can overlap.
|
||||
///
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct KeySpaceRandomAccum {
|
||||
ranges: Vec<Range<Key>>,
|
||||
}
|
||||
|
||||
impl KeySpaceRandomAccum {
|
||||
pub fn new() -> Self {
|
||||
Self { ranges: Vec::new() }
|
||||
}
|
||||
|
||||
pub fn add_key(&mut self, key: Key) {
|
||||
self.add_range(singleton_range(key))
|
||||
}
|
||||
|
||||
pub fn add_range(&mut self, range: Range<Key>) {
|
||||
self.ranges.push(range);
|
||||
}
|
||||
|
||||
pub fn to_keyspace(mut self) -> KeySpace {
|
||||
let mut ranges = Vec::new();
|
||||
if !self.ranges.is_empty() {
|
||||
self.ranges.sort_by_key(|r| r.start);
|
||||
let mut start = self.ranges.first().unwrap().start;
|
||||
let mut end = self.ranges.first().unwrap().end;
|
||||
for r in self.ranges {
|
||||
assert!(r.start >= start);
|
||||
if r.start > end {
|
||||
ranges.push(start..end);
|
||||
start = r.start;
|
||||
end = r.end;
|
||||
} else if r.end > end {
|
||||
end = r.end;
|
||||
}
|
||||
}
|
||||
ranges.push(start..end);
|
||||
}
|
||||
KeySpace { ranges }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::fmt::Write;
|
||||
|
||||
// Helper function to create a key range.
|
||||
//
|
||||
// Make the tests below less verbose.
|
||||
fn kr(irange: Range<i128>) -> Range<Key> {
|
||||
Key::from_i128(irange.start)..Key::from_i128(irange.end)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn dump_keyspace(ks: &KeySpace) {
|
||||
for r in ks.ranges.iter() {
|
||||
println!(" {}..{}", r.start.to_i128(), r.end.to_i128());
|
||||
}
|
||||
}
|
||||
|
||||
fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
|
||||
if actual.ranges != expected {
|
||||
let mut msg = String::new();
|
||||
|
||||
writeln!(msg, "expected:").unwrap();
|
||||
for r in &expected {
|
||||
writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
|
||||
}
|
||||
writeln!(msg, "got:").unwrap();
|
||||
for r in &actual.ranges {
|
||||
writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
|
||||
}
|
||||
panic!("{}", msg);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn keyspace_add_range() {
|
||||
// two separate ranges
|
||||
//
|
||||
// #####
|
||||
// #####
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(0..10));
|
||||
ks.add_range(kr(20..30));
|
||||
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);
|
||||
|
||||
// two separate ranges, added in reverse order
|
||||
//
|
||||
// #####
|
||||
// #####
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(20..30));
|
||||
ks.add_range(kr(0..10));
|
||||
|
||||
// add range that is adjacent to the end of an existing range
|
||||
//
|
||||
// #####
|
||||
// #####
|
||||
ks.add_range(kr(0..10));
|
||||
ks.add_range(kr(10..30));
|
||||
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
|
||||
|
||||
// add range that is adjacent to the start of an existing range
|
||||
//
|
||||
// #####
|
||||
// #####
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(10..30));
|
||||
ks.add_range(kr(0..10));
|
||||
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
|
||||
|
||||
// add range that overlaps with the end of an existing range
|
||||
//
|
||||
// #####
|
||||
// #####
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(0..10));
|
||||
ks.add_range(kr(5..30));
|
||||
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
|
||||
|
||||
// add range that overlaps with the start of an existing range
|
||||
//
|
||||
// #####
|
||||
// #####
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(5..30));
|
||||
ks.add_range(kr(0..10));
|
||||
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
|
||||
|
||||
// add range that is fully covered by an existing range
|
||||
//
|
||||
// #########
|
||||
// #####
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(0..30));
|
||||
ks.add_range(kr(10..20));
|
||||
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
|
||||
|
||||
// add range that extends an existing range from both ends
|
||||
//
|
||||
// #####
|
||||
// #########
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(10..20));
|
||||
ks.add_range(kr(0..30));
|
||||
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
|
||||
|
||||
// add a range that overlaps with two existing ranges, joining them
|
||||
//
|
||||
// ##### #####
|
||||
// #######
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(0..10));
|
||||
ks.add_range(kr(20..30));
|
||||
ks.add_range(kr(5..25));
|
||||
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn keyspace_overlaps() {
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(10..20));
|
||||
ks.add_range(kr(30..40));
|
||||
let ks = ks.to_keyspace();
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(!ks.overlaps(&kr(0..5)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(!ks.overlaps(&kr(5..9)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(!ks.overlaps(&kr(5..10)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(ks.overlaps(&kr(5..11)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(ks.overlaps(&kr(10..15)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(ks.overlaps(&kr(15..20)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(ks.overlaps(&kr(15..25)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(!ks.overlaps(&kr(22..28)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(!ks.overlaps(&kr(25..30)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(ks.overlaps(&kr(35..35)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(!ks.overlaps(&kr(40..45)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(!ks.overlaps(&kr(45..50)));
|
||||
|
||||
// ##### #####
|
||||
// xxxxxxxxxxx
|
||||
assert!(ks.overlaps(&kr(0..30))); // XXXXX This fails currently!
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ const STORAGE_TIME_OPERATIONS: &[&str] = &[
|
||||
"imitate logical size",
|
||||
"load layer map",
|
||||
"gc",
|
||||
"create tenant",
|
||||
];
|
||||
|
||||
pub static STORAGE_TIME_SUM_PER_TIMELINE: Lazy<CounterVec> = Lazy::new(|| {
|
||||
|
||||
@@ -272,6 +272,9 @@ pub enum TaskKind {
|
||||
|
||||
#[cfg(test)]
|
||||
UnitTest,
|
||||
|
||||
/// Task which is the only task to delete this particular timeline
|
||||
DeleteTimeline,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
|
||||
@@ -459,6 +459,53 @@ pub enum DeleteTimelineError {
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error, Clone, Copy)]
|
||||
enum InnerDeleteTimelineError {
|
||||
#[error("upload queue is uninitialized, likely the timeline was in Broken state prior to this call because it failed to fetch IndexPart during load or attach, check the logs")]
|
||||
QueueUninitialized,
|
||||
|
||||
#[error("failpoint: {0}")]
|
||||
Failpoint(&'static str),
|
||||
|
||||
#[error("failed to remove local timeline directory")]
|
||||
FailedToRemoveLocalTimelineDirectory,
|
||||
|
||||
#[error("index_part.json upload failed")]
|
||||
UploadFailed,
|
||||
|
||||
#[error("the deleted timeline grew branches while deleting it; tenant should now be broken")]
|
||||
DeletedGrewChildren,
|
||||
|
||||
#[error("panicked while removing the timeline from Tenant::timelines")]
|
||||
OtherPanic,
|
||||
}
|
||||
|
||||
impl utils::shared_retryable::Retryable for InnerDeleteTimelineError {
|
||||
fn is_permanent(&self) -> bool {
|
||||
use InnerDeleteTimelineError::*;
|
||||
|
||||
match self {
|
||||
QueueUninitialized => false,
|
||||
Failpoint(_) => false,
|
||||
FailedToRemoveLocalTimelineDirectory => false,
|
||||
UploadFailed => false,
|
||||
DeletedGrewChildren | OtherPanic => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<InnerDeleteTimelineError> for DeleteTimelineError {
|
||||
fn from(value: InnerDeleteTimelineError) -> Self {
|
||||
DeleteTimelineError::Other(anyhow::Error::new(value))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<utils::shared_retryable::RetriedTaskPanicked> for DeleteTimelineError {
|
||||
fn from(_: utils::shared_retryable::RetriedTaskPanicked) -> Self {
|
||||
DeleteTimelineError::Other(anyhow::anyhow!("deleting timeline failed, please retry"))
|
||||
}
|
||||
}
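// Note: these `From<InnerDeleteTimelineError>` and `From<RetriedTaskPanicked>` impls are
// what let `SharedRetryable::try_restart` below produce a `DeleteTimelineError` for every
// waiter, whether the attempt failed with an inner error or panicked.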
|
||||
|
||||
struct RemoteStartupData {
|
||||
index_part: IndexPart,
|
||||
remote_metadata: TimelineMetadata,
|
||||
@@ -1426,7 +1473,7 @@ impl Tenant {
|
||||
|
||||
/// Removes timeline-related in-memory data
|
||||
pub async fn delete_timeline(
|
||||
&self,
|
||||
self: &Arc<Tenant>,
|
||||
timeline_id: TimelineId,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
@@ -1459,162 +1506,200 @@ impl Tenant {
|
||||
timeline
|
||||
};
|
||||
|
||||
// Now that the Timeline is in Stopping state, request all the related tasks to
|
||||
// shut down.
|
||||
// if we have concurrent requests, we will only execute one version of the following future;
// initially it did not have any means to be cancelled.
|
||||
//
|
||||
// NB: If you call delete_timeline multiple times concurrently, they will
|
||||
// all go through the motions here. Make sure the code here is idempotent,
|
||||
// and don't error out if some of the shutdown tasks have already been
|
||||
// completed!
|
||||
// NOTE: Unlike "the usual" futures, this one will log any errors instead of just propagating
|
||||
// them to the caller. This is because this one future produces a value, which will need to
|
||||
// be cloneable to everyone interested, and normal `std::error::Error`s are not cloneable.
// Also, it wouldn't make sense to log the same failure multiple times; it would look like
// multiple failures to anyone reading the logs.
|
||||
let factory = || {
|
||||
let tenant = self.clone();
|
||||
let tenant_id = self.tenant_id;
|
||||
let timeline = timeline.clone();
|
||||
let timeline_id = timeline.timeline_id;
|
||||
|
||||
// Stop the walreceiver first.
|
||||
debug!("waiting for wal receiver to shutdown");
|
||||
timeline.walreceiver.stop().await;
|
||||
debug!("wal receiver shutdown confirmed");
|
||||
async move {
|
||||
// Now that the Timeline is in Stopping state, request all the related tasks to
|
||||
// shut down.
|
||||
//
|
||||
// Stop the walreceiver first.
|
||||
debug!("waiting for wal receiver to shutdown");
|
||||
timeline.walreceiver.stop().await;
|
||||
debug!("wal receiver shutdown confirmed");
|
||||
|
||||
// Prevent new uploads from starting.
|
||||
if let Some(remote_client) = timeline.remote_client.as_ref() {
|
||||
let res = remote_client.stop();
|
||||
match res {
|
||||
Ok(()) => {}
|
||||
Err(e) => match e {
|
||||
remote_timeline_client::StopError::QueueUninitialized => {
|
||||
// This case shouldn't happen currently because the
|
||||
// load and attach code bails out if _any_ of the timeline fails to fetch its IndexPart.
|
||||
// That is, before we declare the Tenant as Active.
|
||||
// But we only allow calls to delete_timeline on Active tenants.
|
||||
return Err(DeleteTimelineError::Other(anyhow::anyhow!("upload queue is uninitialized, likely the timeline was in Broken state prior to this call because it failed to fetch IndexPart during load or attach, check the logs")));
|
||||
// Prevent new uploads from starting.
|
||||
if let Some(remote_client) = timeline.remote_client.as_ref() {
|
||||
let res = remote_client.stop();
|
||||
match res {
|
||||
Ok(()) => {}
|
||||
Err(e) => match e {
|
||||
remote_timeline_client::StopError::QueueUninitialized => {
|
||||
// This case shouldn't happen currently because the
|
||||
// load and attach code bails out if _any_ of the timeline fails to fetch its IndexPart.
|
||||
// That is, before we declare the Tenant as Active.
|
||||
// But we only allow calls to delete_timeline on Active tenants.
|
||||
warn!("failed to stop RemoteTimelineClient due to uninitialized queue");
|
||||
return Err(InnerDeleteTimelineError::QueueUninitialized);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Stop & wait for the remaining timeline tasks, including upload tasks.
|
||||
info!("waiting for timeline tasks to shutdown");
|
||||
task_mgr::shutdown_tasks(None, Some(tenant_id), Some(timeline_id)).await;
|
||||
|
||||
// Mark timeline as deleted in S3 so we won't pick it up next time
|
||||
// during attach or pageserver restart.
|
||||
// See comment in persist_index_part_with_deleted_flag.
|
||||
if let Some(remote_client) = timeline.remote_client.as_ref() {
|
||||
match remote_client.persist_index_part_with_deleted_flag().await {
|
||||
// If we (now, or already) marked it successfully as deleted, we can proceed
|
||||
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
|
||||
// Bail out otherwise
|
||||
Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
|
||||
| Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
|
||||
warn!("upload failed: {e}");
|
||||
return Err(InnerDeleteTimelineError::UploadFailed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// Grab the layer_removal_cs lock, and actually perform the deletion.
|
||||
//
|
||||
// This lock prevents multiple concurrent delete_timeline calls from
|
||||
// stepping on each other's toes, while deleting the files. It also
|
||||
// prevents GC or compaction from running at the same time.
|
||||
//
|
||||
// Note that there are still other race conditions between
|
||||
// GC, compaction and timeline deletion. GC task doesn't
|
||||
// register itself properly with the timeline it's
|
||||
// operating on. See
|
||||
// https://github.com/neondatabase/neon/issues/2671
|
||||
//
|
||||
// No timeout here, GC & Compaction should be responsive to the
|
||||
// `TimelineState::Stopping` change.
|
||||
info!("waiting for layer_removal_cs.lock()");
|
||||
let _layer_removal_guard = timeline.layer_removal_cs.lock().await;
|
||||
info!("got layer_removal_cs.lock(), deleting layer files");
|
||||
|
||||
// NB: storage_sync upload tasks that reference these layers have been cancelled
|
||||
// by us earlier.
|
||||
|
||||
let local_timeline_directory =
|
||||
tenant.conf.timeline_path(&timeline_id, &tenant_id);
|
||||
|
||||
fail::fail_point!("timeline-delete-before-rm", |_| {
|
||||
Err(InnerDeleteTimelineError::Failpoint(
|
||||
"failpoint: timeline-delete-before-rm",
|
||||
))
|
||||
});
|
||||
|
||||
// NB: This need not be atomic because the deleted flag in the IndexPart
|
||||
// will be observed during tenant/timeline load. The deletion will be resumed there.
|
||||
//
|
||||
// For configurations without remote storage, we tolerate that we're not crash-safe here.
|
||||
// The timeline may come up Active but with missing layer files, in such setups.
|
||||
// See https://github.com/neondatabase/neon/pull/3919#issuecomment-1531726720
|
||||
match std::fs::remove_dir_all(&local_timeline_directory) {
|
||||
Ok(()) => {}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
// This can happen if we're called a second time, e.g.,
|
||||
// because of a previous failure/cancellation at/after
|
||||
// failpoint timeline-delete-after-rm.
|
||||
//
|
||||
// It can also happen if we race with tenant detach, because,
|
||||
// it doesn't grab the layer_removal_cs lock.
|
||||
//
|
||||
// For now, log and continue.
|
||||
// warn! level is technically not appropriate for the
|
||||
// first case because we should expect retries to happen.
|
||||
// But the error is so rare, it seems better to get attention if it happens.
|
||||
let tenant_state = tenant.current_state();
|
||||
warn!(
|
||||
timeline_dir=?local_timeline_directory,
|
||||
?tenant_state,
|
||||
"timeline directory not found, proceeding anyway"
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"failed to remove local timeline directory {}: {e:#}",
|
||||
local_timeline_directory.display()
|
||||
);
|
||||
return Err(
|
||||
InnerDeleteTimelineError::FailedToRemoveLocalTimelineDirectory,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
|
||||
}
|
||||
|
||||
fail::fail_point!("timeline-delete-after-rm", |_| {
|
||||
Err(InnerDeleteTimelineError::Failpoint(
|
||||
"timeline-delete-after-rm",
|
||||
))
|
||||
});
|
||||
|
||||
// Remove the timeline from the map or poison it if we've grown children.
|
||||
// The poisoned map will make the child's GetPage accesses fail.
|
||||
//
|
||||
// Growing children can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
|
||||
let removed_timeline =
|
||||
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
|
||||
let mut timelines = tenant.timelines.lock().unwrap();
|
||||
let children_exist = timelines.iter().any(|(_, entry)| {
|
||||
entry.get_ancestor_timeline_id() == Some(timeline_id)
|
||||
});
|
||||
if children_exist {
|
||||
std::panic::panic_any(InnerDeleteTimelineError::DeletedGrewChildren);
|
||||
}
|
||||
timelines.remove(&timeline_id)
|
||||
}));
|
||||
|
||||
match removed_timeline {
|
||||
Ok(Some(_)) => Ok(()),
|
||||
Ok(None) => {
|
||||
// with SharedRetryable this should no longer happen
|
||||
warn!("no other task should had dropped the Timeline");
|
||||
Ok(())
|
||||
}
|
||||
Err(panic) => {
|
||||
if let Some(err) = panic.downcast_ref::<InnerDeleteTimelineError>() {
|
||||
Err(*err)
|
||||
} else {
|
||||
// the panic has already been formatted by the panic hook, don't worry about it
|
||||
Err(InnerDeleteTimelineError::OtherPanic)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
// execute in the *winner's* span so we will capture the request id etc.
|
||||
.in_current_span()
|
||||
};
|
||||
|
||||
let (recv, maybe_fut) = timeline.delete_self.try_restart(factory).await;
|
||||
|
||||
if let Some(fut) = maybe_fut {
|
||||
crate::task_mgr::spawn(
|
||||
&tokio::runtime::Handle::current(),
|
||||
TaskKind::DeleteTimeline,
|
||||
Some(self.tenant_id()),
|
||||
None,
|
||||
&format!("delete_timeline {}", timeline.timeline_id),
|
||||
false,
|
||||
async move {
|
||||
fut.await;
|
||||
Ok(())
|
||||
},
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// Stop & wait for the remaining timeline tasks, including upload tasks.
|
||||
// NB: This and other delete_timeline calls do not run as a task_mgr task,
|
||||
// so, they are not affected by this shutdown_tasks() call.
|
||||
info!("waiting for timeline tasks to shutdown");
|
||||
task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id)).await;
|
||||
|
||||
// Mark timeline as deleted in S3 so we won't pick it up next time
|
||||
// during attach or pageserver restart.
|
||||
// See comment in persist_index_part_with_deleted_flag.
|
||||
if let Some(remote_client) = timeline.remote_client.as_ref() {
|
||||
match remote_client.persist_index_part_with_deleted_flag().await {
|
||||
// If we (now, or already) marked it successfully as deleted, we can proceed
|
||||
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
|
||||
// Bail out otherwise
|
||||
Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
|
||||
| Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
|
||||
return Err(DeleteTimelineError::Other(anyhow::anyhow!(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// Grab the layer_removal_cs lock, and actually perform the deletion.
|
||||
//
|
||||
// This lock prevents multiple concurrent delete_timeline calls from
|
||||
// stepping on each other's toes, while deleting the files. It also
|
||||
// prevents GC or compaction from running at the same time.
|
||||
//
|
||||
// Note that there are still other race conditions between
|
||||
// GC, compaction and timeline deletion. GC task doesn't
|
||||
// register itself properly with the timeline it's
|
||||
// operating on. See
|
||||
// https://github.com/neondatabase/neon/issues/2671
|
||||
//
|
||||
// No timeout here, GC & Compaction should be responsive to the
|
||||
// `TimelineState::Stopping` change.
|
||||
info!("waiting for layer_removal_cs.lock()");
|
||||
let layer_removal_guard = timeline.layer_removal_cs.lock().await;
|
||||
info!("got layer_removal_cs.lock(), deleting layer files");
|
||||
|
||||
// NB: storage_sync upload tasks that reference these layers have been cancelled
|
||||
// by the caller.
|
||||
|
||||
let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id);
|
||||
|
||||
fail::fail_point!("timeline-delete-before-rm", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))?
|
||||
});
|
||||
|
||||
// NB: This need not be atomic because the deleted flag in the IndexPart
|
||||
// will be observed during tenant/timeline load. The deletion will be resumed there.
|
||||
//
|
||||
// For configurations without remote storage, we tolerate that we're not crash-safe here.
|
||||
// The timeline may come up Active but with missing layer files, in such setups.
|
||||
// See https://github.com/neondatabase/neon/pull/3919#issuecomment-1531726720
|
||||
match std::fs::remove_dir_all(&local_timeline_directory) {
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
// This can happen if we're called a second time, e.g.,
|
||||
// because of a previous failure/cancellation at/after
|
||||
// failpoint timeline-delete-after-rm.
|
||||
//
|
||||
// It can also happen if we race with tenant detach, because,
|
||||
// it doesn't grab the layer_removal_cs lock.
|
||||
//
|
||||
// For now, log and continue.
|
||||
// warn! level is technically not appropriate for the
|
||||
// first case because we should expect retries to happen.
|
||||
// But the error is so rare, it seems better to get attention if it happens.
|
||||
let tenant_state = self.current_state();
|
||||
warn!(
|
||||
timeline_dir=?local_timeline_directory,
|
||||
?tenant_state,
|
||||
"timeline directory not found, proceeding anyway"
|
||||
);
|
||||
// continue with the rest of the deletion
|
||||
}
|
||||
res => res.with_context(|| {
|
||||
format!(
|
||||
"Failed to remove local timeline directory '{}'",
|
||||
local_timeline_directory.display()
|
||||
)
|
||||
})?,
|
||||
}
|
||||
|
||||
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
|
||||
drop(layer_removal_guard);
|
||||
}
|
||||
|
||||
fail::fail_point!("timeline-delete-after-rm", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
|
||||
});
|
||||
|
||||
// Remove the timeline from the map.
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
let children_exist = timelines
|
||||
.iter()
|
||||
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
|
||||
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
|
||||
// We already deleted the layer files, so it's probably best to panic.
|
||||
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
|
||||
if children_exist {
|
||||
panic!("Timeline grew children while we removed layer files");
|
||||
}
|
||||
let removed_timeline = timelines.remove(&timeline_id);
|
||||
if removed_timeline.is_none() {
|
||||
// This can legitimately happen if there's a concurrent call to this function.
|
||||
// T1 T2
|
||||
// lock
|
||||
// unlock
|
||||
// lock
|
||||
// unlock
|
||||
// remove files
|
||||
// lock
|
||||
// remove from map
|
||||
// unlock
|
||||
// return
|
||||
// remove files
|
||||
// lock
|
||||
// remove from map observes empty map
|
||||
// unlock
|
||||
// return
|
||||
debug!("concurrent call to this function won the race");
|
||||
}
|
||||
drop(timelines);
|
||||
|
||||
Ok(())
|
||||
recv.await
|
||||
}
|
||||
|
||||
pub fn current_state(&self) -> TenantState {
|
||||
@@ -2195,7 +2280,7 @@ impl Tenant {
// made.
break;
}
let result = timeline.gc(ctx).await?;
let result = timeline.gc().await?;
totals += result;
}

@@ -3754,7 +3839,7 @@ mod tests {
.await?;
tline.freeze_and_flush().await?;
tline.compact(&ctx).await?;
tline.gc(&ctx).await?;
tline.gc().await?;
}

Ok(())
@@ -3826,7 +3911,7 @@ mod tests {
.await?;
tline.freeze_and_flush().await?;
tline.compact(&ctx).await?;
tline.gc(&ctx).await?;
tline.gc().await?;
}

Ok(())
@@ -3910,7 +3995,7 @@ mod tests {
.await?;
tline.freeze_and_flush().await?;
tline.compact(&ctx).await?;
tline.gc(&ctx).await?;
tline.gc().await?;
}

Ok(())

@@ -22,7 +22,8 @@ use tracing::*;
use utils::id::TenantTimelineId;

use std::cmp::{max, min, Ordering};
use std::collections::{BinaryHeap, HashMap};
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::fs;
use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
@@ -47,7 +48,7 @@ use crate::tenant::{
};

use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
@@ -122,17 +123,6 @@ pub struct Timeline {

pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC, and the cutoff LSN it used is stored with it.
/// It is used by the compaction task when it checks whether a new image layer should be created.
/// A newly created image layer doesn't help to remove the delta layers until the
/// newly created image layer falls off the PITR horizon. So on the next GC cycle,
/// gc_timeline may still want the new image layer to be created. To avoid redundant
/// image layer creation we should check whether an image layer already exists beyond the PITR horizon.
/// This is why we need to remember the GC cutoff LSN.
///
wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,

last_freeze_at: AtomicLsn,
// Atomic would be more appropriate here.
last_freeze_ts: RwLock<Instant>,
@@ -237,6 +227,9 @@ pub struct Timeline {
state: watch::Sender<TimelineState>,

eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,

pub(super) delete_self:
utils::shared_retryable::SharedRetryable<Result<(), super::InnerDeleteTimelineError>>,
}
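As a reading aid for the new `delete_self` field above: it lets a second concurrent `delete_timeline` call coalesce with an in-flight one instead of racing it. Below is a minimal sketch of that coalescing idea using `futures::future::Shared`; the type `CoalescedDelete`, its `run` method, and the `String` error type are illustrative assumptions, and the retry-after-failure behaviour implied by the real `SharedRetryable` name is deliberately left out.

```rust
use futures::future::{BoxFuture, FutureExt, Shared};
use std::sync::Mutex;

type SharedResult = Shared<BoxFuture<'static, Result<(), String>>>;

/// All concurrent callers end up polling the same shared future: the first
/// caller installs the operation, later callers just await its cached result.
#[derive(Default)]
struct CoalescedDelete {
    slot: Mutex<Option<SharedResult>>,
}

impl CoalescedDelete {
    fn run(
        &self,
        op: impl std::future::Future<Output = Result<(), String>> + Send + 'static,
    ) -> SharedResult {
        let mut slot = self.slot.lock().unwrap();
        // Only the first caller's `op` is used; later callers' futures are dropped
        // and they simply await the shared result.
        slot.get_or_insert_with(|| op.boxed().shared()).clone()
    }
}
```

With this shape, the second HTTP request in the tests further down simply awaits the same deletion result as the first one, which is why both are expected to report success.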
/// Internal structure to hold all data needed for logical size calculation.
@@ -1364,7 +1357,6 @@ impl Timeline {
tenant_id,
pg_version,
layers: RwLock::new(LayerMap::default()),
wanted_image_layers: Mutex::new(None),

walredo_mgr,
walreceiver,
@@ -1432,6 +1424,8 @@ impl Timeline {
eviction_task_timeline_state: tokio::sync::Mutex::new(
EvictionTaskTimelineState::default(),
),

delete_self: utils::shared_retryable::SharedRetryable::default(),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
@@ -2915,30 +2909,6 @@ impl Timeline {
let layers = self.layers.read().unwrap();

let mut max_deltas = 0;
{
let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
let img_range =
partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
if wanted.overlaps(&img_range) {
//
// gc_timeline only pays attention to image layers that are older than the GC cutoff,
// but create_image_layers creates image layers at last-record-lsn.
// So it's possible that gc_timeline wants a new image layer to be created for a key range,
// but the range is already covered by image layers at more recent LSNs. Before we
// create a new image layer, check if the range is already covered at more recent LSNs.
if !layers
.image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))?
{
debug!(
"Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
img_range.start, img_range.end, cutoff_lsn, lsn
);
return Ok(true);
}
}
}
}

for part_range in &partition.ranges {
let image_coverage = layers.image_coverage(part_range, lsn)?;
@@ -3058,12 +3028,6 @@ impl Timeline {
image_layers.push(image_layer);
}
}
// All layers that the GC wanted us to create have now been created.
//
// It's possible that another GC cycle happened while we were compacting, and added
// something new to wanted_image_layers, and we now clear that before processing it.
// That's OK, because the next GC iteration will put it back in.
*self.wanted_image_layers.lock().unwrap() = None;

// Sync the new layer to disk before adding it to the layer map, to make sure
// we don't garbage collect something based on the new layer, before it has
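The two hunks above are the compaction-side half of the `wanted_image_layers` hand-off: GC publishes the key ranges it wants covered together with the cutoff it used, compaction forces image layers for overlapping partitions, then clears the request. A minimal sketch of that hand-off follows, with `KeySpace` stubbed as a plain list of ranges and without the `image_layer_exists` recency check the real code performs; all names here are simplifications, not the pageserver's actual types.

```rust
use std::ops::Range;
use std::sync::Mutex;

type Key = u64;
type Lsn = u64;
type KeySpace = Vec<Range<Key>>;

struct WantedImageLayers(Mutex<Option<(Lsn, KeySpace)>>);

impl WantedImageLayers {
    /// GC side: record which key ranges it would like to see covered by image
    /// layers, together with the cutoff LSN it used.
    fn publish(&self, cutoff: Lsn, wanted: KeySpace) {
        *self.0.lock().unwrap() = Some((cutoff, wanted));
    }

    /// Compaction side: should an image layer be forced for this partition?
    fn wants(&self, partition: &Range<Key>) -> bool {
        match &*self.0.lock().unwrap() {
            Some((_cutoff, wanted)) => wanted
                .iter()
                .any(|r| r.start < partition.end && partition.start < r.end),
            None => false,
        }
    }

    /// Compaction clears the request once it has created the wanted layers;
    /// the next GC iteration will re-publish anything still missing.
    fn clear(&self) {
        *self.0.lock().unwrap() = None;
    }
}
```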
@@ -3667,7 +3631,7 @@ impl Timeline {
/// within a layer file. We can only remove the whole file if it's fully
/// obsolete.
///
pub(super) async fn gc(&self, ctx: &RequestContext) -> anyhow::Result<GcResult> {
pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
let timer = self.metrics.garbage_collect_histo.start_timer();

fail_point!("before-timeline-gc");
@@ -3697,7 +3661,6 @@ impl Timeline {
pitr_cutoff,
retain_lsns,
new_gc_cutoff,
ctx,
)
.instrument(
info_span!("gc_timeline", timeline = %self.timeline_id, cutoff = %new_gc_cutoff),
@@ -3717,7 +3680,6 @@ impl Timeline {
pitr_cutoff: Lsn,
retain_lsns: Vec<Lsn>,
new_gc_cutoff: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<GcResult> {
let now = SystemTime::now();
let mut result: GcResult = GcResult::default();
@@ -3763,16 +3725,6 @@ impl Timeline {
}

let mut layers_to_remove = Vec::new();
let mut wanted_image_layers = KeySpaceRandomAccum::default();
// Do not collect the keyspace for unit tests
let gc_keyspace = if ctx.task_kind() == TaskKind::GarbageCollector {
Some(
self.collect_keyspace(self.get_last_record_lsn(), ctx)
.await?,
)
} else {
None
};

// Scan all layers in the timeline (remote or on-disk).
//
@@ -3856,21 +3808,6 @@ impl Timeline {
"keeping {} because it is the latest layer",
l.filename().file_name()
);
// Collect delta key ranges that need image layers to allow garbage
// collecting the layers.
// It is not so obvious whether we need to propagate information only about
// delta layers. Image layers can form "stairs" preventing old images from being deleted.
// But image layers are in any case less sparse than delta layers. Also, we need some
// protection against replacing recent image layers with new ones after each GC iteration.
if l.is_incremental() && !LayerMap::is_l0(&*l) {
if let Some(keyspace) = &gc_keyspace {
let layer_logical_size = keyspace.get_logical_size(&l.get_key_range());
let layer_age = new_gc_cutoff.0 - l.get_lsn_range().start.0;
if layer_logical_size <= layer_age {
wanted_image_layers.add_range(l.get_key_range());
}
}
}
result.layers_not_updated += 1;
continue 'outer;
}
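The hunk above is where gc_timeline marks delta layers that "want" an image layer, using a size-versus-age heuristic on the last two lines of that block. A minimal sketch of just that heuristic, assuming `Lsn` can be treated as a plain `u64` and with `logical_size_of` standing in for `KeySpace::get_logical_size` (both simplifications, not the pageserver's real types):

```rust
use std::ops::Range;

type Key = u64;
type Lsn = u64;

/// Stand-in for `KeySpace::get_logical_size`: how much live data the key range holds.
fn logical_size_of(_key_range: &Range<Key>) -> u64 {
    // In the pageserver this comes from the keyspace collected at the last record LSN.
    0
}

/// A kept delta layer "wants" an image layer when the amount of data that would
/// have to be re-imaged (its logical size) is no larger than how far the layer
/// has fallen behind the GC cutoff (its age, measured in LSN units).
fn wants_image_layer(key_range: &Range<Key>, layer_start_lsn: Lsn, new_gc_cutoff: Lsn) -> bool {
    let layer_logical_size = logical_size_of(key_range);
    let layer_age = new_gc_cutoff.saturating_sub(layer_start_lsn);
    layer_logical_size <= layer_age
}
```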
@@ -3883,10 +3820,6 @@ impl Timeline {
);
layers_to_remove.push(Arc::clone(&l));
}
self.wanted_image_layers
.lock()
.unwrap()
.replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));

let mut updates = layers.batch_update();
if !layers_to_remove.is_empty() {

@@ -192,8 +192,9 @@ retry:
{
if (!PQconsumeInput(pageserver_conn))
{
neon_log(LOG, "could not get response from pageserver: %s",
PQerrorMessage(pageserver_conn));
char *msg = pchomp(PQerrorMessage(pageserver_conn));
neon_log(LOG, "could not get response from pageserver: %s", msg);
pfree(msg);
return -1;
}
}
@@ -343,7 +344,7 @@ pageserver_receive(void)
resp = NULL;
}
else if (rc == -2)
neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
neon_log(ERROR, "could not read COPY data: %s", pchomp(PQerrorMessage(pageserver_conn)));
else
neon_log(ERROR, "unexpected PQgetCopyData return value: %d", rc);
}
@@ -367,7 +368,7 @@ pageserver_flush(void)
}
else if (PQflush(pageserver_conn))
{
char *msg = PQerrorMessage(pageserver_conn);
char *msg = pchomp(PQerrorMessage(pageserver_conn));

pageserver_disconnect();
neon_log(ERROR, "failed to flush page requests: %s", msg);
@@ -7,6 +7,7 @@ mod credentials;
pub use credentials::ClientCredentials;

mod password_hack;
pub use password_hack::parse_endpoint_param;
use password_hack::PasswordHackPayload;

mod flow;
@@ -44,10 +45,10 @@ pub enum AuthErrorImpl {
#[error(
"Endpoint ID is not specified. \
Either please upgrade the postgres client library (libpq) for SNI support \
or pass the endpoint ID (first part of the domain name) as a parameter: '?options=project%3D<endpoint-id>'. \
or pass the endpoint ID (first part of the domain name) as a parameter: '?options=endpoint%3D<endpoint-id>'. \
See more at https://neon.tech/sni"
)]
MissingProjectName,
MissingEndpointName,

#[error("password authentication failed for user '{0}'")]
AuthFailed(Box<str>),
@@ -88,7 +89,7 @@ impl UserFacingError for AuthError {
AuthFailed(_) => self.to_string(),
BadAuthMethod(_) => self.to_string(),
MalformedPassword(_) => self.to_string(),
MissingProjectName => self.to_string(),
MissingEndpointName => self.to_string(),
Io(_) => "Internal error".to_string(),
}
}

@@ -52,8 +52,8 @@ pub async fn password_hack(
.authenticate()
.await?;

info!(project = &payload.project, "received missing parameter");
creds.project = Some(payload.project);
info!(project = &payload.endpoint, "received missing parameter");
creds.project = Some(payload.endpoint);

let mut node = api.wake_compute(extra, creds).await?;
node.config.password(payload.password);

@@ -1,6 +1,7 @@
//! User credentials used in authentication.

use crate::error::UserFacingError;
use crate::{auth::password_hack::parse_endpoint_param, error::UserFacingError};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
use std::collections::HashSet;
use thiserror::Error;
@@ -61,7 +62,15 @@ impl<'a> ClientCredentials<'a> {
// Project name might be passed via PG's command-line options.
let project_option = params
.options_raw()
.and_then(|mut options| options.find_map(|opt| opt.strip_prefix("project=")))
.and_then(|options| {
// We support both `project` (deprecated) and `endpoint` options for backward compatibility.
// However, if both are present, we don't exactly know which one to use.
// Therefore we require that only one of them is present.
options
.filter_map(parse_endpoint_param)
.at_most_one()
.ok()?
})
.map(|name| name.to_string());

let project_from_domain = if let Some(sni_str) = sni {
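The `at_most_one()` chain above is what rejects ambiguous input where both `project=` and `endpoint=` appear (see the `parse_when_endpoint_and_project_are_in_options` test further down). Here is a standalone sketch of just that guard, reusing the same `parse_endpoint_param` logic from this PR; the wrapper name `endpoint_from_options` is an assumption for illustration only.

```rust
use itertools::Itertools;

fn parse_endpoint_param(opt: &str) -> Option<&str> {
    opt.strip_prefix("project=")
        .or_else(|| opt.strip_prefix("endpoint="))
}

/// Returns Some(name) only if exactly one endpoint/project option is present;
/// ambiguous input (both forms, or the option repeated) yields None.
fn endpoint_from_options<'a>(options: impl Iterator<Item = &'a str>) -> Option<&'a str> {
    options
        .filter_map(parse_endpoint_param)
        .at_most_one()
        .ok()?
}

fn main() {
    assert_eq!(
        endpoint_from_options(["-ckey=1", "endpoint=bar", "geqo=off"].into_iter()),
        Some("bar")
    );
    // Both `endpoint=` and `project=` present: ambiguous, so rejected.
    assert_eq!(
        endpoint_from_options(["endpoint=bar", "project=foo"].into_iter()),
        None
    );
}
```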
@@ -177,6 +186,51 @@ mod tests {
Ok(())
}

#[test]
fn parse_endpoint_from_options() -> anyhow::Result<()> {
let options = StartupMessageParams::new([
("user", "john_doe"),
("options", "-ckey=1 endpoint=bar -c geqo=off"),
]);

let creds = ClientCredentials::parse(&options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project.as_deref(), Some("bar"));

Ok(())
}

#[test]
fn parse_three_endpoints_from_options() -> anyhow::Result<()> {
let options = StartupMessageParams::new([
("user", "john_doe"),
(
"options",
"-ckey=1 endpoint=one endpoint=two endpoint=three -c geqo=off",
),
]);

let creds = ClientCredentials::parse(&options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert!(creds.project.is_none());

Ok(())
}

#[test]
fn parse_when_endpoint_and_project_are_in_options() -> anyhow::Result<()> {
let options = StartupMessageParams::new([
("user", "john_doe"),
("options", "-ckey=1 endpoint=bar project=foo -c geqo=off"),
]);

let creds = ClientCredentials::parse(&options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert!(creds.project.is_none());

Ok(())
}

#[test]
fn parse_projects_identical() -> anyhow::Result<()> {
let options = StartupMessageParams::new([("user", "john_doe"), ("options", "project=baz")]);

@@ -91,7 +91,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, PasswordHack> {
// the user neither enabled SNI nor resorted to any other method
// for passing the project name we rely on. We should show them
// the most helpful error message and point to the documentation.
.ok_or(AuthErrorImpl::MissingProjectName)?;
.ok_or(AuthErrorImpl::MissingEndpointName)?;

Ok(payload)
}
@@ -6,27 +6,55 @@
use bstr::ByteSlice;

pub struct PasswordHackPayload {
pub project: String,
pub endpoint: String,
pub password: Vec<u8>,
}

impl PasswordHackPayload {
pub fn parse(bytes: &[u8]) -> Option<Self> {
// The format is `project=<utf-8>;<password-bytes>`.
let mut iter = bytes.strip_prefix(b"project=")?.splitn_str(2, ";");
let project = iter.next()?.to_str().ok()?.to_owned();
let mut iter = bytes.splitn_str(2, ";");
let endpoint = iter.next()?.to_str().ok()?;
let endpoint = parse_endpoint_param(endpoint)?.to_owned();
let password = iter.next()?.to_owned();

Some(Self { project, password })
Some(Self { endpoint, password })
}
}

pub fn parse_endpoint_param(bytes: &str) -> Option<&str> {
bytes
.strip_prefix("project=")
.or_else(|| bytes.strip_prefix("endpoint="))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn parse_password_hack_payload() {
fn parse_endpoint_param_fn() {
let input = "";
assert!(parse_endpoint_param(input).is_none());

let input = "project=";
assert_eq!(parse_endpoint_param(input), Some(""));

let input = "project=foobar";
assert_eq!(parse_endpoint_param(input), Some("foobar"));

let input = "endpoint=";
assert_eq!(parse_endpoint_param(input), Some(""));

let input = "endpoint=foobar";
assert_eq!(parse_endpoint_param(input), Some("foobar"));

let input = "other_option=foobar";
assert!(parse_endpoint_param(input).is_none());
}

#[test]
fn parse_password_hack_payload_project() {
let bytes = b"";
assert!(PasswordHackPayload::parse(bytes).is_none());

@@ -34,13 +62,33 @@ mod tests {
assert!(PasswordHackPayload::parse(bytes).is_none());

let bytes = b"project=;";
let payload = PasswordHackPayload::parse(bytes).expect("parsing failed");
assert_eq!(payload.project, "");
let payload: PasswordHackPayload =
PasswordHackPayload::parse(bytes).expect("parsing failed");
assert_eq!(payload.endpoint, "");
assert_eq!(payload.password, b"");

let bytes = b"project=foobar;pass;word";
let payload = PasswordHackPayload::parse(bytes).expect("parsing failed");
assert_eq!(payload.project, "foobar");
assert_eq!(payload.endpoint, "foobar");
assert_eq!(payload.password, b"pass;word");
}

#[test]
fn parse_password_hack_payload_endpoint() {
let bytes = b"";
assert!(PasswordHackPayload::parse(bytes).is_none());

let bytes = b"endpoint=";
assert!(PasswordHackPayload::parse(bytes).is_none());

let bytes = b"endpoint=;";
let payload = PasswordHackPayload::parse(bytes).expect("parsing failed");
assert_eq!(payload.endpoint, "");
assert_eq!(payload.password, b"");

let bytes = b"endpoint=foobar;pass;word";
let payload = PasswordHackPayload::parse(bytes).expect("parsing failed");
assert_eq!(payload.endpoint, "foobar");
assert_eq!(payload.password, b"pass;word");
}
}
@@ -1,4 +1,4 @@
use crate::{cancellation::CancelClosure, error::UserFacingError};
use crate::{auth::parse_endpoint_param, cancellation::CancelClosure, error::UserFacingError};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
@@ -279,7 +279,7 @@ fn filtered_options(params: &StartupMessageParams) -> Option<String> {
#[allow(unstable_name_collisions)]
let options: String = params
.options_raw()?
.filter(|opt| !opt.starts_with("project="))
.filter(|opt| parse_endpoint_param(opt).is_none())
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();
@@ -1,76 +0,0 @@
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder


@pytest.mark.timeout(10000)
def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
"""
Test that GC is able to collect all old layers even if they form
"stairs" and there are fewer than three delta layers since the last image layer.

Information about the image layers needed to collect old layers should
be propagated by GC to the compaction task, which should take it into account
when deciding which new image layers need to be created.
"""
env = neon_env_builder.init_start()
client = env.pageserver.http_client()

tenant_id, _ = env.neon_cli.create_tenant(
conf={
# disable default GC and compaction
"gc_period": "1000 m",
"compaction_period": "0 s",
"gc_horizon": f"{1024 ** 2}",
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
# set PITR interval to be small, so we can do GC
"pitr_interval": "10 s",
# "compaction_threshold": "3",
# "image_creation_threshold": "2",
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
n_steps = 10
n_update_iters = 100
step_size = 10000
with endpoint.cursor() as cur:
cur.execute("SET statement_timeout='1000s'")
cur.execute(
"CREATE TABLE t(step bigint, count bigint default 0, payload text default repeat(' ', 100)) with (fillfactor=50)"
)
cur.execute("CREATE INDEX ON t(step)")
# In each step, we insert 'step_size' new rows, and update the newly inserted rows
# 'n_update_iters' times. This creates a lot of churn and generates lots of WAL at the end of the table,
# without modifying the earlier parts of the table.
for step in range(n_steps):
cur.execute(f"INSERT INTO t (step) SELECT {step} FROM generate_series(1, {step_size})")
for i in range(n_update_iters):
cur.execute(f"UPDATE t set count=count+1 where step = {step}")
cur.execute("vacuum t")

# cur.execute("select pg_table_size('t')")
# logical_size = cur.fetchone()[0]
logical_size = client.timeline_detail(tenant_id, timeline_id)["current_logical_size"]
log.info(f"Logical storage size {logical_size}")

client.timeline_checkpoint(tenant_id, timeline_id)

# Do compaction and GC
client.timeline_gc(tenant_id, timeline_id, 0)
client.timeline_compact(tenant_id, timeline_id)
# One more iteration to check that no excessive image layers are generated
client.timeline_gc(tenant_id, timeline_id, 0)
client.timeline_compact(tenant_id, timeline_id)

physical_size = client.timeline_detail(tenant_id, timeline_id)["current_physical_size"]
log.info(f"Physical storage size {physical_size}")

MB = 1024 * 1024
zenbenchmark.record("logical_size", logical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record("physical_size", physical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record(
"physical/logical ratio", physical_size / logical_size, "", MetricReport.LOWER_IS_BETTER
)
@@ -5,16 +5,18 @@ import pytest
from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres


def test_proxy_select_1(static_proxy: NeonProxy):
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_proxy_select_1(static_proxy: NeonProxy, option_name: str):
"""
A simplest smoke test: check proxy against a local postgres instance.
"""

out = static_proxy.safe_psql("select 1", options="project=generic-project-name")
out = static_proxy.safe_psql("select 1", options=f"{option_name}=generic-project-name")
assert out[0][0] == 1


def test_password_hack(static_proxy: NeonProxy):
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_password_hack(static_proxy: NeonProxy, option_name: str):
"""
Check the PasswordHack auth flow: an alternative to SCRAM auth for
clients which can't provide the project/endpoint name via SNI or `options`.
@@ -23,11 +25,12 @@ def test_password_hack(static_proxy: NeonProxy):
user = "borat"
password = "password"
static_proxy.safe_psql(
f"create role {user} with login password '{password}'", options="project=irrelevant"
f"create role {user} with login password '{password}'",
options=f"{option_name}=irrelevant",
)

# Note the format of `magic`!
magic = f"project=irrelevant;{password}"
magic = f"{option_name}=irrelevant;{password}"
static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic)

# Must also check that invalid magic won't be accepted.
@@ -56,55 +59,62 @@ async def test_link_auth(vanilla_pg: VanillaPostgres, link_proxy: NeonProxy):
assert out == "42"


def test_proxy_options(static_proxy: NeonProxy):
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_proxy_options(static_proxy: NeonProxy, option_name: str):
"""
Check that we pass extra `options` to the PostgreSQL server:
* `project=...` shouldn't be passed at all (otherwise postgres will raise an error).
* `project=...` and `endpoint=...` shouldn't be passed at all
* (otherwise postgres will raise an error).
* everything else should be passed as-is.
"""

options = "project=irrelevant -cproxytest.option=value"
options = f"{option_name}=irrelevant -cproxytest.option=value"
out = static_proxy.safe_psql("show proxytest.option", options=options)
assert out[0][0] == "value"

options = "-c proxytest.foo=\\ str project=irrelevant"
options = f"-c proxytest.foo=\\ str {option_name}=irrelevant"
out = static_proxy.safe_psql("show proxytest.foo", options=options)
assert out[0][0] == " str"


def test_auth_errors(static_proxy: NeonProxy):
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_auth_errors(static_proxy: NeonProxy, option_name: str):
"""
Check that we throw very specific errors in some unsuccessful auth scenarios.
"""

# User does not exist
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", options="project=irrelevant")
static_proxy.connect(user="pinocchio", options=f"{option_name}=irrelevant")
text = str(exprinfo.value).strip()
assert text.endswith("password authentication failed for user 'pinocchio'")

static_proxy.safe_psql(
"create role pinocchio with login password 'magic'", options="project=irrelevant"
"create role pinocchio with login password 'magic'",
options=f"{option_name}=irrelevant",
)

# User exists, but password is missing
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", password=None, options="project=irrelevant")
static_proxy.connect(user="pinocchio", password=None, options=f"{option_name}=irrelevant")
text = str(exprinfo.value).strip()
assert text.endswith("password authentication failed for user 'pinocchio'")

# User exists, but password is wrong
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", password="bad", options="project=irrelevant")
static_proxy.connect(user="pinocchio", password="bad", options=f"{option_name}=irrelevant")
text = str(exprinfo.value).strip()
assert text.endswith("password authentication failed for user 'pinocchio'")

# Finally, check that the user can connect
with static_proxy.connect(user="pinocchio", password="magic", options="project=irrelevant"):
with static_proxy.connect(
user="pinocchio", password="magic", options=f"{option_name}=irrelevant"
):
pass


def test_forward_params_to_client(static_proxy: NeonProxy):
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str):
"""
Check that we forward all necessary PostgreSQL server params to client.
"""
@@ -130,7 +140,7 @@ def test_forward_params_to_client(static_proxy: NeonProxy):
where name = any(%s)
"""

with static_proxy.connect(options="project=irrelevant") as conn:
with static_proxy.connect(options=f"{option_name}=irrelevant") as conn:
with conn.cursor() as cur:
cur.execute(query, (reported_params_subset,))
for name, value in cur.fetchall():
@@ -138,17 +148,18 @@ def test_forward_params_to_client(static_proxy: NeonProxy):
assert conn.get_parameter_status(name) == value


@pytest.mark.parametrize("option_name", ["project", "endpoint"])
@pytest.mark.timeout(5)
def test_close_on_connections_exit(static_proxy: NeonProxy):
def test_close_on_connections_exit(static_proxy: NeonProxy, option_name: str):
# Open two connections, send SIGTERM, then ensure that proxy doesn't exit
# until after connections close.
with static_proxy.connect(options="project=irrelevant"), static_proxy.connect(
options="project=irrelevant"
with static_proxy.connect(options=f"{option_name}=irrelevant"), static_proxy.connect(
options=f"{option_name}=irrelevant"
):
static_proxy.terminate()
with pytest.raises(subprocess.TimeoutExpired):
static_proxy.wait_for_exit(timeout=2)
# Ensure we don't accept any more connections
with pytest.raises(psycopg2.OperationalError):
static_proxy.connect(options="project=irrelevant")
static_proxy.connect(options=f"{option_name}=irrelevant")
static_proxy.wait_for_exit()
@@ -217,6 +217,16 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
labels = ",".join([f'{key}="{value}"' for key, value in sample.labels.items()])
log.info(f"{sample.name}{{{labels}}} {sample.value}")

# Test that we gather the tenant create metric
storage_operation_metrics = [
"pageserver_storage_operations_seconds_global_bucket",
"pageserver_storage_operations_seconds_global_sum",
"pageserver_storage_operations_seconds_global_count",
]
for metric in storage_operation_metrics:
value = ps_metrics.query_all(metric, filter={"operation": "create tenant"})
assert value


@pytest.mark.parametrize(
"remote_storage_kind",
@@ -324,11 +324,7 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
If we're stuck uploading the index file with the is_delete flag,
eventually the console will hang up and retry.
If we're still stuck at the retry time, ensure that the retry
fails with status 500, signalling to the console that it should retry
later.
Ideally, timeline_delete should return 202 Accepted and require
console to poll for completion, but that would require changing
the API contract.
eventually completes with the same status.
"""

neon_env_builder.enable_remote_storage(
@@ -342,24 +338,34 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(

ps_http = env.pageserver.http_client()

# make the first call sleep practically forever
failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause"
ps_http.configure_failpoints((failpoint_name, "pause"))

def first_call(result_queue):
def delete_timeline_call(name, result_queue, barrier):
if barrier:
barrier.wait()
try:
log.info("first call start")
log.info(f"{name} call start")
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=10)
log.info("first call success")
log.info(f"{name} call success")
result_queue.put("success")
except Exception:
log.exception("first call failed")
log.exception(f"{name} call failed")
result_queue.put("failure, see log for stack trace")

first_call_result: queue.Queue[str] = queue.Queue()
first_call_thread = threading.Thread(target=first_call, args=(first_call_result,))
delete_results: queue.Queue[str] = queue.Queue()
first_call_thread = threading.Thread(
target=delete_timeline_call,
args=(
"1st",
delete_results,
None,
),
)
first_call_thread.start()

second_call_thread = None

try:

def first_call_hit_failpoint():
@@ -369,38 +375,53 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(

wait_until(50, 0.1, first_call_hit_failpoint)

# make the second call and assert behavior
log.info("second call start")
error_msg_re = "another task is already setting the deleted_flag, started at"
with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err:
ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
assert second_call_err.value.status_code == 500
env.pageserver.allowed_errors.append(f".*{child_timeline_id}.*{error_msg_re}.*")
# the second call will try to transition the timeline into Stopping state as well
barrier = threading.Barrier(2)
second_call_thread = threading.Thread(
target=delete_timeline_call,
args=(
"2nd",
delete_results,
barrier,
),
)
second_call_thread.start()

barrier.wait()

# release the pause
ps_http.configure_failpoints((failpoint_name, "off"))

# both should have succeeded: the second call will coalesce with the already-ongoing first call
result = delete_results.get()
assert result == "success"
result = delete_results.get()
assert result == "success"

# the second call will try to transition the timeline into Stopping state, but it's already in that state
# (the transition to Stopping state is not part of the request coalescing, because Tenant and Timeline states are a mess already)
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
)
log.info("second call failed as expected")

# by now we know that the second call failed, let's ensure the first call will finish
ps_http.configure_failpoints((failpoint_name, "off"))

result = first_call_result.get()
assert result == "success"
def second_call_attempt():
assert env.pageserver.log_contains(
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
)

wait_until(50, 0.1, second_call_attempt)
finally:
log.info("joining first call thread")
log.info("joining 1st thread")
# in any case, make sure the lifetime of the thread is bounded to this test
first_call_thread.join()

if second_call_thread:
log.info("joining 2nd thread")
second_call_thread.join()


def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
"""
If the client hangs up before we start the index part upload but after we mark it
deleted in local memory, a subsequent delete_timeline call should be able to do
another delete timeline operation.

This tests cancel safety up to the given failpoint.
Make sure the timeline_delete runs to completion even if the first request is cancelled because of a timeout.
"""
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
@@ -437,12 +458,12 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):

wait_until(50, 0.1, got_hangup_log_message)

# ok, retry without failpoint, it should succeed
# after disabling the failpoint pause, the original attempt should complete eventually
ps_http.configure_failpoints((failpoint_name, "off"))

# this should succeed
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
# the second call will try to transition the timeline into Stopping state, but it's already in that state
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
)
def delete_timeline_completes():
assert [env.initial_timeline] == [
timeline_id for (_, timeline_id) in env.neon_cli.list_timelines()
]

wait_until(50, 0.5, delete_timeline_completes)