mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
timeline_delete => persist_index_part_with_deleted_flag: make cancel safe
this fixes the test added in the previous commit
This commit is contained in:
@@ -208,6 +208,7 @@ use anyhow::Context;
|
||||
use chrono::Utc;
|
||||
// re-export these
|
||||
pub use download::{is_temp_download_file, list_remote_timelines};
|
||||
use scopeguard::ScopeGuard;
|
||||
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
@@ -629,7 +630,9 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
|
||||
// NOTE: if there were no tasks to call stop we need to call stop by ourselves first
|
||||
pub(crate) async fn persist_index_part_with_deleted_flag(&self) -> anyhow::Result<()> {
|
||||
pub(crate) async fn persist_index_part_with_deleted_flag(
|
||||
self: &Arc<Self>,
|
||||
) -> anyhow::Result<()> {
|
||||
let index_part = {
|
||||
let mut locked = self.upload_queue.lock().unwrap();
|
||||
|
||||
@@ -643,13 +646,24 @@ impl RemoteTimelineClient {
|
||||
};
|
||||
|
||||
if let Some(delete_dat) = stopped.last_uploaded_index_part.deleted_at.as_ref() {
|
||||
// XXX deal with case where the first call is stopped being polled
|
||||
anyhow::bail!("timeline is deleting, deleted_at: {:?}", delete_dat);
|
||||
}
|
||||
stopped.last_uploaded_index_part.deleted_at = Some(Utc::now().naive_utc());
|
||||
stopped.last_uploaded_index_part.clone()
|
||||
};
|
||||
|
||||
let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
|
||||
let mut locked = self_clone.upload_queue.lock().unwrap();
|
||||
let stopped = match &mut *locked {
|
||||
UploadQueue::Uninitialized | UploadQueue::Initialized(_) => unreachable!(
|
||||
"there's no way out of Stopping, and we checked it's Stopping above: {:?}",
|
||||
locked.as_str(),
|
||||
),
|
||||
UploadQueue::Stopped(stopped) => stopped,
|
||||
};
|
||||
stopped.last_uploaded_index_part.deleted_at = None;
|
||||
});
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
tokio::task::spawn_blocking({
|
||||
let current = Span::current();
|
||||
@@ -673,7 +687,12 @@ impl RemoteTimelineClient {
|
||||
self.timeline_id,
|
||||
&index_part,
|
||||
)
|
||||
.await
|
||||
.await?;
|
||||
|
||||
// all good, keep the deleted_at flag
|
||||
ScopeGuard::into_inner(undo_deleted_at);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
|
||||
@@ -25,7 +25,7 @@ pub(crate) enum UploadQueue {
|
||||
}
|
||||
|
||||
impl UploadQueue {
|
||||
fn as_str(&self) -> &'static str {
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
UploadQueue::Uninitialized => "Uninitialized",
|
||||
UploadQueue::Initialized(_) => "Initialized",
|
||||
|
||||
Reference in New Issue
Block a user