mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-03 02:30:37 +00:00
Compare commits
1 Commits
skyzh/back
...
erik/batch
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4308ffe5c0 |
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -994,9 +994,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
|
||||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.5.0"
|
||||
version = "1.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
|
||||
checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
@@ -1,45 +1,3 @@
|
||||
commit 00aa659afc9c7336ab81036edec3017168aabf40
|
||||
Author: Heikki Linnakangas <heikki@neon.tech>
|
||||
Date: Tue Nov 12 16:59:19 2024 +0200
|
||||
|
||||
Temporarily disable test that depends on timezone
|
||||
|
||||
diff --git a/tests/expected/generalization.out b/tests/expected/generalization.out
|
||||
index 23ef5fa..9e60deb 100644
|
||||
--- a/ext-src/pg_anon-src/tests/expected/generalization.out
|
||||
+++ b/ext-src/pg_anon-src/tests/expected/generalization.out
|
||||
@@ -284,12 +284,9 @@ SELECT anon.generalize_tstzrange('19041107','century');
|
||||
["Tue Jan 01 00:00:00 1901 PST","Mon Jan 01 00:00:00 2001 PST")
|
||||
(1 row)
|
||||
|
||||
-SELECT anon.generalize_tstzrange('19041107','millennium');
|
||||
- generalize_tstzrange
|
||||
------------------------------------------------------------------
|
||||
- ["Thu Jan 01 00:00:00 1001 PST","Mon Jan 01 00:00:00 2001 PST")
|
||||
-(1 row)
|
||||
-
|
||||
+-- temporarily disabled, see:
|
||||
+-- https://gitlab.com/dalibo/postgresql_anonymizer/-/commit/199f0a392b37c59d92ae441fb8f037e094a11a52#note_2148017485
|
||||
+--SELECT anon.generalize_tstzrange('19041107','millennium');
|
||||
-- generalize_daterange
|
||||
SELECT anon.generalize_daterange('19041107');
|
||||
generalize_daterange
|
||||
diff --git a/tests/sql/generalization.sql b/tests/sql/generalization.sql
|
||||
index b868344..b4fc977 100644
|
||||
--- a/ext-src/pg_anon-src/tests/sql/generalization.sql
|
||||
+++ b/ext-src/pg_anon-src/tests/sql/generalization.sql
|
||||
@@ -61,7 +61,9 @@ SELECT anon.generalize_tstzrange('19041107','month');
|
||||
SELECT anon.generalize_tstzrange('19041107','year');
|
||||
SELECT anon.generalize_tstzrange('19041107','decade');
|
||||
SELECT anon.generalize_tstzrange('19041107','century');
|
||||
-SELECT anon.generalize_tstzrange('19041107','millennium');
|
||||
+-- temporarily disabled, see:
|
||||
+-- https://gitlab.com/dalibo/postgresql_anonymizer/-/commit/199f0a392b37c59d92ae441fb8f037e094a11a52#note_2148017485
|
||||
+--SELECT anon.generalize_tstzrange('19041107','millennium');
|
||||
|
||||
-- generalize_daterange
|
||||
SELECT anon.generalize_daterange('19041107');
|
||||
|
||||
commit 7dd414ee75f2875cffb1d6ba474df1f135a6fc6f
|
||||
Author: Alexey Masterov <alexeymasterov@neon.tech>
|
||||
Date: Fri May 31 06:34:26 2024 +0000
|
||||
|
||||
@@ -15,9 +15,6 @@ pub enum DownloadError {
|
||||
///
|
||||
/// Concurrency control is not timed within timeout.
|
||||
Timeout,
|
||||
/// Some integrity/consistency check failed during download. This is used during
|
||||
/// timeline loads to cancel the load of a tenant if some timeline detects fatal corruption.
|
||||
Fatal(String),
|
||||
/// The file was found in the remote storage, but the download failed.
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
@@ -32,7 +29,6 @@ impl std::fmt::Display for DownloadError {
|
||||
DownloadError::Unmodified => write!(f, "File was not modified"),
|
||||
DownloadError::Cancelled => write!(f, "Cancelled, shutting down"),
|
||||
DownloadError::Timeout => write!(f, "timeout"),
|
||||
DownloadError::Fatal(why) => write!(f, "Fatal read error: {why}"),
|
||||
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
|
||||
}
|
||||
}
|
||||
@@ -45,7 +41,7 @@ impl DownloadError {
|
||||
pub fn is_permanent(&self) -> bool {
|
||||
use DownloadError::*;
|
||||
match self {
|
||||
BadInput(_) | NotFound | Unmodified | Fatal(_) | Cancelled => true,
|
||||
BadInput(_) | NotFound | Unmodified | Cancelled => true,
|
||||
Timeout | Other(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,15 +1,11 @@
|
||||
use std::fmt::{Debug, Display};
|
||||
|
||||
use futures::Future;
|
||||
use rand::Rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
pub const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
|
||||
pub const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
|
||||
|
||||
pub const DEFAULT_NETWORK_BASE_BACKOFF_SECONDS: f64 = 1.5;
|
||||
pub const DEFAULT_NETWORK_MAX_BACKOFF_SECONDS: f64 = 60.0;
|
||||
|
||||
pub async fn exponential_backoff(
|
||||
n: u32,
|
||||
base_increment: f64,
|
||||
@@ -36,41 +32,11 @@ pub async fn exponential_backoff(
|
||||
pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
|
||||
if n == 0 {
|
||||
0.0
|
||||
} else if base_increment == 0.0 {
|
||||
1.0
|
||||
} else {
|
||||
((1.0 + base_increment).powf(f64::from(n))
|
||||
+ rand::thread_rng().gen_range(0.0..base_increment))
|
||||
.min(max_seconds)
|
||||
(1.0 + base_increment).powf(f64::from(n)).min(max_seconds)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn retry<T, O, F, E>(
|
||||
op: O,
|
||||
is_permanent: impl Fn(&E) -> bool,
|
||||
warn_threshold: u32,
|
||||
max_retries: u32,
|
||||
description: &str,
|
||||
cancel: &CancellationToken,
|
||||
) -> Option<Result<T, E>>
|
||||
where
|
||||
E: Display + Debug + 'static,
|
||||
O: FnMut() -> F,
|
||||
F: Future<Output = Result<T, E>>,
|
||||
{
|
||||
retry_with_options(
|
||||
op,
|
||||
is_permanent,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
description,
|
||||
cancel,
|
||||
DEFAULT_BASE_BACKOFF_SECONDS,
|
||||
DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Retries passed operation until one of the following conditions are met:
|
||||
/// - encountered error is considered as permanent (non-retryable)
|
||||
/// - retries have been exhausted
|
||||
@@ -85,16 +51,13 @@ where
|
||||
/// for any other error type. Final failed attempt is logged with `{:?}`.
|
||||
///
|
||||
/// Returns `None` if cancellation was noticed during backoff or the terminal result.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn retry_with_options<T, O, F, E>(
|
||||
pub async fn retry<T, O, F, E>(
|
||||
mut op: O,
|
||||
is_permanent: impl Fn(&E) -> bool,
|
||||
warn_threshold: u32,
|
||||
max_retries: u32,
|
||||
description: &str,
|
||||
cancel: &CancellationToken,
|
||||
base_increment: f64,
|
||||
max_seconds: f64,
|
||||
) -> Option<Result<T, E>>
|
||||
where
|
||||
// Not std::error::Error because anyhow::Error doesnt implement it.
|
||||
@@ -139,7 +102,13 @@ where
|
||||
}
|
||||
}
|
||||
// sleep and retry
|
||||
exponential_backoff(attempts, base_increment, max_seconds, cancel).await;
|
||||
exponential_backoff(
|
||||
attempts,
|
||||
DEFAULT_BASE_BACKOFF_SECONDS,
|
||||
DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
cancel,
|
||||
)
|
||||
.await;
|
||||
attempts += 1;
|
||||
}
|
||||
}
|
||||
@@ -163,8 +132,7 @@ mod tests {
|
||||
|
||||
if let Some(old_backoff_value) = current_backoff_value.replace(new_backoff_value) {
|
||||
assert!(
|
||||
// accommodate the randomness of the backoff
|
||||
old_backoff_value - DEFAULT_BASE_BACKOFF_SECONDS <= new_backoff_value,
|
||||
old_backoff_value <= new_backoff_value,
|
||||
"{i}th backoff value {new_backoff_value} is smaller than the previous one {old_backoff_value}"
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1433,12 +1433,6 @@ impl Tenant {
|
||||
info!(%timeline_id, "index_part not found on remote");
|
||||
continue;
|
||||
}
|
||||
Err(DownloadError::Fatal(why)) => {
|
||||
// If, while loading one remote timeline, we saw an indication that our generation
|
||||
// number is likely invalid, then we should not load the whole tenant.
|
||||
error!(%timeline_id, "Fatal error loading timeline: {why}");
|
||||
anyhow::bail!(why.to_string());
|
||||
}
|
||||
Err(e) => {
|
||||
// Some (possibly ephemeral) error happened during index_part download.
|
||||
// Pretend the timeline exists to not delete the timeline directory,
|
||||
|
||||
@@ -574,18 +574,12 @@ impl RemoteTimelineClient {
|
||||
|
||||
if latest_index_generation > index_generation {
|
||||
// Unexpected! Why are we loading such an old index if a more recent one exists?
|
||||
// We will refuse to proceed, as there is no reasonable scenario where this should happen, but
|
||||
// there _is_ a clear bug/corruption scenario where it would happen (controller sets the generation
|
||||
// backwards).
|
||||
tracing::error!(
|
||||
tracing::warn!(
|
||||
?index_generation,
|
||||
?latest_index_generation,
|
||||
?latest_index_mtime,
|
||||
"Found a newer index while loading an old one"
|
||||
);
|
||||
return Err(DownloadError::Fatal(
|
||||
"Index age exceeds threshold and a newer index exists".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -693,15 +693,13 @@ where
|
||||
O: FnMut() -> F,
|
||||
F: Future<Output = Result<T, DownloadError>>,
|
||||
{
|
||||
backoff::retry_with_options(
|
||||
backoff::retry(
|
||||
op,
|
||||
DownloadError::is_permanent,
|
||||
FAILED_DOWNLOAD_WARN_THRESHOLD,
|
||||
FAILED_REMOTE_OP_RETRIES,
|
||||
description,
|
||||
cancel,
|
||||
backoff::DEFAULT_NETWORK_BASE_BACKOFF_SECONDS,
|
||||
backoff::DEFAULT_NETWORK_MAX_BACKOFF_SECONDS,
|
||||
)
|
||||
.await
|
||||
.ok_or_else(|| DownloadError::Cancelled)
|
||||
@@ -717,15 +715,13 @@ where
|
||||
O: FnMut() -> F,
|
||||
F: Future<Output = Result<T, DownloadError>>,
|
||||
{
|
||||
backoff::retry_with_options(
|
||||
backoff::retry(
|
||||
op,
|
||||
DownloadError::is_permanent,
|
||||
FAILED_DOWNLOAD_WARN_THRESHOLD,
|
||||
u32::MAX,
|
||||
description,
|
||||
cancel,
|
||||
backoff::DEFAULT_NETWORK_BASE_BACKOFF_SECONDS,
|
||||
backoff::DEFAULT_NETWORK_MAX_BACKOFF_SECONDS,
|
||||
)
|
||||
.await
|
||||
.ok_or_else(|| DownloadError::Cancelled)
|
||||
|
||||
@@ -8,7 +8,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::{Duration, SystemTime};
|
||||
use tracing::Instrument;
|
||||
use utils::backoff;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::{gate, heavier_once_cell};
|
||||
@@ -1205,8 +1204,8 @@ impl LayerInner {
|
||||
|
||||
let backoff = utils::backoff::exponential_backoff_duration_seconds(
|
||||
consecutive_failures.min(u32::MAX as usize) as u32,
|
||||
backoff::DEFAULT_NETWORK_BASE_BACKOFF_SECONDS,
|
||||
backoff::DEFAULT_NETWORK_MAX_BACKOFF_SECONDS,
|
||||
1.5,
|
||||
60.0,
|
||||
);
|
||||
|
||||
let backoff = std::time::Duration::from_secs_f64(backoff);
|
||||
|
||||
@@ -262,7 +262,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
|
||||
|
||||
// Send requests.
|
||||
for req in reqgen {
|
||||
_ = reply_rx.try_recv(); // discard any replies, to avoid blocking
|
||||
while reply_rx.try_recv().is_ok() {} // discard replies, to avoid blocking
|
||||
let msg = ProposerAcceptorMessage::AppendRequest(req);
|
||||
msg_tx.send(msg).await.expect("send failed");
|
||||
}
|
||||
|
||||
@@ -217,7 +217,8 @@ pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
|
||||
let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
|
||||
buckets.insert(0, 0.0);
|
||||
buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
|
||||
assert!(buckets.len() <= 12, "too many histogram buckets");
|
||||
// TODO: tweak this.
|
||||
assert!(buckets.len() <= 16, "too many histogram buckets");
|
||||
|
||||
register_histogram!(
|
||||
"safekeeper_wal_receiver_queue_depth",
|
||||
|
||||
@@ -7,14 +7,15 @@ use crate::metrics::{
|
||||
WAL_RECEIVERS, WAL_RECEIVER_QUEUE_DEPTH, WAL_RECEIVER_QUEUE_DEPTH_TOTAL,
|
||||
WAL_RECEIVER_QUEUE_SIZE_TOTAL,
|
||||
};
|
||||
use crate::safekeeper::AcceptorProposerMessage;
|
||||
use crate::safekeeper::ProposerAcceptorMessage;
|
||||
use crate::safekeeper::ServerInfo;
|
||||
use crate::safekeeper::{
|
||||
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
|
||||
ServerInfo,
|
||||
};
|
||||
use crate::timeline::WalResidentTimeline;
|
||||
use crate::wal_service::ConnectionId;
|
||||
use crate::GlobalTimelines;
|
||||
use anyhow::{anyhow, Context};
|
||||
use bytes::BytesMut;
|
||||
use bytes::{BufMut as _, Bytes, BytesMut};
|
||||
use parking_lot::MappedMutexGuard;
|
||||
use parking_lot::Mutex;
|
||||
use parking_lot::MutexGuard;
|
||||
@@ -206,7 +207,8 @@ impl Drop for WalReceiverGuard {
|
||||
}
|
||||
}
|
||||
|
||||
pub const MSG_QUEUE_SIZE: usize = 256;
|
||||
// TODO: reconsider this.
|
||||
pub const MSG_QUEUE_SIZE: usize = 4096;
|
||||
pub const REPLY_QUEUE_SIZE: usize = 16;
|
||||
|
||||
impl SafekeeperPostgresHandler {
|
||||
@@ -484,6 +486,9 @@ const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
|
||||
/// every 5 seconds, for 12 samples per poll. This will give a count of up to 12x active timelines.
|
||||
const METRICS_INTERVAL: Duration = Duration::from_secs(5);
|
||||
|
||||
/// The AppendRequest buffer size.
|
||||
const APPEND_BUFFER_SIZE: usize = 1024 * 1024;
|
||||
|
||||
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
|
||||
/// replies to reply_tx.
|
||||
///
|
||||
@@ -530,6 +535,9 @@ impl WalAcceptor {
|
||||
async fn run(&mut self) -> anyhow::Result<()> {
|
||||
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
|
||||
|
||||
// Buffer AppendRequests to submit them as a single large write.
|
||||
let mut append_buf = BufferedAppendRequest::new(APPEND_BUFFER_SIZE);
|
||||
|
||||
// Periodically flush the WAL and compute metrics.
|
||||
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
|
||||
flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||
@@ -546,7 +554,7 @@ impl WalAcceptor {
|
||||
// Process inbound message.
|
||||
msg = self.msg_rx.recv() => {
|
||||
// If disconnected, break to flush WAL and return.
|
||||
let Some(mut msg) = msg else {
|
||||
let Some(msg) = msg else {
|
||||
break;
|
||||
};
|
||||
|
||||
@@ -563,11 +571,44 @@ impl WalAcceptor {
|
||||
// This batches multiple appends per fsync. If the channel is empty after
|
||||
// sending the reply, we'll schedule an immediate flush.
|
||||
if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
|
||||
msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
|
||||
dirty = true;
|
||||
}
|
||||
// Try to batch multiple messages into a single large write.
|
||||
if !append_buf.is_empty() || !self.msg_rx.is_empty() {
|
||||
if append_buf.add(&append_request) {
|
||||
continue; // message buffered, go get next message
|
||||
}
|
||||
|
||||
self.tli.process_msg(&msg).await?
|
||||
// Full buffer, write it and buffer this message for next iteration.
|
||||
dirty = true;
|
||||
let buf_req = append_buf.take().expect("empty buffer");
|
||||
let buf_msg = ProposerAcceptorMessage::NoFlushAppendRequest(buf_req);
|
||||
let reply = self.tli.process_msg(&buf_msg).await?;
|
||||
drop(buf_msg); // allow reusing buffer for add
|
||||
assert!(append_buf.add(&append_request), "empty buffer rejected msg");
|
||||
reply
|
||||
} else {
|
||||
dirty = true;
|
||||
let msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
|
||||
self.tli.process_msg(&msg).await?
|
||||
}
|
||||
} else {
|
||||
self.tli.process_msg(&msg).await?
|
||||
}
|
||||
}
|
||||
|
||||
// If there are no pending messages, write the append buffer.
|
||||
//
|
||||
// NB: we don't also flush the WAL here. Otherwise we can get into a regime where we
|
||||
// quickly drain msg_rx and fsync before the sender is able to repopulate msg_rx.
|
||||
// This happens consistently due to Tokio scheduling, leading to overeager fsyncing.
|
||||
// Instead, we perform the write without fsyncing and give the sender a chance to
|
||||
// get scheduled and populate msg_rx for the next iteration. If there are no further
|
||||
// messages, the next iteration will flush the WAL.
|
||||
_ = future::ready(()), if self.msg_rx.is_empty() && !append_buf.is_empty() => {
|
||||
dirty = true;
|
||||
let buf_req = append_buf.take().expect("empty buffer");
|
||||
self.tli
|
||||
.process_msg(&ProposerAcceptorMessage::NoFlushAppendRequest(buf_req))
|
||||
.await?
|
||||
}
|
||||
|
||||
// While receiving AppendRequests, flush the WAL periodically and respond with an
|
||||
@@ -579,11 +620,11 @@ impl WalAcceptor {
|
||||
.await?
|
||||
}
|
||||
|
||||
// If there are no pending messages, flush the WAL immediately.
|
||||
// If there are no pending messages, flush the WAL and append buffer immediately.
|
||||
//
|
||||
// TODO: this should be done via flush_ticker.reset_immediately(), but that's always
|
||||
// delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866.
|
||||
_ = future::ready(()), if dirty && self.msg_rx.is_empty() => {
|
||||
_ = future::ready(()), if self.msg_rx.is_empty() && dirty => {
|
||||
dirty = false;
|
||||
flush_ticker.reset();
|
||||
self.tli
|
||||
@@ -627,3 +668,115 @@ impl Drop for WalAcceptor {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Buffers WAL data for multiple AppendRequests, to submit them as a single write.
|
||||
struct BufferedAppendRequest {
|
||||
/// The buffer capacity.
|
||||
capacity: usize,
|
||||
/// The buffered header and WAL data.
|
||||
buf: Option<(AppendRequestHeader, BytesMut)>,
|
||||
/// A previous buffer that can be reused when the returned message is dropped.
|
||||
reuse_buf: Option<Bytes>,
|
||||
/// If an AppendRequest is larger than the buffer capacity (when empty), just stash it here to
|
||||
/// avoid growing the buffer and copying it. This will be returned as-is.
|
||||
large: Option<AppendRequest>,
|
||||
}
|
||||
|
||||
impl BufferedAppendRequest {
|
||||
/// Creates a new append request buffer with the given capacity.
|
||||
fn new(capacity: usize) -> Self {
|
||||
Self {
|
||||
capacity,
|
||||
buf: None,
|
||||
reuse_buf: None,
|
||||
large: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds the given append request to the buffer, if possible. Returns `false` if the message
|
||||
/// can't be buffered, leaving self unmodified. An empty buffer will always accept a message.
|
||||
///
|
||||
/// If the buffer is not empty, the message must have the same term and proposer and contiguous
|
||||
/// `begin_lsn` and `end_lsn`. The buffer must have available capacity for the entire
|
||||
/// `wal_data`. If the message is greater than an empty buffer's capacity, it is accepted but
|
||||
/// simply stashed away in `large` without growing the buffer.
|
||||
pub fn add(&mut self, msg: &AppendRequest) -> bool {
|
||||
// If there is a stashed large message, reject further messages.
|
||||
if self.large.is_some() {
|
||||
return false;
|
||||
}
|
||||
|
||||
// If there is no existing buffer, initialize one with the message.
|
||||
let Some((ref mut h, ref mut wal_data)) = self.buf else {
|
||||
// If the message is larger than the buffer capacity, just stash it instead of growing.
|
||||
if msg.wal_data.len() > self.capacity {
|
||||
assert!(self.large.is_none());
|
||||
self.large = Some(msg.clone()); // clone is cheap with Bytes
|
||||
return true;
|
||||
}
|
||||
|
||||
// Reuse a previous buffer, if any, or allocate a new one.
|
||||
//
|
||||
// TODO: try_into_mut() is essentially runtime borrow checking. If AppendRequest used a
|
||||
// normal Vec<u8> we could do compile-time borrow checking instead and avoid panic.
|
||||
let mut wal_data = match self.reuse_buf.take() {
|
||||
Some(reuse_buf) => match reuse_buf.try_into_mut() {
|
||||
Ok(mut reuse_buf) => {
|
||||
assert_eq!(reuse_buf.capacity(), self.capacity);
|
||||
reuse_buf.clear();
|
||||
reuse_buf
|
||||
}
|
||||
Err(_) => panic!("couldn't reuse buffer, still in use"),
|
||||
},
|
||||
None => BytesMut::with_capacity(self.capacity),
|
||||
};
|
||||
// Copy the append request into the buffer.
|
||||
wal_data.put_slice(&msg.wal_data);
|
||||
self.buf = Some((msg.h, wal_data));
|
||||
return true;
|
||||
};
|
||||
|
||||
// The messages must have the same term and proposer.
|
||||
if h.term != msg.h.term || h.proposer_uuid != msg.h.proposer_uuid {
|
||||
return false;
|
||||
}
|
||||
// The messages must be contiguous.
|
||||
if h.end_lsn != msg.h.begin_lsn {
|
||||
return false;
|
||||
}
|
||||
// The message must fit in the buffer.
|
||||
if wal_data.len() + msg.wal_data.len() > self.capacity {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Add the message to the buffer, bumping the commit and truncate LSNs. We assume that later
|
||||
// messages have later commit/truncate LSNs.
|
||||
h.end_lsn = msg.h.end_lsn;
|
||||
h.commit_lsn = msg.h.commit_lsn;
|
||||
h.truncate_lsn = msg.h.truncate_lsn;
|
||||
wal_data.put_slice(&msg.wal_data);
|
||||
true
|
||||
}
|
||||
|
||||
/// Returns true if there is no buffered message.
|
||||
fn is_empty(&self) -> bool {
|
||||
self.buf.is_none() && self.large.is_none()
|
||||
}
|
||||
|
||||
/// Takes the buffered AppendRequest (if any), leaving a None in its place.
|
||||
///
|
||||
/// NB: The returned `wal_data` Bytes must be dropped before the next call to `add()`, in order
|
||||
/// to reuse the buffer. This is basically runtime borrow checking, because of Bytes.
|
||||
fn take(&mut self) -> Option<AppendRequest> {
|
||||
// If there is a stashed large message, return it.
|
||||
if let Some(large) = self.large.take() {
|
||||
assert!(self.buf.is_none(), "both buf and large are set");
|
||||
return Some(large);
|
||||
}
|
||||
|
||||
let (h, wal_data) = self.buf.take()?;
|
||||
let wal_data = wal_data.freeze();
|
||||
self.reuse_buf = Some(wal_data.clone()); // keep a reference to the buffer
|
||||
Some(AppendRequest { h, wal_data })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -296,12 +296,13 @@ pub struct ProposerElected {
|
||||
|
||||
/// Request with WAL message sent from proposer to safekeeper. Along the way it
|
||||
/// communicates commit_lsn.
|
||||
#[derive(Debug)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AppendRequest {
|
||||
pub h: AppendRequestHeader,
|
||||
pub wal_data: Bytes,
|
||||
}
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
|
||||
#[derive(Debug, Clone, Copy, Deserialize)]
|
||||
pub struct AppendRequestHeader {
|
||||
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
|
||||
pub term: Term,
|
||||
@@ -1166,7 +1167,7 @@ mod tests {
|
||||
proposer_uuid: [0; 16],
|
||||
};
|
||||
let mut append_request = AppendRequest {
|
||||
h: ar_hdr.clone(),
|
||||
h: ar_hdr,
|
||||
wal_data: Bytes::from_static(b"b"),
|
||||
};
|
||||
|
||||
@@ -1240,7 +1241,7 @@ mod tests {
|
||||
proposer_uuid: [0; 16],
|
||||
};
|
||||
let append_request = AppendRequest {
|
||||
h: ar_hdr.clone(),
|
||||
h: ar_hdr,
|
||||
wal_data: Bytes::from_static(b"b"),
|
||||
};
|
||||
|
||||
@@ -1248,7 +1249,7 @@ mod tests {
|
||||
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
|
||||
.await
|
||||
.unwrap();
|
||||
let mut ar_hrd2 = ar_hdr.clone();
|
||||
let mut ar_hrd2 = ar_hdr;
|
||||
ar_hrd2.begin_lsn = Lsn(4);
|
||||
ar_hrd2.end_lsn = Lsn(5);
|
||||
let append_request = AppendRequest {
|
||||
|
||||
@@ -35,10 +35,9 @@ from fixtures.pageserver.utils import (
|
||||
wait_for_upload,
|
||||
)
|
||||
from fixtures.remote_storage import (
|
||||
LocalFsStorage,
|
||||
RemoteStorageKind,
|
||||
)
|
||||
from fixtures.utils import run_only_on_default_postgres, wait_until
|
||||
from fixtures.utils import wait_until
|
||||
from fixtures.workload import Workload
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -729,68 +728,3 @@ def test_upgrade_generationless_local_file_paths(
|
||||
)
|
||||
# We should download into the same local path we started with
|
||||
assert os.path.exists(victim_path)
|
||||
|
||||
|
||||
@run_only_on_default_postgres("Only tests index logic")
|
||||
def test_old_index_time_threshold(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Exercise pageserver's detection of trying to load an ancient non-latest index.
|
||||
(see https://github.com/neondatabase/neon/issues/6951)
|
||||
"""
|
||||
|
||||
# Run with local_fs because we will interfere with mtimes by local filesystem access
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init()
|
||||
workload.write_rows(32)
|
||||
|
||||
# Remember generation 1's index path
|
||||
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
|
||||
index_path = env.pageserver_remote_storage.index_path(tenant_id, timeline_id)
|
||||
|
||||
# Increment generation by detaching+attaching, and write+flush some data to get a new remote index
|
||||
env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"})
|
||||
env.storage_controller.tenant_policy_update(tenant_id, {"placement": {"Attached": 0}})
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
workload.churn_rows(32)
|
||||
|
||||
# A new index should have been written
|
||||
assert env.pageserver_remote_storage.index_path(tenant_id, timeline_id) != index_path
|
||||
|
||||
# Hack the mtime on the generation 1 index
|
||||
log.info(f"Setting old mtime on {index_path}")
|
||||
os.utime(index_path, times=(time.time(), time.time() - 30 * 24 * 3600))
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*Found a newer index while loading an old one.*",
|
||||
".*Index age exceeds threshold and a newer index exists.*",
|
||||
]
|
||||
)
|
||||
|
||||
# Detach from storage controller + attach in an old generation directly on the pageserver.
|
||||
workload.stop()
|
||||
env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"})
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
env.storage_controller.tenant_policy_update(tenant_id, {"scheduling": "Stop"})
|
||||
env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy")
|
||||
|
||||
# The controller would not do this (attach in an old generation): we are doing it to simulate
|
||||
# a hypothetical profound bug in the controller.
|
||||
env.pageserver.http_client().tenant_location_conf(
|
||||
tenant_id, {"generation": 1, "mode": "AttachedSingle", "tenant_conf": {}}
|
||||
)
|
||||
|
||||
# The pageserver should react to this situation by refusing to attach the tenant and putting
|
||||
# it into Broken state
|
||||
env.pageserver.allowed_errors.append(".*tenant is broken.*")
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match="tenant is broken: Index age exceeds threshold and a newer index exists",
|
||||
):
|
||||
env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: c5e0d642ef...de0a000daf
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 1feff6b60f...fd631a9590
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: b0b693ea29...03b43900ed
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: aa2e29f2b6...ae4cc30dba
16
vendor/revisions.json
vendored
16
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.1",
|
||||
"aa2e29f2b6952140dfe51876bbd11054acae776f"
|
||||
"17.0",
|
||||
"ae4cc30dba24f3910533e5a48e8103c3f2fff300"
|
||||
],
|
||||
"v16": [
|
||||
"16.5",
|
||||
"b0b693ea298454e95e6b154780d1fd586a244dfd"
|
||||
"16.4",
|
||||
"03b43900edc5d8d6eecec460bfc89aec7174bd84"
|
||||
],
|
||||
"v15": [
|
||||
"15.9",
|
||||
"1feff6b60f07cb71b665d0f5ead71a4320a71743"
|
||||
"15.8",
|
||||
"fd631a959049dfe2b82f67409c8b8b0d3e0016d1"
|
||||
],
|
||||
"v14": [
|
||||
"14.14",
|
||||
"c5e0d642efb02e4bfedc283b0a7707fe6c79cc89"
|
||||
"14.13",
|
||||
"de0a000dafc2e66ce2e39282d3aa1c704fe0390e"
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user