mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 23:20:40 +00:00
Compare commits
38 Commits
hack/compu
...
problame/f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ab89f164d8 | ||
|
|
e1e627e897 | ||
|
|
8425b6ab21 | ||
|
|
c025617639 | ||
|
|
b8eba89a10 | ||
|
|
dd73c1d793 | ||
|
|
08558b83ed | ||
|
|
30ea7b141b | ||
|
|
4903953e9f | ||
|
|
2785b6a4ed | ||
|
|
de3f23344a | ||
|
|
b58bf56670 | ||
|
|
16486edd8e | ||
|
|
db9d78151a | ||
|
|
3720e9073f | ||
|
|
11c18b05aa | ||
|
|
48112bdf53 | ||
|
|
e41b2ed66a | ||
|
|
d5280bf2dd | ||
|
|
0fcc8036fc | ||
|
|
7f3ee0d45d | ||
|
|
958fd5720e | ||
|
|
bcf49d3aa1 | ||
|
|
95b560db4e | ||
|
|
b69d12d50b | ||
|
|
dead7798ef | ||
|
|
dd36f542e4 | ||
|
|
33b52486d2 | ||
|
|
aba2d60815 | ||
|
|
efe914f056 | ||
|
|
3b818d2bb1 | ||
|
|
dbdec75171 | ||
|
|
d826cacc7d | ||
|
|
3ffdb196bf | ||
|
|
23b3c66abf | ||
|
|
e85547956e | ||
|
|
f0ab08c1e0 | ||
|
|
9fda377d75 |
@@ -128,6 +128,15 @@ impl RemoteStorage for LocalFs {
|
||||
// We need this dance with sort of durable rename (without fsyncs)
|
||||
// to prevent partial uploads. This was really hit when pageserver shutdown
|
||||
// cancelled the upload and partial file was left on the fs
|
||||
// NOTE: Because the temp file suffix is always the same, this operation is racy.
|
||||
// Two concurrent operations can lead to the following sequence:
|
||||
// T1: write(temp)
|
||||
// T2: write(temp) -> overwrites the content
|
||||
// T1: rename(temp, dst) -> succeeds
|
||||
// T2: rename(temp, dst) -> fails, temp no longer exists
|
||||
// This can be solved by supplying unique temp suffix every time, but this situation
|
||||
// is not normal in the first place; the error can help (and helped at least once)
|
||||
// to discover bugs in upper level synchronization.
|
||||
let temp_file_path =
|
||||
path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
|
||||
let mut destination = io::BufWriter::new(
|
||||
|
||||
@@ -143,6 +143,7 @@ impl From<crate::tenant::DeleteTimelineError> for ApiError {
|
||||
HasChildren => ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Cannot delete timeline which has child timelines"
|
||||
)),
|
||||
StopUploadQueue(e) => ApiError::InternalServerError(e.into()),
|
||||
Other(e) => ApiError::InternalServerError(e),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,6 +58,7 @@ use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::metadata::load_metadata;
|
||||
use crate::tenant::remote_timeline_client::index::IndexPart;
|
||||
use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
|
||||
use crate::tenant::storage_layer::DeltaLayer;
|
||||
use crate::tenant::storage_layer::ImageLayer;
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
@@ -447,6 +448,8 @@ pub enum DeleteTimelineError {
|
||||
NotFound,
|
||||
#[error("HasChildren")]
|
||||
HasChildren,
|
||||
#[error("stop upload queue: {0:#}")]
|
||||
StopUploadQueue(#[from] remote_timeline_client::StopError),
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
@@ -695,16 +698,9 @@ impl Tenant {
|
||||
.await
|
||||
.context("download index file")?;
|
||||
|
||||
let remote_metadata = index_part.parse_metadata().context("parse metadata")?;
|
||||
|
||||
debug!("finished index part download");
|
||||
|
||||
Result::<_, anyhow::Error>::Ok((
|
||||
timeline_id,
|
||||
client,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
))
|
||||
Result::<_, anyhow::Error>::Ok((timeline_id, client, index_part))
|
||||
}
|
||||
.map(move |res| {
|
||||
res.with_context(|| format!("download index part for timeline {timeline_id}"))
|
||||
@@ -713,17 +709,26 @@ impl Tenant {
|
||||
);
|
||||
}
|
||||
// Wait for all the download tasks to complete & collect results.
|
||||
let mut remote_clients = HashMap::new();
|
||||
let mut index_parts = HashMap::new();
|
||||
let mut remote_index_and_client = HashMap::new();
|
||||
let mut timeline_ancestors = HashMap::new();
|
||||
while let Some(result) = part_downloads.join_next().await {
|
||||
// NB: we already added timeline_id as context to the error
|
||||
let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
|
||||
let (timeline_id, client, index_part, remote_metadata) = result?;
|
||||
let (timeline_id, client, index_part) = result?;
|
||||
debug!("successfully downloaded index part for timeline {timeline_id}");
|
||||
timeline_ancestors.insert(timeline_id, remote_metadata);
|
||||
index_parts.insert(timeline_id, index_part);
|
||||
remote_clients.insert(timeline_id, client);
|
||||
match index_part {
|
||||
MaybeDeletedIndexPart::IndexPart(index_part) => {
|
||||
timeline_ancestors.insert(
|
||||
timeline_id,
|
||||
index_part.parse_metadata().context("parse_metadata")?,
|
||||
);
|
||||
remote_index_and_client.insert(timeline_id, (index_part, client));
|
||||
}
|
||||
MaybeDeletedIndexPart::Deleted => {
|
||||
info!("timeline {} is deleted, skipping", timeline_id);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// For every timeline, download the metadata file, scan the local directory,
|
||||
@@ -731,12 +736,16 @@ impl Tenant {
|
||||
// layer file.
|
||||
let sorted_timelines = tree_sort_timelines(timeline_ancestors)?;
|
||||
for (timeline_id, remote_metadata) in sorted_timelines {
|
||||
let (index_part, remote_client) = remote_index_and_client
|
||||
.remove(&timeline_id)
|
||||
.expect("just put it in above");
|
||||
|
||||
// TODO again handle early failure
|
||||
self.load_remote_timeline(
|
||||
timeline_id,
|
||||
index_parts.remove(&timeline_id).unwrap(),
|
||||
index_part,
|
||||
remote_metadata,
|
||||
remote_clients.remove(&timeline_id).unwrap(),
|
||||
remote_client,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -1042,21 +1051,12 @@ impl Tenant {
|
||||
/// Subroutine of `load_tenant`, to load an individual timeline
|
||||
///
|
||||
/// NB: The parent is assumed to be already loaded!
|
||||
#[instrument(skip(self, local_metadata, ctx), fields(timeline_id=%timeline_id))]
|
||||
async fn load_local_timeline(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
local_metadata: TimelineMetadata,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
|
||||
let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
|
||||
.with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?;
|
||||
Some(ancestor_timeline)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let remote_client = self.remote_storage.as_ref().map(|remote_storage| {
|
||||
RemoteTimelineClient::new(
|
||||
remote_storage.clone(),
|
||||
@@ -1069,6 +1069,29 @@ impl Tenant {
|
||||
let remote_startup_data = match &remote_client {
|
||||
Some(remote_client) => match remote_client.download_index_file().await {
|
||||
Ok(index_part) => {
|
||||
let index_part = match index_part {
|
||||
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
|
||||
MaybeDeletedIndexPart::Deleted => {
|
||||
// TODO: we won't reach here if remote storage gets de-configured after start of the deletion operation.
|
||||
// Example:
|
||||
// start deletion operation
|
||||
// finishes upload of index part
|
||||
// pageserver crashes
|
||||
// remote storage gets de-configured
|
||||
// pageserver starts
|
||||
//
|
||||
// We don't really anticipate remote storage to be de-configured, so, for now, this is fine.
|
||||
// Also, maybe we'll remove that option entirely in the future, see https://github.com/neondatabase/neon/issues/4099.
|
||||
info!("is_deleted is set on remote, resuming removal of local data originally done by timeline deletion handler");
|
||||
std::fs::remove_dir_all(
|
||||
self.conf.timeline_path(&timeline_id, &self.tenant_id),
|
||||
)
|
||||
.context("remove_dir_all")?;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let remote_metadata = index_part.parse_metadata().context("parse_metadata")?;
|
||||
Some(RemoteStartupData {
|
||||
index_part,
|
||||
@@ -1084,6 +1107,14 @@ impl Tenant {
|
||||
None => None,
|
||||
};
|
||||
|
||||
let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
|
||||
let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
|
||||
.with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?;
|
||||
Some(ancestor_timeline)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
self.timeline_init_and_sync(
|
||||
timeline_id,
|
||||
remote_client,
|
||||
@@ -1371,9 +1402,37 @@ impl Tenant {
|
||||
timeline.walreceiver.stop().await;
|
||||
debug!("wal receiver shutdown confirmed");
|
||||
|
||||
// Prevent new uploads from starting.
|
||||
if let Some(remote_client) = timeline.remote_client.as_ref() {
|
||||
let res = remote_client.stop();
|
||||
match &res {
|
||||
Ok(()) => {}
|
||||
Err(e) => match e {
|
||||
remote_timeline_client::StopError::QueueBroken => {
|
||||
// This happens if there's a panic inside above stop() call,
|
||||
// and we call stop() again after that.
|
||||
// The calling again can happen because we won't poison any
|
||||
// mutexes on the unwind path at the first panicking call.
|
||||
}
|
||||
remote_timeline_client::StopError::QueueUninitialized => {
|
||||
// This could happen if the timeline is Broken, e.g., because it failed to fetch IndexPart when it was loaded.
|
||||
}
|
||||
},
|
||||
}
|
||||
res?;
|
||||
}
|
||||
|
||||
// Stop & wait for the remaining timeline tasks, including upload tasks.
|
||||
info!("waiting for timeline tasks to shutdown");
|
||||
task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id)).await;
|
||||
|
||||
// Mark timeline as deleted in S3 so we won't pick it up next time
|
||||
// during attach or pageserver restart.
|
||||
// See comment in persist_index_part_with_deleted_flag.
|
||||
if let Some(remote_client) = timeline.remote_client.as_ref() {
|
||||
remote_client.persist_index_part_with_deleted_flag().await?;
|
||||
}
|
||||
|
||||
{
|
||||
// Grab the layer_removal_cs lock, and actually perform the deletion.
|
||||
//
|
||||
@@ -1397,8 +1456,17 @@ impl Tenant {
|
||||
// by the caller.
|
||||
|
||||
let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id);
|
||||
// XXX make this atomic so that, if we crash-mid-way, the timeline won't be picked up
|
||||
// with some layers missing.
|
||||
|
||||
fail::fail_point!("timeline-delete-before-rm", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))?
|
||||
});
|
||||
|
||||
// NB: This need not be atomic because the deleted flag in the IndexPart
|
||||
// will be observed during tenant/timeline load. The deletion will be resumed there.
|
||||
//
|
||||
// For configurations without remote storage, we tolerate that we're not crash-safe here.
|
||||
// The timeline may come up Active but with missing layer files, in such setups.
|
||||
// See https://github.com/neondatabase/neon/pull/3919#issuecomment-1531726720
|
||||
std::fs::remove_dir_all(&local_timeline_directory).with_context(|| {
|
||||
format!(
|
||||
"Failed to remove local timeline directory '{}'",
|
||||
|
||||
@@ -12,6 +12,7 @@ use std::io::Write;
|
||||
use anyhow::{bail, ensure, Context};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::info_span;
|
||||
use utils::bin_ser::SerializeError;
|
||||
use utils::{
|
||||
bin_ser::BeSer,
|
||||
id::{TenantId, TimelineId},
|
||||
@@ -182,7 +183,7 @@ impl TimelineMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
|
||||
pub fn to_bytes(&self) -> Result<Vec<u8>, SerializeError> {
|
||||
let body_bytes = self.body.ser()?;
|
||||
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
|
||||
let hdr = TimelineMetadataHeader {
|
||||
|
||||
@@ -204,8 +204,11 @@ mod download;
|
||||
pub mod index;
|
||||
mod upload;
|
||||
|
||||
use anyhow::Context;
|
||||
use chrono::Utc;
|
||||
// re-export these
|
||||
pub use download::{is_temp_download_file, list_remote_timelines};
|
||||
use scopeguard::ScopeGuard;
|
||||
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
@@ -213,7 +216,7 @@ use std::sync::{Arc, Mutex};
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage};
|
||||
use std::ops::DerefMut;
|
||||
use tokio::runtime::Runtime;
|
||||
use tracing::{debug, info, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing::{info_span, Instrument};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -253,6 +256,22 @@ const FAILED_DOWNLOAD_RETRIES: u32 = 10;
|
||||
// retries. Uploads and deletions are retried forever, though.
|
||||
const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
|
||||
|
||||
pub enum MaybeDeletedIndexPart {
|
||||
IndexPart(IndexPart),
|
||||
Deleted,
|
||||
}
|
||||
|
||||
/// Errors that can arise when calling [`RemoteTimelineClient::stop`].
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum StopError {
|
||||
/// Returned if the upload queue was never initialized.
|
||||
/// See [`RemoteTimelineClient::init_upload_queue`] and [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
|
||||
#[error("queue is not initialized")]
|
||||
QueueUninitialized,
|
||||
#[error("queue is broken")]
|
||||
QueueBroken,
|
||||
}
|
||||
|
||||
/// A client for accessing a timeline's data in remote storage.
|
||||
///
|
||||
/// This takes care of managing the number of connections, and balancing them
|
||||
@@ -336,6 +355,7 @@ impl RemoteTimelineClient {
|
||||
pub fn last_uploaded_consistent_lsn(&self) -> Option<Lsn> {
|
||||
match &*self.upload_queue.lock().unwrap() {
|
||||
UploadQueue::Uninitialized => None,
|
||||
UploadQueue::Broken => None, // could we return something?
|
||||
UploadQueue::Initialized(q) => Some(q.last_uploaded_consistent_lsn),
|
||||
UploadQueue::Stopped(q) => Some(q.last_uploaded_consistent_lsn),
|
||||
}
|
||||
@@ -367,7 +387,7 @@ impl RemoteTimelineClient {
|
||||
//
|
||||
|
||||
/// Download index file
|
||||
pub async fn download_index_file(&self) -> Result<IndexPart, DownloadError> {
|
||||
pub async fn download_index_file(&self) -> Result<MaybeDeletedIndexPart, DownloadError> {
|
||||
let _unfinished_gauge_guard = self.metrics.call_begin(
|
||||
&RemoteOpFileKind::Index,
|
||||
&RemoteOpKind::Download,
|
||||
@@ -376,7 +396,7 @@ impl RemoteTimelineClient {
|
||||
},
|
||||
);
|
||||
|
||||
download::download_index_part(
|
||||
let index_part = download::download_index_part(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
self.tenant_id,
|
||||
@@ -389,7 +409,13 @@ impl RemoteTimelineClient {
|
||||
RemoteOpKind::Download,
|
||||
Arc::clone(&self.metrics),
|
||||
)
|
||||
.await
|
||||
.await?;
|
||||
|
||||
if index_part.deleted_at.is_some() {
|
||||
Ok(MaybeDeletedIndexPart::Deleted)
|
||||
} else {
|
||||
Ok(MaybeDeletedIndexPart::IndexPart(index_part))
|
||||
}
|
||||
}
|
||||
|
||||
/// Download a (layer) file from `path`, into local filesystem.
|
||||
@@ -624,6 +650,95 @@ impl RemoteTimelineClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set the deleted_at field in the remote index file.
|
||||
///
|
||||
/// This fails if the upload queue has not been `stop()`ed.
|
||||
///
|
||||
/// The caller is responsible for calling `stop()` AND for waiting
|
||||
/// for any ongoing upload tasks to finish after `stop()` has succeeded.
|
||||
/// Check method [`RemoteTimelineClient::stop`] for details.
|
||||
pub(crate) async fn persist_index_part_with_deleted_flag(
|
||||
self: &Arc<Self>,
|
||||
) -> anyhow::Result<()> {
|
||||
let index_part_with_deleted_at = {
|
||||
let mut locked = self.upload_queue.lock().unwrap();
|
||||
|
||||
// We must be in stopped state because otherwise
|
||||
// we can have inprogress index part upload that can overwrite the file
|
||||
// with missing is_deleted flag that we are going to set below
|
||||
let stopped = match &mut *locked {
|
||||
UploadQueue::Uninitialized | UploadQueue::Initialized(_) | UploadQueue::Broken => {
|
||||
anyhow::bail!(
|
||||
"upload queue must be in state Stopped, but is in state {}",
|
||||
locked.as_str()
|
||||
);
|
||||
}
|
||||
UploadQueue::Stopped(stopped) => stopped,
|
||||
};
|
||||
|
||||
if let Some(deleted_at) = stopped.deleted_at.as_ref() {
|
||||
anyhow::bail!("timeline is deleting, deleted_at: {:?}", deleted_at);
|
||||
}
|
||||
let deleted_at = Utc::now().naive_utc();
|
||||
stopped.deleted_at = Some(deleted_at);
|
||||
|
||||
let mut index_part = IndexPart::new(
|
||||
stopped.latest_files.clone(),
|
||||
stopped.last_uploaded_consistent_lsn,
|
||||
stopped
|
||||
.latest_metadata
|
||||
.to_bytes()
|
||||
.context("serialize metadata")?,
|
||||
);
|
||||
index_part.deleted_at = Some(deleted_at);
|
||||
index_part
|
||||
};
|
||||
|
||||
let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
|
||||
let mut locked = self_clone.upload_queue.lock().unwrap();
|
||||
let stopped = match &mut *locked {
|
||||
UploadQueue::Broken | UploadQueue::Uninitialized | UploadQueue::Initialized(_) => {
|
||||
unreachable!(
|
||||
"there's no way out of Stopping, and we checked it's Stopping above: {:?}",
|
||||
locked.as_str(),
|
||||
)
|
||||
}
|
||||
UploadQueue::Stopped(stopped) => stopped,
|
||||
};
|
||||
stopped.deleted_at = None;
|
||||
});
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
tokio::task::spawn_blocking({
|
||||
let current = tracing::Span::current();
|
||||
move || {
|
||||
let _entered = current.entered();
|
||||
tracing::info!(
|
||||
"at failpoint persist_index_part_with_deleted_flag_after_set_before_upload_pause"
|
||||
);
|
||||
fail::fail_point!(
|
||||
"persist_index_part_with_deleted_flag_after_set_before_upload_pause"
|
||||
);
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("spawn_blocking");
|
||||
|
||||
upload::upload_index_part(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
&index_part_with_deleted_at,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// all good, keep the deleted_at flag
|
||||
ScopeGuard::into_inner(undo_deleted_at);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Pick next tasks from the queue, and start as many of them as possible without violating
|
||||
/// the ordering constraints.
|
||||
@@ -740,9 +855,19 @@ impl RemoteTimelineClient {
|
||||
// is cancellation safe, so we don't dare to do that. Hopefully, the
|
||||
// upload finishes or times out soon enough.
|
||||
if task_mgr::is_shutdown_requested() {
|
||||
info!("upload task cancelled by shutdown request");
|
||||
info!("upload task cancelled by shutdown request, stopping queue");
|
||||
match self.stop() {
|
||||
Ok(()) => {}
|
||||
Err(StopError::QueueBroken) => {
|
||||
warn!("stop() observed upload queue as broken");
|
||||
// In this case, it's still ok to proceed with balancing out the metric and returning.
|
||||
// (The metric has nothing to do with the queue state itself).
|
||||
}
|
||||
Err(StopError::QueueUninitialized) => {
|
||||
unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back")
|
||||
}
|
||||
}
|
||||
self.calls_unfinished_metric_end(&task.op);
|
||||
self.stop();
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -867,6 +992,10 @@ impl RemoteTimelineClient {
|
||||
info!("another concurrent task already stopped the queue");
|
||||
return;
|
||||
}, // nothing to do
|
||||
UploadQueue::Broken => {
|
||||
warn!("the upload queue became broken while the task was running");
|
||||
return;
|
||||
}
|
||||
UploadQueue::Initialized(qi) => { qi }
|
||||
};
|
||||
|
||||
@@ -946,64 +1075,85 @@ impl RemoteTimelineClient {
|
||||
self.metrics.call_end(&file_kind, &op_kind, track_bytes);
|
||||
}
|
||||
|
||||
fn stop(&self) {
|
||||
/// Close the upload queue for new operations and cancel queued operations.
|
||||
/// In-progress operations will still be running after this function returns.
|
||||
/// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))`
|
||||
/// to wait for them to complete, after calling this function.
|
||||
pub fn stop(&self) -> Result<(), StopError> {
|
||||
// Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
|
||||
// into stopped state, thereby dropping all of the queued *ops* which haven't become *tasks* yet.
|
||||
// The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
match &*guard {
|
||||
UploadQueue::Uninitialized => panic!(
|
||||
"callers are responsible for ensuring this is only called on initialized queue"
|
||||
),
|
||||
// If any of the code below panics, the queue remains in Broken state.
|
||||
// If we're coming from Initialized state, `queued_operations` will get dropped
|
||||
// as part of the panic, because it sits in the local variable named `owned`.
|
||||
// Any `wait_completion` operations against those queued operations
|
||||
// will observe an error. That's exactly what we want.
|
||||
// We don't need to care about in-progress operations because that responsibility
|
||||
// lies with the caller. There's no point for them to try anything funky, like,
|
||||
// catching the panic and retrying the stop() call. We will return QueueBroken in that case.
|
||||
let owned = std::mem::replace(&mut *guard, UploadQueue::Broken);
|
||||
let res;
|
||||
*guard = match owned {
|
||||
UploadQueue::Broken => {
|
||||
res = Err(StopError::QueueBroken);
|
||||
owned
|
||||
}
|
||||
UploadQueue::Uninitialized => {
|
||||
res = Err(StopError::QueueUninitialized);
|
||||
owned
|
||||
}
|
||||
UploadQueue::Stopped(_) => {
|
||||
// nothing to do
|
||||
info!("another concurrent task already shut down the queue");
|
||||
res = Ok(());
|
||||
owned
|
||||
}
|
||||
UploadQueue::Initialized(qi) => {
|
||||
info!("shutting down upload queue");
|
||||
|
||||
// Replace the queue with the Stopped state, taking ownership of the old
|
||||
// Initialized queue. We will do some checks on it, and then drop it.
|
||||
let qi = {
|
||||
let last_uploaded_consistent_lsn = qi.last_uploaded_consistent_lsn;
|
||||
let upload_queue = std::mem::replace(
|
||||
&mut *guard,
|
||||
UploadQueue::Stopped(UploadQueueStopped {
|
||||
last_uploaded_consistent_lsn,
|
||||
}),
|
||||
);
|
||||
if let UploadQueue::Initialized(qi) = upload_queue {
|
||||
qi
|
||||
} else {
|
||||
unreachable!("we checked in the match above that it is Initialized");
|
||||
}
|
||||
};
|
||||
let UploadQueueInitialized {
|
||||
task_counter: _,
|
||||
latest_files,
|
||||
// XXX need to think about what it means if it's non-zero here
|
||||
latest_files_changes_since_metadata_upload_scheduled: _,
|
||||
latest_metadata,
|
||||
last_uploaded_consistent_lsn,
|
||||
num_inprogress_layer_uploads,
|
||||
num_inprogress_metadata_uploads,
|
||||
num_inprogress_deletions,
|
||||
inprogress_tasks,
|
||||
queued_operations,
|
||||
} = qi;
|
||||
|
||||
// consistency check
|
||||
assert_eq!(
|
||||
qi.num_inprogress_layer_uploads
|
||||
+ qi.num_inprogress_metadata_uploads
|
||||
+ qi.num_inprogress_deletions,
|
||||
qi.inprogress_tasks.len()
|
||||
num_inprogress_layer_uploads
|
||||
+ num_inprogress_metadata_uploads
|
||||
+ num_inprogress_deletions,
|
||||
inprogress_tasks.len()
|
||||
);
|
||||
|
||||
// We don't need to do anything here for in-progress tasks. They will finish
|
||||
// on their own, decrement the unfinished-task counter themselves, and observe
|
||||
// that the queue is Stopped.
|
||||
drop(qi.inprogress_tasks);
|
||||
drop(inprogress_tasks);
|
||||
|
||||
// Tear down queued ops
|
||||
for op in qi.queued_operations.into_iter() {
|
||||
for op in queued_operations.into_iter() {
|
||||
self.calls_unfinished_metric_end(&op);
|
||||
// Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
|
||||
// which is exactly what we want to happen.
|
||||
drop(op);
|
||||
}
|
||||
|
||||
// We're done.
|
||||
drop(guard);
|
||||
res = Ok(());
|
||||
UploadQueue::Stopped(UploadQueueStopped {
|
||||
latest_files,
|
||||
last_uploaded_consistent_lsn,
|
||||
latest_metadata,
|
||||
deleted_at: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
};
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1240,7 +1390,11 @@ mod tests {
|
||||
}
|
||||
|
||||
// Download back the index.json, and check that the list of files is correct
|
||||
let index_part = runtime.block_on(client.download_index_file())?;
|
||||
let index_part = match runtime.block_on(client.download_index_file())? {
|
||||
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
|
||||
MaybeDeletedIndexPart::Deleted => panic!("unexpectedly got deleted index part"),
|
||||
};
|
||||
|
||||
assert_file_list(
|
||||
&index_part.timeline_layers,
|
||||
&[
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use chrono::NaiveDateTime;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
|
||||
@@ -55,6 +56,10 @@ pub struct IndexPart {
|
||||
#[serde(default)]
|
||||
version: usize,
|
||||
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub deleted_at: Option<NaiveDateTime>,
|
||||
|
||||
/// Layer names, which are stored on the remote storage.
|
||||
///
|
||||
/// Additional metadata might exist in `layer_metadata`.
|
||||
@@ -78,7 +83,7 @@ impl IndexPart {
|
||||
/// used to understand later versions.
|
||||
///
|
||||
/// Version is currently informative only.
|
||||
const LATEST_VERSION: usize = 1;
|
||||
const LATEST_VERSION: usize = 2;
|
||||
pub const FILE_NAME: &'static str = "index_part.json";
|
||||
|
||||
pub fn new(
|
||||
@@ -101,6 +106,7 @@ impl IndexPart {
|
||||
layer_metadata,
|
||||
disk_consistent_lsn,
|
||||
metadata_bytes,
|
||||
deleted_at: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -156,6 +162,7 @@ mod tests {
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata_bytes: [113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
|
||||
deleted_at: None,
|
||||
};
|
||||
|
||||
let part = serde_json::from_str::<IndexPart>(example).unwrap();
|
||||
@@ -192,6 +199,7 @@ mod tests {
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata_bytes: [112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
|
||||
deleted_at: None,
|
||||
};
|
||||
|
||||
let part = serde_json::from_str::<IndexPart>(example).unwrap();
|
||||
@@ -236,6 +244,7 @@ mod tests {
|
||||
0, 0,
|
||||
]
|
||||
.to_vec(),
|
||||
deleted_at: None,
|
||||
};
|
||||
|
||||
let empty_layers_parsed = serde_json::from_str::<IndexPart>(empty_layers_json).unwrap();
|
||||
|
||||
@@ -19,9 +19,12 @@ pub(super) async fn upload_index_part<'a>(
|
||||
timeline_id: TimelineId,
|
||||
index_part: &'a IndexPart,
|
||||
) -> anyhow::Result<()> {
|
||||
tracing::trace!("uploading new index part");
|
||||
|
||||
fail_point!("before-upload-index", |_| {
|
||||
bail!("failpoint before-upload-index")
|
||||
});
|
||||
|
||||
let index_part_bytes = serde_json::to_vec(&index_part)
|
||||
.context("Failed to serialize index part file into bytes")?;
|
||||
let index_part_size = index_part_bytes.len();
|
||||
@@ -31,6 +34,7 @@ pub(super) async fn upload_index_part<'a>(
|
||||
.metadata_path(timeline_id, tenant_id)
|
||||
.with_file_name(IndexPart::FILE_NAME);
|
||||
let storage_path = conf.remote_path(&index_part_path)?;
|
||||
|
||||
storage
|
||||
.upload_storage_object(Box::new(index_part_bytes), index_part_size, &storage_path)
|
||||
.await
|
||||
|
||||
@@ -7,6 +7,7 @@ use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fmt::Debug;
|
||||
|
||||
use chrono::NaiveDateTime;
|
||||
use std::sync::Arc;
|
||||
use tracing::info;
|
||||
|
||||
@@ -18,18 +19,20 @@ use utils::lsn::Lsn;
|
||||
// that many upload queues in a running pageserver, and most of them are initialized
|
||||
// anyway.
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
pub(crate) enum UploadQueue {
|
||||
pub(super) enum UploadQueue {
|
||||
Uninitialized,
|
||||
Initialized(UploadQueueInitialized),
|
||||
Stopped(UploadQueueStopped),
|
||||
Broken,
|
||||
}
|
||||
|
||||
impl UploadQueue {
|
||||
fn as_str(&self) -> &'static str {
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
UploadQueue::Uninitialized => "Uninitialized",
|
||||
UploadQueue::Initialized(_) => "Initialized",
|
||||
UploadQueue::Stopped(_) => "Stopped",
|
||||
UploadQueue::Broken => "Broken",
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -75,8 +78,12 @@ pub(crate) struct UploadQueueInitialized {
|
||||
pub(crate) queued_operations: VecDeque<UploadOp>,
|
||||
}
|
||||
|
||||
pub(crate) struct UploadQueueStopped {
|
||||
pub(crate) last_uploaded_consistent_lsn: Lsn,
|
||||
pub(super) struct UploadQueueStopped {
|
||||
pub(super) latest_files: HashMap<LayerFileName, LayerFileMetadata>,
|
||||
pub(super) last_uploaded_consistent_lsn: Lsn,
|
||||
pub(super) latest_metadata: TimelineMetadata,
|
||||
/// If Some(), a call to `persist_index_part_with_deleted_flag` is ongoing or finished.
|
||||
pub(super) deleted_at: Option<NaiveDateTime>,
|
||||
}
|
||||
|
||||
impl UploadQueue {
|
||||
@@ -86,7 +93,7 @@ impl UploadQueue {
|
||||
) -> anyhow::Result<&mut UploadQueueInitialized> {
|
||||
match self {
|
||||
UploadQueue::Uninitialized => (),
|
||||
UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
|
||||
UploadQueue::Initialized(_) | UploadQueue::Stopped(_) | UploadQueue::Broken => {
|
||||
anyhow::bail!("already initialized, state {}", self.as_str())
|
||||
}
|
||||
}
|
||||
@@ -120,7 +127,7 @@ impl UploadQueue {
|
||||
) -> anyhow::Result<&mut UploadQueueInitialized> {
|
||||
match self {
|
||||
UploadQueue::Uninitialized => (),
|
||||
UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
|
||||
UploadQueue::Initialized(_) | UploadQueue::Stopped(_) | UploadQueue::Broken => {
|
||||
anyhow::bail!("already initialized, state {}", self.as_str())
|
||||
}
|
||||
}
|
||||
@@ -170,7 +177,7 @@ impl UploadQueue {
|
||||
|
||||
pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
|
||||
match self {
|
||||
UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
|
||||
UploadQueue::Broken | UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
|
||||
anyhow::bail!("queue is in state {}", self.as_str())
|
||||
}
|
||||
UploadQueue::Initialized(x) => Ok(x),
|
||||
|
||||
@@ -311,9 +311,9 @@ class PageserverHttpClient(requests.Session):
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId):
|
||||
def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId, **kwargs):
|
||||
res = self.delete(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}"
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}", **kwargs
|
||||
)
|
||||
self.verbose_error(res)
|
||||
res_json = res.json()
|
||||
|
||||
@@ -87,7 +87,9 @@ def wait_until_tenant_state(
|
||||
|
||||
time.sleep(period)
|
||||
|
||||
raise Exception(f"Tenant {tenant_id} did not become {expected_state} in {iterations} seconds")
|
||||
raise Exception(
|
||||
f"Tenant {tenant_id} did not become {expected_state} within {iterations * period} seconds"
|
||||
)
|
||||
|
||||
|
||||
def wait_until_tenant_active(
|
||||
|
||||
@@ -143,6 +143,8 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
assert env.pageserver.log_contains(
|
||||
".*WARN.*ignored .* unexpected bytes after the tar archive.*"
|
||||
)
|
||||
|
||||
# NOTE: delete can easily come before upload operations are completed
|
||||
client.timeline_delete(tenant, timeline)
|
||||
|
||||
# Importing correct backup works
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
# env NEON_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
|
||||
|
||||
import os
|
||||
import queue
|
||||
import shutil
|
||||
import threading
|
||||
import time
|
||||
@@ -9,6 +10,7 @@ from pathlib import Path
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
LocalFsStorage,
|
||||
@@ -752,4 +754,292 @@ def get_queued_count(
|
||||
return int(val)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
|
||||
@pytest.mark.parametrize("fill_branch", [True, False])
|
||||
def test_timeline_resurrection_on_attach(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
fill_branch: bool,
|
||||
):
|
||||
"""
|
||||
After deleting a timeline it should never appear again.
|
||||
This test ensures that this invariant holds for detach+attach.
|
||||
Original issue: https://github.com/neondatabase/neon/issues/3560
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_timeline_resurrection_on_attach",
|
||||
)
|
||||
|
||||
##### First start, insert data and upload it to the remote storage
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
pg = env.endpoints.create_start("main")
|
||||
|
||||
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
|
||||
main_timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
|
||||
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("CREATE TABLE f (i integer);")
|
||||
cur.execute("INSERT INTO f VALUES (generate_series(1,1000));")
|
||||
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
|
||||
|
||||
# wait until pageserver receives that data
|
||||
wait_for_last_record_lsn(ps_http, tenant_id, main_timeline_id, current_lsn)
|
||||
|
||||
# run checkpoint manually to be sure that data landed in remote storage
|
||||
ps_http.timeline_checkpoint(tenant_id, main_timeline_id)
|
||||
|
||||
# wait until pageserver successfully uploaded a checkpoint to remote storage
|
||||
log.info("waiting for checkpoint upload")
|
||||
wait_for_upload(ps_http, tenant_id, main_timeline_id, current_lsn)
|
||||
log.info("upload of checkpoint is done")
|
||||
|
||||
branch_timeline_id = env.neon_cli.create_branch("new", "main")
|
||||
new_pg = env.endpoints.create_start("new")
|
||||
|
||||
if fill_branch:
|
||||
with new_pg.cursor() as cur:
|
||||
cur.execute("INSERT INTO f VALUES (generate_series(1,1000));")
|
||||
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
|
||||
|
||||
# wait until pageserver receives that data
|
||||
wait_for_last_record_lsn(ps_http, tenant_id, branch_timeline_id, current_lsn)
|
||||
|
||||
# run checkpoint manually to be sure that data landed in remote storage
|
||||
ps_http.timeline_checkpoint(tenant_id, branch_timeline_id)
|
||||
|
||||
# wait until pageserver successfully uploaded a checkpoint to remote storage
|
||||
log.info("waiting for checkpoint upload")
|
||||
wait_for_upload(ps_http, tenant_id, branch_timeline_id, current_lsn)
|
||||
log.info("upload of checkpoint is done")
|
||||
else:
|
||||
pass
|
||||
|
||||
# delete new timeline
|
||||
ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=branch_timeline_id)
|
||||
|
||||
##### Stop the pageserver instance, erase all its data
|
||||
env.endpoints.stop_all()
|
||||
env.pageserver.stop()
|
||||
|
||||
dir_to_clear = Path(env.repo_dir) / "tenants"
|
||||
shutil.rmtree(dir_to_clear)
|
||||
os.mkdir(dir_to_clear)
|
||||
|
||||
##### Second start, restore the data and ensure that we see only the timeline that wasn't deleted
|
||||
env.pageserver.start()
|
||||
|
||||
ps_http.tenant_attach(tenant_id=tenant_id)
|
||||
|
||||
wait_until_tenant_active(ps_http, tenant_id=tenant_id, iterations=10, period=0.5)
|
||||
|
||||
timelines = ps_http.timeline_list(tenant_id=tenant_id)
|
||||
assert {TimelineId(tl["timeline_id"]) for tl in timelines} == {
|
||||
main_timeline_id
|
||||
}, "the deleted timeline should not have been resurrected"
|
||||
assert all([tl["state"] == "Active" for tl in timelines])
|
||||
|
||||
|
||||
def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
When deleting a timeline, if we succeed in setting the deleted flag remotely
|
||||
but fail to delete the local state, restarting the pageserver should resume
|
||||
the deletion of the local state.
|
||||
(Deletion of the state in S3 is not implemented yet.)
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
test_name="test_timeline_delete_fail_before_local_delete",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.pageserver.allowed_errors.append(".*failpoint: timeline-delete-before-rm")
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*Ignoring new state, equal to the existing one: Stopping"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*during shutdown: cannot flush frozen layers when flush_loop is not running, state is Exited"
|
||||
)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
ps_http.configure_failpoints(("timeline-delete-before-rm", "return"))
|
||||
|
||||
# construct pair of branches
|
||||
intermediate_timeline_id = env.neon_cli.create_branch(
|
||||
"test_timeline_delete_fail_before_local_delete"
|
||||
)
|
||||
|
||||
leaf_timeline_id = env.neon_cli.create_branch(
|
||||
"test_timeline_delete_fail_before_local_delete1",
|
||||
"test_timeline_delete_fail_before_local_delete",
|
||||
)
|
||||
|
||||
leaf_timeline_path = (
|
||||
env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(leaf_timeline_id)
|
||||
)
|
||||
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match="failpoint: timeline-delete-before-rm",
|
||||
):
|
||||
ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
|
||||
|
||||
assert leaf_timeline_path.exists(), "the failpoint didn't work"
|
||||
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
# Wait for tenant to finish loading.
|
||||
wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=0.5)
|
||||
|
||||
assert (
|
||||
not leaf_timeline_path.exists()
|
||||
), "timeline load procedure should have resumed the deletion interrupted by the failpoint"
|
||||
timelines = ps_http.timeline_list(env.initial_tenant)
|
||||
assert {TimelineId(tl["timeline_id"]) for tl in timelines} == {
|
||||
intermediate_timeline_id,
|
||||
env.initial_timeline,
|
||||
}, "other timelines should not have been affected"
|
||||
assert all([tl["state"] == "Active" for tl in timelines])
|
||||
|
||||
|
||||
def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
If we're stuck uploading the index file with the is_delete flag,
|
||||
eventually console will hang up and retry.
|
||||
If we're still stuck at the retry time, ensure that the retry
|
||||
fails with status 500, signalling to console that it should retry
|
||||
later.
|
||||
Ideally, timeline_delete should return 202 Accepted and require
|
||||
console to poll for completion, but, that would require changing
|
||||
the API contract.
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
test_name="test_concurrent_timeline_delete_if_first_stuck_at_index_upload",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
child_timeline_id = env.neon_cli.create_branch("child", "main")
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# make the first call sleep practically forever
|
||||
failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause"
|
||||
ps_http.configure_failpoints((failpoint_name, "pause"))
|
||||
|
||||
def first_call(result_queue):
|
||||
try:
|
||||
log.info("first call start")
|
||||
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=10)
|
||||
log.info("first call success")
|
||||
result_queue.put("success")
|
||||
except Exception:
|
||||
log.exception("first call failed")
|
||||
result_queue.put("failure, see log for stack trace")
|
||||
|
||||
first_call_result: queue.Queue[str] = queue.Queue()
|
||||
first_call_thread = threading.Thread(target=first_call, args=(first_call_result,))
|
||||
first_call_thread.start()
|
||||
|
||||
try:
|
||||
|
||||
def first_call_hit_failpoint():
|
||||
assert env.pageserver.log_contains(
|
||||
f".*{child_timeline_id}.*at failpoint {failpoint_name}"
|
||||
)
|
||||
|
||||
wait_until(50, 0.1, first_call_hit_failpoint)
|
||||
|
||||
# make the second call and assert behavior
|
||||
log.info("second call start")
|
||||
with pytest.raises(
|
||||
PageserverApiException, match="timeline is deleting, deleted_at"
|
||||
) as second_call_err:
|
||||
ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
|
||||
assert second_call_err.value.status_code == 500
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*{child_timeline_id}.*timeline is deleting, deleted_at: .*"
|
||||
)
|
||||
# the second call will try to transition the timeline into Stopping state as well
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
|
||||
)
|
||||
log.info("second call failed as expected")
|
||||
|
||||
# by now we know that the second call failed, let's ensure the first call will finish
|
||||
ps_http.configure_failpoints((failpoint_name, "off"))
|
||||
|
||||
result = first_call_result.get()
|
||||
assert result == "success"
|
||||
|
||||
finally:
|
||||
log.info("joining first call thread")
|
||||
# in any case, make sure the lifetime of the thread is bounded to this test
|
||||
first_call_thread.join()
|
||||
|
||||
|
||||
def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
If the client hangs up before we start the index part upload but after we mark it
|
||||
deleted in local memory, a subsequent delete_timeline call should be able to do
|
||||
another delete timeline operation.
|
||||
|
||||
This tests cancel safety up to the given failpoint.
|
||||
"""
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
test_name="test_delete_timeline_client_hangup",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
child_timeline_id = env.neon_cli.create_branch("child", "main")
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause"
|
||||
ps_http.configure_failpoints((failpoint_name, "pause"))
|
||||
|
||||
with pytest.raises(requests.exceptions.Timeout):
|
||||
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
|
||||
|
||||
# make sure the timeout was due to the failpoint
|
||||
at_failpoint_log_message = f".*{child_timeline_id}.*at failpoint {failpoint_name}.*"
|
||||
|
||||
def hit_failpoint():
|
||||
assert env.pageserver.log_contains(at_failpoint_log_message)
|
||||
|
||||
wait_until(50, 0.1, hit_failpoint)
|
||||
|
||||
# we log this error if a client hangs up
|
||||
# might as well use it as another indicator that the test works
|
||||
hangup_log_message = f".*DELETE.*{child_timeline_id}.*request was dropped before completing"
|
||||
env.pageserver.allowed_errors.append(hangup_log_message)
|
||||
|
||||
def got_hangup_log_message():
|
||||
assert env.pageserver.log_contains(hangup_log_message)
|
||||
|
||||
wait_until(50, 0.1, got_hangup_log_message)
|
||||
|
||||
# ok, retry without failpoint, it should succeed
|
||||
ps_http.configure_failpoints((failpoint_name, "off"))
|
||||
|
||||
# this should succeed
|
||||
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
|
||||
# the second call will try to transition the timeline into Stopping state, but it's already in that state
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
|
||||
)
|
||||
|
||||
|
||||
# TODO Test that we correctly handle GC of files that are stuck in upload queue.
|
||||
|
||||
@@ -39,23 +39,17 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
|
||||
"test_ancestor_branch_delete_branch1", "test_ancestor_branch_delete_parent"
|
||||
)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
timeline_path = (
|
||||
env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(parent_timeline_id)
|
||||
)
|
||||
|
||||
with pytest.raises(
|
||||
PageserverApiException, match="Cannot delete timeline which has child timelines"
|
||||
) as exc:
|
||||
timeline_path = (
|
||||
env.repo_dir
|
||||
/ "tenants"
|
||||
/ str(env.initial_tenant)
|
||||
/ "timelines"
|
||||
/ str(parent_timeline_id)
|
||||
)
|
||||
assert timeline_path.exists()
|
||||
|
||||
ps_http.timeline_delete(env.initial_tenant, parent_timeline_id)
|
||||
|
||||
assert not timeline_path.exists()
|
||||
|
||||
assert exc.value.status_code == 400
|
||||
|
||||
timeline_path = (
|
||||
@@ -87,3 +81,14 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
|
||||
)
|
||||
|
||||
assert exc.value.status_code == 404
|
||||
|
||||
# Check that we didn't pick up the timeline again after restart.
|
||||
# See https://github.com/neondatabase/neon/issues/3560
|
||||
env.pageserver.stop(immediate=True)
|
||||
env.pageserver.start()
|
||||
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found",
|
||||
) as exc:
|
||||
ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)
|
||||
|
||||
Reference in New Issue
Block a user