diff --git a/libs/remote_storage/src/error.rs b/libs/remote_storage/src/error.rs index 96f044e087..66422853e1 100644 --- a/libs/remote_storage/src/error.rs +++ b/libs/remote_storage/src/error.rs @@ -44,6 +44,26 @@ impl DownloadError { } } +impl From<std::io::Error> for DownloadError { + fn from(value: std::io::Error) -> Self { + let needs_unwrap = value.kind() == std::io::ErrorKind::Other + && value + .get_ref() + .and_then(|x| x.downcast_ref::<DownloadError>()) + .is_some(); + + if needs_unwrap { + *value + .into_inner() + .expect("just checked") + .downcast::<DownloadError>() + .expect("just checked") + } else { + DownloadError::Other(value.into()) + } + } +} + #[derive(Debug)] pub enum TimeTravelError { /// Validation or other error happened due to user input. @@ -142,13 +162,12 @@ impl std::fmt::Display for TimeoutOrCancel { impl std::error::Error for TimeoutOrCancel {} impl TimeoutOrCancel { - pub fn caused(error: &anyhow::Error) -> Option<&Self> { - error.root_cause().downcast_ref() - } - /// Returns true if the error was caused by [`TimeoutOrCancel::Cancel`]. pub fn caused_by_cancel(error: &anyhow::Error) -> bool { - Self::caused(error).is_some_and(Self::is_cancel) + error + .root_cause() + .downcast_ref::<Self>() + .is_some_and(Self::is_cancel) } pub fn is_cancel(&self) -> bool { diff --git a/libs/remote_storage/src/support.rs b/libs/remote_storage/src/support.rs index 20f193c6c8..d146b5445b 100644 --- a/libs/remote_storage/src/support.rs +++ b/libs/remote_storage/src/support.rs @@ -73,6 +73,8 @@ where if !*this.hit { if let Poll::Ready(e) = this.cancellation.poll(cx) { *this.hit = true; + + // most likely this will be a std::io::Error wrapping a DownloadError let e = Err(std::io::Error::from(e)); return Poll::Ready(Some(e)); } @@ -130,6 +132,8 @@ mod tests { .is_some_and(|e| matches!(e, DownloadError::Cancelled)), "{inner:?}" ); + let e = DownloadError::from(e); + assert!(matches!(e, DownloadError::Cancelled), "{e:?}"); tokio::select! 
{ _ = stream.next() => unreachable!("no timeout ever happens as we were already cancelled"), @@ -146,7 +150,7 @@ mod tests { let stream = DownloadStream::new(cancel_or_timeout(timeout, cancel.clone()), inner); let mut stream = std::pin::pin!(stream); - // because the stream uses 120s timeout we are paused, we advance to 120s right away. + // because the stream uses 120s timeout and we are paused, we advance to 120s right away. let first = stream.next(); let e = first.await.expect("there must be some").unwrap_err(); @@ -158,6 +162,8 @@ mod tests { .is_some_and(|e| matches!(e, DownloadError::Timeout)), "{inner:?}" ); + let e = DownloadError::from(e); + assert!(matches!(e, DownloadError::Timeout), "{e:?}"); cancel.cancel(); diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index c70267474e..962cf5d12e 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -88,14 +88,7 @@ pub async fn download_layer_file<'a>( let mut reader = tokio_util::io::StreamReader::new(download.download_stream); - let bytes_amount = tokio::io::copy_buf(&mut reader, &mut destination_file) - .await - .with_context(|| { - format!( - "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}" - ) - }) - .map_err(DownloadError::Other); + let bytes_amount = tokio::io::copy_buf(&mut reader, &mut destination_file).await; match bytes_amount { Ok(bytes_amount) => { @@ -107,7 +100,7 @@ pub async fn download_layer_file<'a>( on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}")); } - Err(e) + Err(e.into()) } } }, @@ -245,10 +238,7 @@ async fn do_download_index_part( let stream = download.download_stream; let mut stream = StreamReader::new(stream); - tokio::io::copy_buf(&mut stream, &mut bytes) - .await - .with_context(|| format!("download index part at {remote_path:?}")) - .map_err(DownloadError::Other)?; + 
tokio::io::copy_buf(&mut stream, &mut bytes).await?; Ok(bytes) }, @@ -428,14 +418,7 @@ pub(crate) async fn download_initdb_tar_zst( let mut download = tokio_util::io::StreamReader::new(download.download_stream); let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file); - // TODO: this consumption of the response body should be subject to timeout + cancellation, but - // not without thinking carefully about how to recover safely from cancelling a write to - // local storage (e.g. by writing into a temp file as we do in download_layer) - // FIXME: flip the weird error wrapping - tokio::io::copy_buf(&mut download, &mut writer) - .await - .with_context(|| format!("download initdb.tar.zst at {remote_path:?}")) - .map_err(DownloadError::Other)?; + tokio::io::copy_buf(&mut download, &mut writer).await?; let mut file = writer.into_inner(); diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 6966cf7709..51ab421b58 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -438,8 +438,14 @@ impl From<std::io::Error> for UpdateError { fn from(value: std::io::Error) -> Self { if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) { UpdateError::NoSpace + } else if value + .get_ref() + .and_then(|x| x.downcast_ref::<DownloadError>()) + .is_some() + { + UpdateError::from(DownloadError::from(value)) } else { - // An I/O error from e.g. tokio::io::copy is most likely a remote storage issue + // An I/O error from e.g. tokio::io::copy_buf is most likely a remote storage issue UpdateError::Other(anyhow::anyhow!(value)) } } @@ -672,20 +678,17 @@ impl<'a> TenantDownloader<'a> { .await { Ok(bytes) => bytes, - Err(e) => { - if let DownloadError::NotFound = e { - // A heatmap might be out of date and refer to a layer that doesn't exist any more. - // This is harmless: continue to download the next layer. It is expected during compaction - // GC. 
- tracing::debug!( - "Skipped downloading missing layer {}, raced with compaction/gc?", - layer.name - ); - continue; - } else { - return Err(e.into()); - } + Err(DownloadError::NotFound) => { + // A heatmap might be out of date and refer to a layer that doesn't exist any more. + // This is harmless: continue to download the next layer. It is expected during compaction + // GC. + tracing::debug!( + "Skipped downloading missing layer {}, raced with compaction/gc?", + layer.name + ); + continue; } + Err(e) => return Err(e.into()), }; if downloaded_bytes != layer.metadata.file_size {