wip: try: request coalescing (now building)

This commit is contained in:
Joonas Koivunen
2023-05-05 00:34:26 +03:00
parent 5a924f1867
commit e00da98996
3 changed files with 16 additions and 31 deletions

View File

@@ -139,12 +139,9 @@ impl From<crate::tenant::DeleteTimelineError> for ApiError {
fn from(value: crate::tenant::DeleteTimelineError) -> Self {
use crate::tenant::DeleteTimelineError::*;
match value {
NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found")),
HasChildren => ApiError::BadRequest(anyhow::anyhow!(
"Cannot delete timeline which has child timelines"
)),
StopUploadQueue(e) => ApiError::InternalServerError(e.into()),
Other(e) => ApiError::InternalServerError(e),
NotFound => ApiError::NotFound(value.into()),
HasChildren => ApiError::BadRequest(value.into()),
_ => ApiError::InternalServerError(value.into()),
}
}
}

View File

@@ -444,9 +444,9 @@ remote:
#[derive(Debug, thiserror::Error)]
pub enum DeleteTimelineError {
#[error("NotFound")]
#[error("timeline not found")]
NotFound,
#[error("HasChildren")]
#[error("Cannot delete timeline which has child timelines")]
HasChildren,
#[cfg(feature = "testing")]
#[cfg_attr(feature = "testing", error("failpoint {0}"))]
@@ -1454,23 +1454,8 @@ impl Tenant {
let mut rx = rx.clone();
let spawn_new = match &*rx.borrow_and_update() {
Some(Ok(())) => return Ok(()),
Some(Err(InnerDeleteTimelineError::StopUploadQueue(_))) => {
// this is permanent
return Err(DeleteTimelineError::StopUploadQueue);
}
Some(Err(InnerDeleteTimelineError::ChildAppearedAfterRemoveDir)) => {
// permanent, lucky to have raced that just now
return Err(DeleteTimelineError::ChildAppearedAfterRemoveDir);
}
#[cfg(feature = "testing")]
Some(Err(InnerDeleteTimelineError::Failpoint(_))) => {
// let's try again
true
}
Some(Err(InnerDeleteTimelineError::UploadFailed)) => {
// again!
true
}
Some(Err(e)) if e.is_permanent() => return Err(DeleteTimelineError::from(e)),
Some(Err(_retryable)) => true,
None => false,
};
@@ -1489,18 +1474,21 @@ impl Tenant {
} else {
// try another round
let (tx, rx) = tokio::sync::watch::channel(None);
// now anyone else racing will see the None
*g = Some(rx.clone());
let this = self.clone();
let timeline = timeline.clone();
tokio::spawn(async move {
let res = this.unique_delete_timeline(&timeline).await;
let _ = tx.send_replace(Some(res));
});
tokio::spawn(
async move {
let res = this.unique_delete_timeline(&timeline).await;
let _ = tx.send_replace(Some(res));
}
.instrument(tracing::info_span!("unique_delete_timeline")),
);
rx
}
};
// should read it right away to see if it's ready
loop {
rx.changed()
.await
@@ -1584,6 +1572,7 @@ impl Tenant {
.conf
.timeline_path(&timeline.timeline_id, &self.tenant_id);
#[cfg(feature = "testing")]
fail::fail_point!("timeline-delete-before-rm", |_| {
Err(InnerDeleteTimelineError::Failpoint(
"timeline-delete-before-rm",

View File

@@ -204,7 +204,6 @@ mod download;
pub mod index;
mod upload;
use anyhow::Context;
use chrono::Utc;
// re-export these
pub use download::{is_temp_download_file, list_remote_timelines};