Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-09 21:50:37 +00:00

Compare commits: conrad/ref ... skyzh/fix-

10 Commits:

- 499105da6d
- 95474cfbe0
- b501f1a681
- d710f000ef
- feaeba3750
- 2c4829c2bf
- cdde254c84
- a9db766c20
- 42ac6f6377
- 45f6111ad9
@@ -360,7 +360,12 @@ impl RemoteStorage for LocalFs {
let mut objects = Vec::with_capacity(keys.len());
for key in keys {
let path = key.with_base(&self.storage_root);
let metadata = file_metadata(&path).await?;
let metadata = file_metadata(&path).await;
if let Err(DownloadError::NotFound) = metadata {
// Race: if the file is deleted between listing and metadata check, ignore it.
continue;
}
let metadata = metadata?;
if metadata.is_dir() {
continue;
}

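The hunk above makes the local-filesystem listing tolerant of files that disappear between the directory listing and the metadata lookup. A minimal standalone sketch of the same pattern, written against `tokio::fs` directly rather than Neon's `file_metadata`/`DownloadError` helpers (the helper name below is hypothetical):

```rust
use std::io;
use std::path::Path;

/// Return the file size, or `None` if the path is a directory or has vanished.
async fn file_size_if_present(path: &Path) -> io::Result<Option<u64>> {
    match tokio::fs::metadata(path).await {
        // Directories are skipped, mirroring the `metadata.is_dir()` check above.
        Ok(m) if m.is_dir() => Ok(None),
        Ok(m) => Ok(Some(m.len())),
        // Race: the file was deleted after it was listed; treat it as absent.
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(e) => Err(e),
    }
}

#[tokio::main]
async fn main() -> io::Result<()> {
    println!("{:?}", file_size_if_present(Path::new("Cargo.toml")).await?);
    Ok(())
}
```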
@@ -3,6 +3,7 @@ mod list_writer;
mod validator;

use std::collections::HashMap;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::Duration;

@@ -157,6 +158,7 @@ pub struct DeletionQueueClient {
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,

lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
max_layer_generation_in_queue: Arc<AtomicU32>,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]

@@ -382,6 +384,17 @@ pub enum DeletionQueueError {
}

impl DeletionQueueClient {
/// Returns whether there is any layer file with generation <= `gen` in the deletion queue.
pub(crate) fn maybe_processing_generation(&self, gen: Generation) -> bool {
if let Some(gen) = gen.into() {
self.max_layer_generation_in_queue
.load(std::sync::atomic::Ordering::SeqCst)
>= gen
} else {
false
}
}

/// This is cancel-safe. If you drop the future before it completes, the message
/// is not pushed, although in the context of the deletion queue it doesn't matter: once
/// we decide to do a deletion the decision is always final.

@@ -505,6 +518,13 @@ impl DeletionQueueClient {
metrics::DELETION_QUEUE
.keys_submitted
.inc_by(layers.len() as u64);
for (_, meta) in &layers {
let Some(gen) = meta.generation.into() else {
continue;
};
self.max_layer_generation_in_queue
.fetch_max(gen, std::sync::atomic::Ordering::SeqCst);
}
self.do_push(
&self.tx,
ListWriterQueueMessage::Delete(DeletionOp {

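The two hunks above form a simple high-water mark: `push_layers` records the largest generation it has queued for deletion via `fetch_max`, and `maybe_processing_generation` later compares against it with a plain `load`. A self-contained sketch of that pattern, with illustrative names rather than the actual `DeletionQueueClient` API:

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

/// Simplified sketch of the generation high-water mark kept by the deletion
/// queue client (illustrative names, not Neon's actual types).
struct GenerationWatermark {
    max_layer_generation_in_queue: Arc<AtomicU32>,
}

impl GenerationWatermark {
    fn new() -> Self {
        Self {
            max_layer_generation_in_queue: Arc::new(AtomicU32::new(0)),
        }
    }

    /// Called when layers are pushed for deletion: remember the largest generation seen.
    fn record_push(&self, generation: u32) {
        self.max_layer_generation_in_queue
            .fetch_max(generation, Ordering::SeqCst);
    }

    /// Mirrors the check in the diff: true when the recorded maximum generation
    /// in the queue is >= `generation`, i.e. a queued deletion could still clash
    /// with an upload at that generation.
    fn maybe_processing_generation(&self, generation: u32) -> bool {
        self.max_layer_generation_in_queue.load(Ordering::SeqCst) >= generation
    }
}

fn main() {
    let watermark = GenerationWatermark::new();
    watermark.record_push(3);
    assert!(watermark.maybe_processing_generation(2)); // 3 >= 2
    assert!(!watermark.maybe_processing_generation(5)); // 3 < 5
    println!("watermark checks passed");
}
```

Because the watermark only ever grows, the check is conservative: it may delay an upload after the queue has drained, but it never under-reports a pending deletion.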
@@ -651,6 +671,7 @@ impl DeletionQueue {
tx,
executor_tx: executor_tx.clone(),
lsn_table: lsn_table.clone(),
max_layer_generation_in_queue: Arc::new(AtomicU32::new(0)),
},
cancel: cancel.clone(),
},

@@ -1265,6 +1286,7 @@ pub(crate) mod mock {
tx: self.tx.clone(),
executor_tx: self.executor_tx.clone(),
lsn_table: self.lsn_table.clone(),
max_layer_generation_in_queue: Arc::new(AtomicU32::new(0)),
}
}
}

@@ -15,6 +15,7 @@ use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use utils::backoff;
use utils::pausable_failpoint;

use crate::metrics;

@@ -90,6 +91,7 @@ impl Deleter {
/// Block until everything in accumulator has been executed
async fn flush(&mut self) -> Result<(), DeletionQueueError> {
while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
pausable_failpoint!("deletion-queue-before-execute-pause");
match self.remote_delete().await {
Ok(()) => {
// Note: we assume that the remote storage layer returns Ok(()) if some

@@ -304,6 +304,20 @@ where
async fn flush(&mut self) -> Result<(), DeletionQueueError> {
tracing::debug!("Flushing with {} pending lists", self.pending_lists.len());

// This check might be redundant, but it's a good sanity check. The fast path was
// added because of how `test_emergency_mode` interacts with the initial barrier:
// the initial barrier forces the deletion queue to be completely flushed and waits
// for that, but if the control plane fails, we cannot really flush anything, and the
// whole test will get stuck. This check prevents the test from getting stuck, and in
// reality, if there is nothing to delete, the initial barrier will not get stuck either.
if self.pending_lists.is_empty()
&& self.validated_lists.is_empty()
&& self.pending_key_count == 0
{
// Fast path: nothing to do
return Ok(());
}

// Issue any required generation validation calls to the control plane
self.validate().await?;

@@ -244,7 +244,7 @@ use self::index::IndexPart;
use super::config::AttachedLocationConfig;
use super::metadata::MetadataUpdate;
use super::storage_layer::{Layer, LayerName, ResidentLayer};
use super::upload_queue::{NotInitialized, SetDeletedFlagProgress};
use super::upload_queue::{BarrierType, NotInitialized, SetDeletedFlagProgress};
use super::{DeleteTimelineError, Generation};

pub(crate) use download::{

@@ -896,7 +896,7 @@ impl RemoteTimelineClient {

self.schedule_index_upload(upload_queue)?;

Some(self.schedule_barrier0(upload_queue))
Some(self.schedule_wait_barrier0(upload_queue))
}
};

@@ -935,7 +935,7 @@ impl RemoteTimelineClient {

self.schedule_index_upload(upload_queue)?;

Some(self.schedule_barrier0(upload_queue))
Some(self.schedule_wait_barrier0(upload_queue))
}
};

@@ -973,7 +973,9 @@ impl RemoteTimelineClient {

match (current, uploaded) {
(x, y) if wanted(x) && wanted(y) => None,
(x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
(x, y) if wanted(x) && !wanted(y) => {
Some(self.schedule_wait_barrier0(upload_queue))
}
// Usual case: !wanted(x) && !wanted(y)
//
// Unusual: !wanted(x) && wanted(y) which means we have two processes waiting to

@@ -990,7 +992,7 @@ impl RemoteTimelineClient {
.map(|x| x.with_reason(reason))
.or_else(|| Some(index::GcBlocking::started_now_for(reason)));
self.schedule_index_upload(upload_queue)?;
Some(self.schedule_barrier0(upload_queue))
Some(self.schedule_wait_barrier0(upload_queue))
}
}
};

@@ -1033,7 +1035,9 @@ impl RemoteTimelineClient {

match (current, uploaded) {
(x, y) if wanted(x) && wanted(y) => None,
(x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
(x, y) if wanted(x) && !wanted(y) => {
Some(self.schedule_wait_barrier0(upload_queue))
}
(x, y) => {
if !wanted(x) && wanted(y) {
warn!(?reason, op="remove", "unexpected: two racing processes to enable and disable a gc blocking reason (remove)");

@@ -1044,7 +1048,7 @@ impl RemoteTimelineClient {
assert!(wanted(upload_queue.dirty.gc_blocking.as_ref()));
// FIXME: bogus ?
self.schedule_index_upload(upload_queue)?;
Some(self.schedule_barrier0(upload_queue))
Some(self.schedule_wait_barrier0(upload_queue))
}
}
};

@@ -1300,7 +1304,7 @@ impl RemoteTimelineClient {
let upload_queue = guard
.initialized_mut()
.map_err(WaitCompletionError::NotInitialized)?;
self.schedule_barrier0(upload_queue)
self.schedule_wait_barrier0(upload_queue)
};

Self::wait_completion0(receiver).await

@@ -1316,19 +1320,32 @@ impl RemoteTimelineClient {
Ok(())
}

pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
pub fn schedule_initial_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
self.schedule_barrier(BarrierType::Initial)
}

fn schedule_barrier(self: &Arc<Self>, barrier: BarrierType) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_barrier0(upload_queue);
self.schedule_barrier0(upload_queue, barrier);
Ok(())
}

/// Schedule a barrier to wait for all previously scheduled operations to complete.
fn schedule_wait_barrier0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
) -> tokio::sync::watch::Receiver<()> {
self.schedule_barrier0(upload_queue, BarrierType::Normal)
}

fn schedule_barrier0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
barrier: BarrierType,
) -> tokio::sync::watch::Receiver<()> {
let (sender, receiver) = tokio::sync::watch::channel(());
let barrier_op = UploadOp::Barrier(sender);
let barrier_op = UploadOp::Barrier(sender, barrier);

upload_queue.queued_operations.push_back(barrier_op);
// Don't count this kind of operation!

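For reference on the primitive that `schedule_barrier0` keeps returning: the queued `UploadOp::Barrier` carries the sender half of a `tokio::sync::watch` channel, and `wait_completion0` awaits the receiver. A minimal, hypothetical sketch of that handshake (not the actual `RemoteTimelineClient` code), assuming the `tokio` crate:

```rust
use std::time::Duration;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // One watch channel per barrier: the queue worker keeps the sender,
    // the caller that scheduled the barrier keeps the receiver.
    let (sender, mut receiver) = watch::channel(());

    // Simulated queue worker: once everything queued before the barrier has
    // finished, acknowledge it. `send_replace` succeeds even when no receiver
    // is currently listening, which is why it suits a fire-and-forget ack.
    let worker = tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await; // pretend to drain the queue
        sender.send_replace(());
        // Dropping the sender also closes the channel; a waiter sees that as
        // an error from `changed()`, so it stops waiting either way.
    });

    // Caller side (the `wait_completion` path): wait until the barrier is
    // acked or the sender is gone.
    let _ = receiver.changed().await;

    worker.await.unwrap();
    println!("barrier reached");
}
```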
@@ -1796,9 +1813,12 @@ impl RemoteTimelineClient {
while let Some(next_op) = upload_queue.queued_operations.front() {
// Can we run this task now?
let can_run_now = match next_op {
UploadOp::UploadLayer(..) => {
// Can always be scheduled.
true
UploadOp::UploadLayer(_, meta) => {
// Can be scheduled unless an initial barrier is in progress and the deletion
// queue may still hold a file with the same or a lower generation.
upload_queue.num_inprogress_barriers == 0
|| !self
.deletion_queue_client
.maybe_processing_generation(meta.generation)
}
UploadOp::UploadMetadata { .. } => {
// These can only be performed after all the preceding operations

@@ -1810,7 +1830,7 @@ impl RemoteTimelineClient {
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
}

UploadOp::Barrier(_) | UploadOp::Shutdown => {
UploadOp::Barrier(_, _) | UploadOp::Shutdown => {
upload_queue.inprogress_tasks.is_empty()
}
};

@@ -1848,10 +1868,15 @@ impl RemoteTimelineClient {
UploadOp::Delete(_) => {
upload_queue.num_inprogress_deletions += 1;
}
UploadOp::Barrier(sender) => {
UploadOp::Barrier(sender, BarrierType::Normal) => {
// For normal barriers, simply send back the ack.
sender.send_replace(());
continue;
}
UploadOp::Barrier(_, BarrierType::Initial) => {
// For the initial barrier, we need to wait for deletions.
upload_queue.num_inprogress_barriers += 1;
}
UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
};

@@ -1993,7 +2018,6 @@ impl RemoteTimelineClient {
}
Ok(())
} else {
pausable_failpoint!("before-delete-layer-pausable");
self.deletion_queue_client
.push_layers(
self.tenant_shard_id,

@@ -2005,7 +2029,12 @@ impl RemoteTimelineClient {
.map_err(|e| anyhow::anyhow!(e))
}
}
unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
UploadOp::Barrier(_, _) => self
.deletion_queue_client
.flush_execute()
.await
.map_err(|e| anyhow::anyhow!(e)),
unexpected @ UploadOp::Shutdown => {
// unreachable. Barrier operations are handled synchronously in
// launch_queued_tasks
warn!("unexpected {unexpected:?} operation in perform_upload_task");

@@ -2015,6 +2044,10 @@ impl RemoteTimelineClient {

match upload_result {
Ok(()) => {
if let UploadOp::Barrier(sender, _) = &task.op {
// Notify the caller that the barrier has been reached.
sender.send(()).ok();
}
break;
}
Err(e) if TimeoutOrCancel::caused_by_cancel(&e) => {

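Tying the barrier pieces together: `launch_queued_tasks` acks a `BarrierType::Normal` barrier inline, while a `BarrierType::Initial` barrier is counted in-progress and performed as a task whose body is `DeletionQueueClient::flush_execute`, with the ack above sent only once that flush returns. A condensed, hypothetical sketch of that control flow (simplified names, not the real scheduler):

```rust
use tokio::sync::watch;

#[derive(Debug)]
enum BarrierType {
    Normal,
    Initial,
}

enum UploadOp {
    Barrier(watch::Sender<()>, BarrierType),
    // Layer uploads, metadata uploads and deletions are elided in this sketch.
}

/// Stand-in for `DeletionQueueClient::flush_execute`.
async fn flush_deletion_queue() {}

async fn run_barrier(op: UploadOp) {
    match op {
        UploadOp::Barrier(sender, BarrierType::Normal) => {
            // Normal barrier: everything queued before it has already finished,
            // so acknowledge immediately.
            sender.send_replace(());
        }
        UploadOp::Barrier(sender, BarrierType::Initial) => {
            // Initial barrier: additionally wait for the deletion queue to drain
            // before acknowledging, so later uploads cannot race stale deletions.
            flush_deletion_queue().await;
            sender.send_replace(());
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(());
    run_barrier(UploadOp::Barrier(tx, BarrierType::Initial)).await;
    // The ack is still observable even though the sender was dropped afterwards.
    rx.changed().await.expect("initial barrier acknowledged");

    let (tx, mut rx) = watch::channel(());
    run_barrier(UploadOp::Barrier(tx, BarrierType::Normal)).await;
    rx.changed().await.expect("normal barrier acknowledged");

    println!("both barrier kinds acknowledged");
}
```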
@@ -2126,7 +2159,11 @@ impl RemoteTimelineClient {
upload_queue.num_inprogress_deletions -= 1;
None
}
UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
UploadOp::Barrier(..) => {
upload_queue.num_inprogress_barriers -= 1;
None
}
UploadOp::Shutdown => unreachable!(),
};

// Launch any queued tasks that were unblocked by this one.

@@ -2252,6 +2289,7 @@ impl RemoteTimelineClient {
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
num_inprogress_barriers: 0,
inprogress_tasks: HashMap::default(),
queued_operations: VecDeque::default(),
#[cfg(feature = "testing")]

@@ -2281,7 +2319,8 @@ impl RemoteTimelineClient {
assert_eq!(
qi.num_inprogress_layer_uploads
+ qi.num_inprogress_metadata_uploads
+ qi.num_inprogress_deletions,
+ qi.num_inprogress_deletions
+ qi.num_inprogress_barriers,
qi.inprogress_tasks.len()
);

@@ -2646,8 +2646,10 @@ impl Timeline {
// See https://github.com/neondatabase/neon/issues/5878
//
// NB: generation numbers naturally protect against this because they disambiguate
// (1) and (4)
self.remote_client.schedule_barrier()?;
// (1) and (4) ONLY IF the generation number gets bumped. There are some cases where
// we load a tenant without bumping the generation number (e.g., detach ancestor
// and timeline offload/un-offload). In those cases, we need to rely on the barrier.
self.remote_client.schedule_initial_barrier()?;
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.

@@ -68,6 +68,7 @@ pub(crate) struct UploadQueueInitialized {
pub(crate) num_inprogress_layer_uploads: usize,
pub(crate) num_inprogress_metadata_uploads: usize,
pub(crate) num_inprogress_deletions: usize,
pub(crate) num_inprogress_barriers: usize,

/// Tasks that are currently in-progress. In-progress means that a tokio Task
/// has been launched for it. An in-progress task can be busy uploading, but it can

@@ -179,6 +180,7 @@ impl UploadQueue {
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
num_inprogress_barriers: 0,
inprogress_tasks: HashMap::new(),
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]

@@ -220,6 +222,7 @@ impl UploadQueue {
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
num_inprogress_barriers: 0,
inprogress_tasks: HashMap::new(),
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]

@@ -280,6 +283,14 @@ pub(crate) struct Delete {
pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>,
}

#[derive(Debug)]
pub(crate) enum BarrierType {
/// Barrier is a normal barrier, not an initial barrier.
Normal,
/// Barrier is an initial barrier, scheduled at timeline load.
Initial,
}

#[derive(Debug)]
pub(crate) enum UploadOp {
/// Upload a layer file

@@ -295,7 +306,10 @@ pub(crate) enum UploadOp {
Delete(Delete),

/// Barrier. When the barrier operation is reached, the channel is closed.
Barrier(tokio::sync::watch::Sender<()>),
/// The `BarrierType` indicates whether the barrier is an initial barrier scheduled
/// at timeline load -- if so, we need to wait for all deletions to complete
/// before the next upload.
Barrier(tokio::sync::watch::Sender<()>, BarrierType),

/// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
/// this is the same as a Barrier.

@@ -322,7 +336,8 @@ impl std::fmt::Display for UploadOp {
UploadOp::Delete(delete) => {
write!(f, "Delete({} layers)", delete.layers.len())
}
UploadOp::Barrier(_) => write!(f, "Barrier"),
UploadOp::Barrier(_, BarrierType::Normal) => write!(f, "Barrier"),
UploadOp::Barrier(_, BarrierType::Initial) => write!(f, "Barrier (initial)"),
UploadOp::Shutdown => write!(f, "Shutdown"),
}
}

@@ -4947,6 +4947,7 @@ def last_flush_lsn_upload(
timeline_id: TimelineId,
pageserver_id: int | None = None,
auth_token: str | None = None,
wait_until_uploaded: bool = True,
) -> Lsn:
"""
Wait for pageserver to catch up to the latest flush LSN of given endpoint,

@@ -4960,7 +4961,9 @@ def last_flush_lsn_upload(
for tenant_shard_id, pageserver in shards:
ps_http = pageserver.http_client(auth_token=auth_token)
wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
ps_http.timeline_checkpoint(tenant_shard_id, timeline_id, wait_until_uploaded=True)
ps_http.timeline_checkpoint(
tenant_shard_id, timeline_id, wait_until_uploaded=wait_until_uploaded
)
return last_flush_lsn

@@ -4985,6 +4988,7 @@ def generate_uploads_and_deletions(
timeline_id: TimelineId | None = None,
data: str | None = None,
pageserver: NeonPageserver,
wait_until_uploaded: bool = True,
):
"""
Using the environment's default tenant + timeline, generate a load pattern

@@ -5007,7 +5011,12 @@ def generate_uploads_and_deletions(
if init:
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
env,
endpoint,
tenant_id,
timeline_id,
pageserver_id=pageserver.id,
wait_until_uploaded=wait_until_uploaded,
)

def churn(data):

@@ -5030,7 +5039,12 @@ def generate_uploads_and_deletions(
# in a state where there are "future layers" in remote storage that will generate deletions
# after a restart.
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
env,
endpoint,
tenant_id,
timeline_id,
pageserver_id=pageserver.id,
wait_until_uploaded=wait_until_uploaded,
)

# Compaction should generate some GC-eligible layers

@@ -5046,4 +5060,4 @@ def generate_uploads_and_deletions(
# background ingest, no more uploads pending, and therefore no non-determinism
# in subsequent actions like pageserver restarts.
flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id)
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=wait_until_uploaded)

@@ -343,7 +343,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
assert isinstance(res_json["tenant_shards"], list)
return res_json

def tenant_get_location(self, tenant_id: TenantShardId):
def tenant_get_location(self, tenant_id: TenantId | TenantShardId):
res = self.get(
f"http://localhost:{self.port}/v1/location_config/{tenant_id}",
)

@@ -794,7 +794,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
if compact is not None:
query["compact"] = "true" if compact else "false"

log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}, wait_until_uploaded={wait_until_uploaded}")
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
params=query,

@@ -2,6 +2,7 @@ from __future__ import annotations

import time

import pytest
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver

@@ -19,7 +20,11 @@ from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.utils import query_scalar, wait_until


def test_issue_5878(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize(
"attach_mode",
["default_generation", "same_generation"],
)
def test_issue_5878(neon_env_builder: NeonEnvBuilder, attach_mode: str):
"""
Regression test for issue https://github.com/neondatabase/neon/issues/5878 .

@@ -168,11 +173,32 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
tenant_conf = ps_http.tenant_config(tenant_id)
generation_before_detach = get_generation_number()
env.pageserver.tenant_detach(tenant_id)
failpoint_name = "before-delete-layer-pausable"
failpoint_deletion_queue = "deletion-queue-before-execute-pause"

ps_http.configure_failpoints((failpoint_name, "pause"))
env.pageserver.tenant_attach(tenant_id, tenant_conf.tenant_specific_overrides)
generation_after_reattach = get_generation_number()
ps_http.configure_failpoints((failpoint_deletion_queue, "pause"))

if attach_mode == "default_generation":
env.pageserver.tenant_attach(tenant_id, tenant_conf.tenant_specific_overrides)
elif attach_mode == "same_generation":
# Attach with the same generation number -- this is possible with timeline offload and detach ancestor
env.pageserver.tenant_attach(
tenant_id,
tenant_conf.tenant_specific_overrides,
generation=generation_before_detach,
# We want to avoid the generation bump and don't want to talk with the storcon
override_storage_controller_generation=False,
)
else:
raise AssertionError(f"Unknown attach_mode: {attach_mode}")

# Get it from the pageserver API instead of the storcon API because we might not have attached using the storcon
# API if attach_mode == "same_generation"
tenant_location = env.pageserver.http_client().tenant_get_location(tenant_id)
generation_after_reattach = tenant_location["generation"]

if attach_mode == "same_generation":
# The generation number should be the same as before the detach
assert generation_before_detach == generation_after_reattach
wait_until_tenant_active(ps_http, tenant_id)

# Ensure the IndexPart upload that unlinks the layer file finishes, i.e., doesn't clog the queue.

@@ -182,15 +208,8 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):

wait_until(10, 0.5, future_layer_is_gone_from_index_part)

# NB: the layer file is unlinked from the index part now, but, because we made the delete
# operation stuck, the layer file itself is still in the remote storage
wait_until(
10,
0.5,
lambda: env.pageserver.assert_log_contains(
f".*{tenant_id}.*at failpoint.*{failpoint_name}"
),
)
# We already made the deletion stuck here, but we don't necessarily hit the failpoint
# because deletions are batched.
future_layer_path = env.pageserver_remote_storage.remote_layer_path(
tenant_id, timeline_id, future_layer.to_str(), generation=generation_before_detach
)

@@ -224,11 +243,13 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
break
time.sleep(1)

# Window has passed, unstuck the delete, let upload queue drain.
# Window has passed, unstuck the delete, let the deletion queue drain; the upload queue should
# already have drained because these layer deletion operations were pushed into the deletion queue,
# which consumed them from the upload queue.
log.info("unstuck the DELETE")
ps_http.configure_failpoints(("before-delete-layer-pausable", "off"))

ps_http.configure_failpoints((failpoint_deletion_queue, "off"))
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
env.pageserver.http_client().deletion_queue_flush(True)

# Examine the resulting S3 state.
log.info("integrity-check the remote storage")

@@ -247,3 +268,6 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
final_stat = future_layer_path.stat()
log.info(f"future layer path: {future_layer_path}")
assert final_stat.st_mtime != pre_stat.st_mtime

# Ensure no weird errors in the end...
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)

@@ -459,7 +459,11 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env.pageserver.start()

# The pageserver should provide service to clients
generate_uploads_and_deletions(env, init=False, pageserver=env.pageserver)
# Because it is in emergency mode, it will not attempt to validate deletions required by the initial barrier, and therefore
# other files cannot be uploaded because it is waiting for the initial barrier to be validated.
generate_uploads_and_deletions(
env, init=False, pageserver=env.pageserver, wait_until_uploaded=False
)

# The pageserver should neither validate nor execute any deletions, it should have
# loaded the DeletionLists from before though