Compare commits

..

1 Commit

Author SHA1 Message Date
Erik Grinaker
4308ffe5c0 safekeeper: batch AppendRequest writes 2024-11-13 15:09:57 +01:00
18 changed files with 205 additions and 211 deletions

Cargo.lock (generated, 4 changed lines)
View File

@@ -994,9 +994,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
-version = "1.5.0"
+version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
+checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
dependencies = [
"serde",
]

View File

@@ -1,45 +1,3 @@
commit 00aa659afc9c7336ab81036edec3017168aabf40
Author: Heikki Linnakangas <heikki@neon.tech>
Date: Tue Nov 12 16:59:19 2024 +0200
Temporarily disable test that depends on timezone
diff --git a/tests/expected/generalization.out b/tests/expected/generalization.out
index 23ef5fa..9e60deb 100644
--- a/ext-src/pg_anon-src/tests/expected/generalization.out
+++ b/ext-src/pg_anon-src/tests/expected/generalization.out
@@ -284,12 +284,9 @@ SELECT anon.generalize_tstzrange('19041107','century');
["Tue Jan 01 00:00:00 1901 PST","Mon Jan 01 00:00:00 2001 PST")
(1 row)
-SELECT anon.generalize_tstzrange('19041107','millennium');
- generalize_tstzrange
------------------------------------------------------------------
- ["Thu Jan 01 00:00:00 1001 PST","Mon Jan 01 00:00:00 2001 PST")
-(1 row)
-
+-- temporarily disabled, see:
+-- https://gitlab.com/dalibo/postgresql_anonymizer/-/commit/199f0a392b37c59d92ae441fb8f037e094a11a52#note_2148017485
+--SELECT anon.generalize_tstzrange('19041107','millennium');
-- generalize_daterange
SELECT anon.generalize_daterange('19041107');
generalize_daterange
diff --git a/tests/sql/generalization.sql b/tests/sql/generalization.sql
index b868344..b4fc977 100644
--- a/ext-src/pg_anon-src/tests/sql/generalization.sql
+++ b/ext-src/pg_anon-src/tests/sql/generalization.sql
@@ -61,7 +61,9 @@ SELECT anon.generalize_tstzrange('19041107','month');
SELECT anon.generalize_tstzrange('19041107','year');
SELECT anon.generalize_tstzrange('19041107','decade');
SELECT anon.generalize_tstzrange('19041107','century');
-SELECT anon.generalize_tstzrange('19041107','millennium');
+-- temporarily disabled, see:
+-- https://gitlab.com/dalibo/postgresql_anonymizer/-/commit/199f0a392b37c59d92ae441fb8f037e094a11a52#note_2148017485
+--SELECT anon.generalize_tstzrange('19041107','millennium');
-- generalize_daterange
SELECT anon.generalize_daterange('19041107');
commit 7dd414ee75f2875cffb1d6ba474df1f135a6fc6f
Author: Alexey Masterov <alexeymasterov@neon.tech>
Date: Fri May 31 06:34:26 2024 +0000

View File

@@ -15,9 +15,6 @@ pub enum DownloadError {
///
/// Concurrency control is not timed within timeout.
Timeout,
-/// Some integrity/consistency check failed during download. This is used during
-/// timeline loads to cancel the load of a tenant if some timeline detects fatal corruption.
-Fatal(String),
/// The file was found in the remote storage, but the download failed.
Other(anyhow::Error),
}
@@ -32,7 +29,6 @@ impl std::fmt::Display for DownloadError {
DownloadError::Unmodified => write!(f, "File was not modified"),
DownloadError::Cancelled => write!(f, "Cancelled, shutting down"),
DownloadError::Timeout => write!(f, "timeout"),
-DownloadError::Fatal(why) => write!(f, "Fatal read error: {why}"),
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
}
}
@@ -45,7 +41,7 @@ impl DownloadError {
pub fn is_permanent(&self) -> bool {
use DownloadError::*;
match self {
-BadInput(_) | NotFound | Unmodified | Fatal(_) | Cancelled => true,
+BadInput(_) | NotFound | Unmodified | Cancelled => true,
Timeout | Other(_) => false,
}
}
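
The hunks above drop the Fatal(String) variant, leaving only BadInput, NotFound, Unmodified and Cancelled as permanent errors. Below is a minimal, dependency-free sketch of how such a classification is typically consumed by a retry driver; the trimmed-down enum and the synchronous loop are illustrative stand-ins, not the crate's own code (the real caller is the async utils::backoff::retry with cancellation).

    // Hypothetical stand-in types: a trimmed-down DownloadError and a synchronous retry loop.
    #[allow(dead_code)]
    #[derive(Debug)]
    enum DownloadError {
        BadInput(String),
        NotFound,
        Timeout,
        Other(String),
    }

    impl DownloadError {
        fn is_permanent(&self) -> bool {
            // Permanent errors abort immediately; everything else is worth retrying.
            match self {
                DownloadError::BadInput(_) | DownloadError::NotFound => true,
                DownloadError::Timeout | DownloadError::Other(_) => false,
            }
        }
    }

    fn main() {
        let mut attempts = 0;
        let result: Result<(), DownloadError> = loop {
            attempts += 1;
            // Pretend the first two attempts time out, then the object turns out to be missing.
            let err = if attempts <= 2 { DownloadError::Timeout } else { DownloadError::NotFound };
            if err.is_permanent() || attempts >= 10 {
                break Err(err);
            }
            // A real caller would sleep with exponential backoff here before the next attempt.
        };
        println!("gave up after {attempts} attempts: {result:?}");
    }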

View File

@@ -1,15 +1,11 @@
use std::fmt::{Debug, Display};
use futures::Future;
-use rand::Rng;
use tokio_util::sync::CancellationToken;
pub const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
pub const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
-pub const DEFAULT_NETWORK_BASE_BACKOFF_SECONDS: f64 = 1.5;
-pub const DEFAULT_NETWORK_MAX_BACKOFF_SECONDS: f64 = 60.0;
pub async fn exponential_backoff(
n: u32,
base_increment: f64,
@@ -36,41 +32,11 @@ pub async fn exponential_backoff(
pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
if n == 0 {
0.0
} else if base_increment == 0.0 {
1.0
} else {
-((1.0 + base_increment).powf(f64::from(n))
-    + rand::thread_rng().gen_range(0.0..base_increment))
-    .min(max_seconds)
+(1.0 + base_increment).powf(f64::from(n)).min(max_seconds)
}
}
-pub async fn retry<T, O, F, E>(
-op: O,
-is_permanent: impl Fn(&E) -> bool,
-warn_threshold: u32,
-max_retries: u32,
-description: &str,
-cancel: &CancellationToken,
-) -> Option<Result<T, E>>
-where
-E: Display + Debug + 'static,
-O: FnMut() -> F,
-F: Future<Output = Result<T, E>>,
-{
-retry_with_options(
-op,
-is_permanent,
-warn_threshold,
-max_retries,
-description,
-cancel,
-DEFAULT_BASE_BACKOFF_SECONDS,
-DEFAULT_MAX_BACKOFF_SECONDS,
-)
-.await
-}
/// Retries passed operation until one of the following conditions are met:
/// - encountered error is considered as permanent (non-retryable)
/// - retries have been exhausted
@@ -85,16 +51,13 @@ where
/// for any other error type. Final failed attempt is logged with `{:?}`.
///
/// Returns `None` if cancellation was noticed during backoff or the terminal result.
-#[allow(clippy::too_many_arguments)]
-pub async fn retry_with_options<T, O, F, E>(
+pub async fn retry<T, O, F, E>(
mut op: O,
is_permanent: impl Fn(&E) -> bool,
warn_threshold: u32,
max_retries: u32,
description: &str,
cancel: &CancellationToken,
-base_increment: f64,
-max_seconds: f64,
) -> Option<Result<T, E>>
where
// Not std::error::Error because anyhow::Error doesnt implement it.
@@ -139,7 +102,13 @@ where
}
}
// sleep and retry
-exponential_backoff(attempts, base_increment, max_seconds, cancel).await;
+exponential_backoff(
+attempts,
+DEFAULT_BASE_BACKOFF_SECONDS,
+DEFAULT_MAX_BACKOFF_SECONDS,
+cancel,
+)
+.await;
attempts += 1;
}
}
@@ -163,8 +132,7 @@ mod tests {
if let Some(old_backoff_value) = current_backoff_value.replace(new_backoff_value) {
assert!(
-// accommodate the randomness of the backoff
-old_backoff_value - DEFAULT_BASE_BACKOFF_SECONDS <= new_backoff_value,
+old_backoff_value <= new_backoff_value,
"{i}th backoff value {new_backoff_value} is smaller than the previous one {old_backoff_value}"
)
}
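
Net effect of the backoff hunks above: the retry/retry_with_options split collapses into a single retry() that always uses DEFAULT_BASE_BACKOFF_SECONDS and DEFAULT_MAX_BACKOFF_SECONDS, and the random jitter term disappears from exponential_backoff_duration_seconds. Below is a standalone sketch of the backoff progression implied by the non-jittered expression on the added line, with the zero-attempt and zero-increment special cases carried over from the function as displayed in the hunk; it is an illustration, not the utils crate itself.

    // Mirrors the non-jittered formula from the hunk above, using the defaults 0.1 and 3.0.
    fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
        if n == 0 {
            0.0
        } else if base_increment == 0.0 {
            1.0
        } else {
            (1.0 + base_increment).powf(f64::from(n)).min(max_seconds)
        }
    }

    fn main() {
        // 0.00, 1.10, 1.21, 1.33, ... capped at 3.00 once 1.1^n exceeds it (around n = 12).
        for n in 0..14 {
            println!("attempt {n}: {:.2}s", exponential_backoff_duration_seconds(n, 0.1, 3.0));
        }
    }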

View File

@@ -1433,12 +1433,6 @@ impl Tenant {
info!(%timeline_id, "index_part not found on remote");
continue;
}
-Err(DownloadError::Fatal(why)) => {
-// If, while loading one remote timeline, we saw an indication that our generation
-// number is likely invalid, then we should not load the whole tenant.
-error!(%timeline_id, "Fatal error loading timeline: {why}");
-anyhow::bail!(why.to_string());
-}
Err(e) => {
// Some (possibly ephemeral) error happened during index_part download.
// Pretend the timeline exists to not delete the timeline directory,

View File

@@ -574,18 +574,12 @@ impl RemoteTimelineClient {
if latest_index_generation > index_generation {
// Unexpected! Why are we loading such an old index if a more recent one exists?
-// We will refuse to proceed, as there is no reasonable scenario where this should happen, but
-// there _is_ a clear bug/corruption scenario where it would happen (controller sets the generation
-// backwards).
-tracing::error!(
+tracing::warn!(
?index_generation,
?latest_index_generation,
?latest_index_mtime,
"Found a newer index while loading an old one"
);
-return Err(DownloadError::Fatal(
-"Index age exceeds threshold and a newer index exists".into(),
-));
}
}

View File

@@ -693,15 +693,13 @@ where
O: FnMut() -> F,
F: Future<Output = Result<T, DownloadError>>,
{
-backoff::retry_with_options(
+backoff::retry(
op,
DownloadError::is_permanent,
FAILED_DOWNLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
description,
cancel,
-backoff::DEFAULT_NETWORK_BASE_BACKOFF_SECONDS,
-backoff::DEFAULT_NETWORK_MAX_BACKOFF_SECONDS,
)
.await
.ok_or_else(|| DownloadError::Cancelled)
@@ -717,15 +715,13 @@ where
O: FnMut() -> F,
F: Future<Output = Result<T, DownloadError>>,
{
-backoff::retry_with_options(
+backoff::retry(
op,
DownloadError::is_permanent,
FAILED_DOWNLOAD_WARN_THRESHOLD,
u32::MAX,
description,
cancel,
-backoff::DEFAULT_NETWORK_BASE_BACKOFF_SECONDS,
-backoff::DEFAULT_NETWORK_MAX_BACKOFF_SECONDS,
)
.await
.ok_or_else(|| DownloadError::Cancelled)

View File

@@ -8,7 +8,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, SystemTime};
use tracing::Instrument;
-use utils::backoff;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::sync::{gate, heavier_once_cell};
@@ -1205,8 +1204,8 @@ impl LayerInner {
let backoff = utils::backoff::exponential_backoff_duration_seconds(
consecutive_failures.min(u32::MAX as usize) as u32,
-backoff::DEFAULT_NETWORK_BASE_BACKOFF_SECONDS,
-backoff::DEFAULT_NETWORK_MAX_BACKOFF_SECONDS,
+1.5,
+60.0,
);
let backoff = std::time::Duration::from_secs_f64(backoff);

View File

@@ -262,7 +262,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
// Send requests.
for req in reqgen {
-_ = reply_rx.try_recv(); // discard any replies, to avoid blocking
+while reply_rx.try_recv().is_ok() {} // discard replies, to avoid blocking
let msg = ProposerAcceptorMessage::AppendRequest(req);
msg_tx.send(msg).await.expect("send failed");
}
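
The bench change above swaps a single try_recv() for a drain loop, presumably so the reply queue cannot fill up and stall the request loop under the new batching cadence. A tiny illustration of that non-blocking drain pattern with a plain tokio mpsc channel (tokio is an assumed dependency here; these are not the benchmark's own types):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (reply_tx, mut reply_rx) = mpsc::channel::<u32>(16);
        for i in 0..5 {
            reply_tx.send(i).await.unwrap(); // pretend these are accumulated replies
        }
        // Discard everything currently queued; try_recv() returns Err once the queue is empty,
        // so this never blocks the loop that produces new requests.
        while reply_rx.try_recv().is_ok() {}
        assert!(reply_rx.try_recv().is_err());
        println!("reply queue drained");
    }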

View File

@@ -217,7 +217,8 @@ pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
buckets.insert(0, 0.0);
buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
-assert!(buckets.len() <= 12, "too many histogram buckets");
+// TODO: tweak this.
+assert!(buckets.len() <= 16, "too many histogram buckets");
register_histogram!(
"safekeeper_wal_receiver_queue_depth",

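Rough arithmetic behind the relaxed assertion: with MSG_QUEUE_SIZE raised to 4096 there are 13 power-of-two buckets (1 through 4096), and the two inserted edge buckets (0 and MSG_QUEUE_SIZE - 1) bring the total to 15, over the old limit of 12 but within the new 16. The sketch below assumes pow2_buckets(1, n) yields the powers of two from 1 up to and including n; the real metrics helper may differ.

    // Assumed stand-in for the metrics helper; only meant to reproduce the bucket count.
    fn pow2_buckets(start: usize, end: usize) -> Vec<f64> {
        (0u32..)
            .map(|i| 1usize << i)
            .skip_while(|&v| v < start)
            .take_while(|&v| v <= end)
            .map(|v| v as f64)
            .collect()
    }

    fn main() {
        const MSG_QUEUE_SIZE: usize = 4096;
        let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE); // 1, 2, 4, ..., 4096 -> 13 buckets
        buckets.insert(0, 0.0); // leading zero bucket -> 14
        buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64); // 4095 just below the top -> 15
        println!("{} buckets: {:?}", buckets.len(), buckets);
        assert!(buckets.len() <= 16, "too many histogram buckets");
    }
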
View File

@@ -7,14 +7,15 @@ use crate::metrics::{
WAL_RECEIVERS, WAL_RECEIVER_QUEUE_DEPTH, WAL_RECEIVER_QUEUE_DEPTH_TOTAL,
WAL_RECEIVER_QUEUE_SIZE_TOTAL,
};
-use crate::safekeeper::AcceptorProposerMessage;
-use crate::safekeeper::ProposerAcceptorMessage;
-use crate::safekeeper::ServerInfo;
+use crate::safekeeper::{
+AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
+ServerInfo,
+};
use crate::timeline::WalResidentTimeline;
use crate::wal_service::ConnectionId;
use crate::GlobalTimelines;
use anyhow::{anyhow, Context};
-use bytes::BytesMut;
+use bytes::{BufMut as _, Bytes, BytesMut};
use parking_lot::MappedMutexGuard;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
@@ -206,7 +207,8 @@ impl Drop for WalReceiverGuard {
}
}
-pub const MSG_QUEUE_SIZE: usize = 256;
+// TODO: reconsider this.
+pub const MSG_QUEUE_SIZE: usize = 4096;
pub const REPLY_QUEUE_SIZE: usize = 16;
impl SafekeeperPostgresHandler {
@@ -484,6 +486,9 @@ const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
/// every 5 seconds, for 12 samples per poll. This will give a count of up to 12x active timelines.
const METRICS_INTERVAL: Duration = Duration::from_secs(5);
+/// The AppendRequest buffer size.
+const APPEND_BUFFER_SIZE: usize = 1024 * 1024;
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
/// replies to reply_tx.
///
@@ -530,6 +535,9 @@ impl WalAcceptor {
async fn run(&mut self) -> anyhow::Result<()> {
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
+// Buffer AppendRequests to submit them as a single large write.
+let mut append_buf = BufferedAppendRequest::new(APPEND_BUFFER_SIZE);
// Periodically flush the WAL and compute metrics.
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
@@ -546,7 +554,7 @@ impl WalAcceptor {
// Process inbound message.
msg = self.msg_rx.recv() => {
// If disconnected, break to flush WAL and return.
-let Some(mut msg) = msg else {
+let Some(msg) = msg else {
break;
};
@@ -563,11 +571,44 @@ impl WalAcceptor {
// This batches multiple appends per fsync. If the channel is empty after
// sending the reply, we'll schedule an immediate flush.
if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
-msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
-dirty = true;
-}
+// Try to batch multiple messages into a single large write.
+if !append_buf.is_empty() || !self.msg_rx.is_empty() {
+if append_buf.add(&append_request) {
+continue; // message buffered, go get next message
+}
-self.tli.process_msg(&msg).await?
+// Full buffer, write it and buffer this message for next iteration.
+dirty = true;
+let buf_req = append_buf.take().expect("empty buffer");
+let buf_msg = ProposerAcceptorMessage::NoFlushAppendRequest(buf_req);
+let reply = self.tli.process_msg(&buf_msg).await?;
+drop(buf_msg); // allow reusing buffer for add
+assert!(append_buf.add(&append_request), "empty buffer rejected msg");
+reply
+} else {
+dirty = true;
+let msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
+self.tli.process_msg(&msg).await?
+}
+} else {
+self.tli.process_msg(&msg).await?
+}
}
+// If there are no pending messages, write the append buffer.
+//
+// NB: we don't also flush the WAL here. Otherwise we can get into a regime where we
+// quickly drain msg_rx and fsync before the sender is able to repopulate msg_rx.
+// This happens consistently due to Tokio scheduling, leading to overeager fsyncing.
+// Instead, we perform the write without fsyncing and give the sender a chance to
+// get scheduled and populate msg_rx for the next iteration. If there are no further
+// messages, the next iteration will flush the WAL.
+_ = future::ready(()), if self.msg_rx.is_empty() && !append_buf.is_empty() => {
+dirty = true;
+let buf_req = append_buf.take().expect("empty buffer");
+self.tli
+.process_msg(&ProposerAcceptorMessage::NoFlushAppendRequest(buf_req))
+.await?
+}
// While receiving AppendRequests, flush the WAL periodically and respond with an
@@ -579,11 +620,11 @@ impl WalAcceptor {
.await?
}
-// If there are no pending messages, flush the WAL immediately.
+// If there are no pending messages, flush the WAL and append buffer immediately.
//
// TODO: this should be done via flush_ticker.reset_immediately(), but that's always
// delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866.
-_ = future::ready(()), if dirty && self.msg_rx.is_empty() => {
+_ = future::ready(()), if self.msg_rx.is_empty() && dirty => {
dirty = false;
flush_ticker.reset();
self.tli
@@ -627,3 +668,115 @@ impl Drop for WalAcceptor {
}
}
}
/// Buffers WAL data for multiple AppendRequests, to submit them as a single write.
struct BufferedAppendRequest {
/// The buffer capacity.
capacity: usize,
/// The buffered header and WAL data.
buf: Option<(AppendRequestHeader, BytesMut)>,
/// A previous buffer that can be reused when the returned message is dropped.
reuse_buf: Option<Bytes>,
/// If an AppendRequest is larger than the buffer capacity (when empty), just stash it here to
/// avoid growing the buffer and copying it. This will be returned as-is.
large: Option<AppendRequest>,
}
impl BufferedAppendRequest {
/// Creates a new append request buffer with the given capacity.
fn new(capacity: usize) -> Self {
Self {
capacity,
buf: None,
reuse_buf: None,
large: None,
}
}
/// Adds the given append request to the buffer, if possible. Returns `false` if the message
/// can't be buffered, leaving self unmodified. An empty buffer will always accept a message.
///
/// If the buffer is not empty, the message must have the same term and proposer and contiguous
/// `begin_lsn` and `end_lsn`. The buffer must have available capacity for the entire
/// `wal_data`. If the message is greater than an empty buffer's capacity, it is accepted but
/// simply stashed away in `large` without growing the buffer.
pub fn add(&mut self, msg: &AppendRequest) -> bool {
// If there is a stashed large message, reject further messages.
if self.large.is_some() {
return false;
}
// If there is no existing buffer, initialize one with the message.
let Some((ref mut h, ref mut wal_data)) = self.buf else {
// If the message is larger than the buffer capacity, just stash it instead of growing.
if msg.wal_data.len() > self.capacity {
assert!(self.large.is_none());
self.large = Some(msg.clone()); // clone is cheap with Bytes
return true;
}
// Reuse a previous buffer, if any, or allocate a new one.
//
// TODO: try_into_mut() is essentially runtime borrow checking. If AppendRequest used a
// normal Vec<u8> we could do compile-time borrow checking instead and avoid panic.
let mut wal_data = match self.reuse_buf.take() {
Some(reuse_buf) => match reuse_buf.try_into_mut() {
Ok(mut reuse_buf) => {
assert_eq!(reuse_buf.capacity(), self.capacity);
reuse_buf.clear();
reuse_buf
}
Err(_) => panic!("couldn't reuse buffer, still in use"),
},
None => BytesMut::with_capacity(self.capacity),
};
// Copy the append request into the buffer.
wal_data.put_slice(&msg.wal_data);
self.buf = Some((msg.h, wal_data));
return true;
};
// The messages must have the same term and proposer.
if h.term != msg.h.term || h.proposer_uuid != msg.h.proposer_uuid {
return false;
}
// The messages must be contiguous.
if h.end_lsn != msg.h.begin_lsn {
return false;
}
// The message must fit in the buffer.
if wal_data.len() + msg.wal_data.len() > self.capacity {
return false;
}
// Add the message to the buffer, bumping the commit and truncate LSNs. We assume that later
// messages have later commit/truncate LSNs.
h.end_lsn = msg.h.end_lsn;
h.commit_lsn = msg.h.commit_lsn;
h.truncate_lsn = msg.h.truncate_lsn;
wal_data.put_slice(&msg.wal_data);
true
}
/// Returns true if there is no buffered message.
fn is_empty(&self) -> bool {
self.buf.is_none() && self.large.is_none()
}
/// Takes the buffered AppendRequest (if any), leaving a None in its place.
///
/// NB: The returned `wal_data` Bytes must be dropped before the next call to `add()`, in order
/// to reuse the buffer. This is basically runtime borrow checking, because of Bytes.
fn take(&mut self) -> Option<AppendRequest> {
// If there is a stashed large message, return it.
if let Some(large) = self.large.take() {
assert!(self.buf.is_none(), "both buf and large are set");
return Some(large);
}
let (h, wal_data) = self.buf.take()?;
let wal_data = wal_data.freeze();
self.reuse_buf = Some(wal_data.clone()); // keep a reference to the buffer
Some(AppendRequest { h, wal_data })
}
}
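
A dependency-free sketch of the batching rule BufferedAppendRequest enforces above: appends are merged only while they come from the same term and proposer, are LSN-contiguous, and still fit the capacity budget. Header, Append and Batch below are illustrative stand-ins (plain Vec<u8> instead of Bytes, no buffer reuse and no large-message stash), not the safekeeper's own types.

    #[derive(Clone, Copy)]
    struct Header {
        term: u64,
        proposer_uuid: [u8; 16],
        begin_lsn: u64,
        end_lsn: u64,
        commit_lsn: u64,
        truncate_lsn: u64,
    }

    struct Append {
        h: Header,
        wal_data: Vec<u8>,
    }

    struct Batch {
        capacity: usize,
        buf: Option<(Header, Vec<u8>)>,
    }

    impl Batch {
        fn new(capacity: usize) -> Self {
            Self { capacity, buf: None }
        }

        /// Merges `msg` into the batch if allowed; returns false (batch unchanged) when the
        /// term/proposer differ, the LSNs are not contiguous, or the payload would overflow.
        fn add(&mut self, msg: &Append) -> bool {
            if self.buf.is_none() {
                // Empty batch: start it with this message.
                self.buf = Some((msg.h, msg.wal_data.clone()));
                return true;
            }
            let (h, wal_data) = self.buf.as_mut().unwrap();
            if h.term != msg.h.term || h.proposer_uuid != msg.h.proposer_uuid {
                return false; // different leader: never merge across terms
            }
            if h.end_lsn != msg.h.begin_lsn {
                return false; // gap or overlap in the WAL stream: write out the batch first
            }
            if wal_data.len() + msg.wal_data.len() > self.capacity {
                return false; // would exceed the write size budget
            }
            // Extend the batched header; later messages carry later commit/truncate LSNs.
            h.end_lsn = msg.h.end_lsn;
            h.commit_lsn = msg.h.commit_lsn;
            h.truncate_lsn = msg.h.truncate_lsn;
            wal_data.extend_from_slice(&msg.wal_data);
            true
        }

        fn take(&mut self) -> Option<Append> {
            self.buf.take().map(|(h, wal_data)| Append { h, wal_data })
        }
    }

    fn main() {
        let hdr = |begin: u64, end: u64| Header {
            term: 1,
            proposer_uuid: [0; 16],
            begin_lsn: begin,
            end_lsn: end,
            commit_lsn: end,
            truncate_lsn: 0,
        };
        let mut batch = Batch::new(1024);
        assert!(batch.add(&Append { h: hdr(0, 8), wal_data: vec![0; 8] }));
        assert!(batch.add(&Append { h: hdr(8, 16), wal_data: vec![0; 8] })); // contiguous: merged
        assert!(!batch.add(&Append { h: hdr(32, 40), wal_data: vec![0; 8] })); // gap: rejected
        let merged = batch.take().unwrap();
        assert_eq!((merged.h.begin_lsn, merged.h.end_lsn), (0, 16));
        println!("merged {} bytes covering {}..{}", merged.wal_data.len(), merged.h.begin_lsn, merged.h.end_lsn);
    }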

View File

@@ -296,12 +296,13 @@ pub struct ProposerElected {
/// Request with WAL message sent from proposer to safekeeper. Along the way it
/// communicates commit_lsn.
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub struct AppendRequest {
pub h: AppendRequestHeader,
pub wal_data: Bytes,
}
-#[derive(Debug, Clone, Deserialize)]
+#[derive(Debug, Clone, Copy, Deserialize)]
pub struct AppendRequestHeader {
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term,
@@ -1166,7 +1167,7 @@ mod tests {
proposer_uuid: [0; 16],
};
let mut append_request = AppendRequest {
-h: ar_hdr.clone(),
+h: ar_hdr,
wal_data: Bytes::from_static(b"b"),
};
@@ -1240,7 +1241,7 @@ mod tests {
proposer_uuid: [0; 16],
};
let append_request = AppendRequest {
-h: ar_hdr.clone(),
+h: ar_hdr,
wal_data: Bytes::from_static(b"b"),
};
@@ -1248,7 +1249,7 @@ mod tests {
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await
.unwrap();
-let mut ar_hrd2 = ar_hdr.clone();
+let mut ar_hrd2 = ar_hdr;
ar_hrd2.begin_lsn = Lsn(4);
ar_hrd2.end_lsn = Lsn(5);
let append_request = AppendRequest {
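
The derives above make AppendRequestHeader Copy and AppendRequest Clone so the batching code can copy headers and stash whole requests; this stays cheap because bytes::Bytes clones are reference-counted views rather than payload copies, which is what the "clone is cheap with Bytes" comment in the batching code relies on. A quick standalone check, assuming the bytes crate as a dependency:

    use bytes::Bytes;

    fn main() {
        let wal_data = Bytes::from(vec![0u8; 1024 * 1024]); // 1 MiB allocated once
        let copy = wal_data.clone(); // bumps a refcount; no 1 MiB memcpy
        assert_eq!(copy.len(), wal_data.len());
        // Both handles point at the same backing storage.
        assert_eq!(copy.as_ptr(), wal_data.as_ptr());
        println!("cloned {} bytes without copying the payload", copy.len());
    }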

View File

@@ -35,10 +35,9 @@ from fixtures.pageserver.utils import (
wait_for_upload,
)
from fixtures.remote_storage import (
-LocalFsStorage,
RemoteStorageKind,
)
-from fixtures.utils import run_only_on_default_postgres, wait_until
+from fixtures.utils import wait_until
from fixtures.workload import Workload
if TYPE_CHECKING:
@@ -729,68 +728,3 @@ def test_upgrade_generationless_local_file_paths(
)
# We should download into the same local path we started with
assert os.path.exists(victim_path)
@run_only_on_default_postgres("Only tests index logic")
def test_old_index_time_threshold(
neon_env_builder: NeonEnvBuilder,
):
"""
Exercise pageserver's detection of trying to load an ancient non-latest index.
(see https://github.com/neondatabase/neon/issues/6951)
"""
# Run with local_fs because we will interfere with mtimes by local filesystem access
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(32)
# Remember generation 1's index path
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
index_path = env.pageserver_remote_storage.index_path(tenant_id, timeline_id)
# Increment generation by detaching+attaching, and write+flush some data to get a new remote index
env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"})
env.storage_controller.tenant_policy_update(tenant_id, {"placement": {"Attached": 0}})
env.storage_controller.reconcile_until_idle()
workload.churn_rows(32)
# A new index should have been written
assert env.pageserver_remote_storage.index_path(tenant_id, timeline_id) != index_path
# Hack the mtime on the generation 1 index
log.info(f"Setting old mtime on {index_path}")
os.utime(index_path, times=(time.time(), time.time() - 30 * 24 * 3600))
env.pageserver.allowed_errors.extend(
[
".*Found a newer index while loading an old one.*",
".*Index age exceeds threshold and a newer index exists.*",
]
)
# Detach from storage controller + attach in an old generation directly on the pageserver.
workload.stop()
env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"})
env.storage_controller.reconcile_until_idle()
env.storage_controller.tenant_policy_update(tenant_id, {"scheduling": "Stop"})
env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy")
# The controller would not do this (attach in an old generation): we are doing it to simulate
# a hypothetical profound bug in the controller.
env.pageserver.http_client().tenant_location_conf(
tenant_id, {"generation": 1, "mode": "AttachedSingle", "tenant_conf": {}}
)
# The pageserver should react to this situation by refusing to attach the tenant and putting
# it into Broken state
env.pageserver.allowed_errors.append(".*tenant is broken.*")
with pytest.raises(
PageserverApiException,
match="tenant is broken: Index age exceeds threshold and a newer index exists",
):
env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)

vendor/revisions.json (vendored, 16 changed lines)
View File

@@ -1,18 +1,18 @@
{
"v17": [
-"17.1",
-"aa2e29f2b6952140dfe51876bbd11054acae776f"
+"17.0",
+"ae4cc30dba24f3910533e5a48e8103c3f2fff300"
],
"v16": [
-"16.5",
-"b0b693ea298454e95e6b154780d1fd586a244dfd"
+"16.4",
+"03b43900edc5d8d6eecec460bfc89aec7174bd84"
],
"v15": [
-"15.9",
-"1feff6b60f07cb71b665d0f5ead71a4320a71743"
+"15.8",
+"fd631a959049dfe2b82f67409c8b8b0d3e0016d1"
],
"v14": [
-"14.14",
-"c5e0d642efb02e4bfedc283b0a7707fe6c79cc89"
+"14.13",
+"de0a000dafc2e66ce2e39282d3aa1c704fe0390e"
]
}