fix: remaining missed cancellations and timeouts (#6843)

As noticed in #6836 some occurances of error conversions were missed in
#6697:
- `std::io::Error` popped up by `tokio::io::copy_buf` containing
`DownloadError` was turned into `DownloadError::Other`
- similarly for secondary downloader errors

These changes come at the loss of pathname context.

Cc: #6096
This commit is contained in:
Joonas Koivunen
2024-02-21 17:20:59 +02:00
committed by GitHub
parent 7257ffbf75
commit 41464325c7
4 changed files with 52 additions and 41 deletions

View File

@@ -44,6 +44,26 @@ impl DownloadError {
}
}
impl From<std::io::Error> for DownloadError {
fn from(value: std::io::Error) -> Self {
let needs_unwrap = value.kind() == std::io::ErrorKind::Other
&& value
.get_ref()
.and_then(|x| x.downcast_ref::<DownloadError>())
.is_some();
if needs_unwrap {
*value
.into_inner()
.expect("just checked")
.downcast::<DownloadError>()
.expect("just checked")
} else {
DownloadError::Other(value.into())
}
}
}
#[derive(Debug)]
pub enum TimeTravelError {
/// Validation or other error happened due to user input.
@@ -142,13 +162,12 @@ impl std::fmt::Display for TimeoutOrCancel {
impl std::error::Error for TimeoutOrCancel {}
impl TimeoutOrCancel {
pub fn caused(error: &anyhow::Error) -> Option<&Self> {
error.root_cause().downcast_ref()
}
/// Returns true if the error was caused by [`TimeoutOrCancel::Cancel`].
pub fn caused_by_cancel(error: &anyhow::Error) -> bool {
Self::caused(error).is_some_and(Self::is_cancel)
error
.root_cause()
.downcast_ref::<Self>()
.is_some_and(Self::is_cancel)
}
pub fn is_cancel(&self) -> bool {

View File

@@ -73,6 +73,8 @@ where
if !*this.hit {
if let Poll::Ready(e) = this.cancellation.poll(cx) {
*this.hit = true;
// most likely this will be a std::io::Error wrapping a DownloadError
let e = Err(std::io::Error::from(e));
return Poll::Ready(Some(e));
}
@@ -130,6 +132,8 @@ mod tests {
.is_some_and(|e| matches!(e, DownloadError::Cancelled)),
"{inner:?}"
);
let e = DownloadError::from(e);
assert!(matches!(e, DownloadError::Cancelled), "{e:?}");
tokio::select! {
_ = stream.next() => unreachable!("no timeout ever happens as we were already cancelled"),
@@ -146,7 +150,7 @@ mod tests {
let stream = DownloadStream::new(cancel_or_timeout(timeout, cancel.clone()), inner);
let mut stream = std::pin::pin!(stream);
// because the stream uses 120s timeout we are paused, we advance to 120s right away.
// because the stream uses 120s timeout and we are paused, we advance to 120s right away.
let first = stream.next();
let e = first.await.expect("there must be some").unwrap_err();
@@ -158,6 +162,8 @@ mod tests {
.is_some_and(|e| matches!(e, DownloadError::Timeout)),
"{inner:?}"
);
let e = DownloadError::from(e);
assert!(matches!(e, DownloadError::Timeout), "{e:?}");
cancel.cancel();

View File

@@ -88,14 +88,7 @@ pub async fn download_layer_file<'a>(
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut destination_file)
.await
.with_context(|| {
format!(
"download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
)
})
.map_err(DownloadError::Other);
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut destination_file).await;
match bytes_amount {
Ok(bytes_amount) => {
@@ -107,7 +100,7 @@ pub async fn download_layer_file<'a>(
on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}"));
}
Err(e)
Err(e.into())
}
}
},
@@ -245,10 +238,7 @@ async fn do_download_index_part(
let stream = download.download_stream;
let mut stream = StreamReader::new(stream);
tokio::io::copy_buf(&mut stream, &mut bytes)
.await
.with_context(|| format!("download index part at {remote_path:?}"))
.map_err(DownloadError::Other)?;
tokio::io::copy_buf(&mut stream, &mut bytes).await?;
Ok(bytes)
},
@@ -428,14 +418,7 @@ pub(crate) async fn download_initdb_tar_zst(
let mut download = tokio_util::io::StreamReader::new(download.download_stream);
let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
// TODO: this consumption of the response body should be subject to timeout + cancellation, but
// not without thinking carefully about how to recover safely from cancelling a write to
// local storage (e.g. by writing into a temp file as we do in download_layer)
// FIXME: flip the weird error wrapping
tokio::io::copy_buf(&mut download, &mut writer)
.await
.with_context(|| format!("download initdb.tar.zst at {remote_path:?}"))
.map_err(DownloadError::Other)?;
tokio::io::copy_buf(&mut download, &mut writer).await?;
let mut file = writer.into_inner();

View File

@@ -438,8 +438,14 @@ impl From<std::io::Error> for UpdateError {
fn from(value: std::io::Error) -> Self {
if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
UpdateError::NoSpace
} else if value
.get_ref()
.and_then(|x| x.downcast_ref::<DownloadError>())
.is_some()
{
UpdateError::from(DownloadError::from(value))
} else {
// An I/O error from e.g. tokio::io::copy is most likely a remote storage issue
// An I/O error from e.g. tokio::io::copy_buf is most likely a remote storage issue
UpdateError::Other(anyhow::anyhow!(value))
}
}
@@ -672,20 +678,17 @@ impl<'a> TenantDownloader<'a> {
.await
{
Ok(bytes) => bytes,
Err(e) => {
if let DownloadError::NotFound = e {
// A heatmap might be out of date and refer to a layer that doesn't exist any more.
// This is harmless: continue to download the next layer. It is expected during compaction
// GC.
tracing::debug!(
"Skipped downloading missing layer {}, raced with compaction/gc?",
layer.name
);
continue;
} else {
return Err(e.into());
}
Err(DownloadError::NotFound) => {
// A heatmap might be out of date and refer to a layer that doesn't exist any more.
// This is harmless: continue to download the next layer. It is expected during compaction
// GC.
tracing::debug!(
"Skipped downloading missing layer {}, raced with compaction/gc?",
layer.name
);
continue;
}
Err(e) => return Err(e.into()),
};
if downloaded_bytes != layer.metadata.file_size {