diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs index 245a896568..0e2580de85 100644 --- a/pageserver/src/storage_sync.rs +++ b/pageserver/src/storage_sync.rs @@ -234,10 +234,8 @@ mod download; pub mod index; mod upload; -// re-export this -pub use download::is_temp_download_file; -pub use download::list_remote_timelines; -use tracing::{info_span, Instrument}; +// re-export these +pub use download::{is_temp_download_file, list_remote_timelines}; use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; @@ -250,6 +248,7 @@ use anyhow::ensure; use remote_storage::{DownloadError, GenericRemoteStorage}; use tokio::runtime::Runtime; use tracing::{error, info, warn}; +use tracing::{info_span, Instrument}; use utils::lsn::Lsn; @@ -538,6 +537,12 @@ impl RemoteTimelineClient { self.tenant_id, self.timeline_id, ) + .measure_remote_op( + self.tenant_id, + self.timeline_id, + RemoteOpFileKind::Index, + RemoteOpKind::Download, + ) .await } @@ -559,6 +564,12 @@ impl RemoteTimelineClient { path, layer_metadata, ) + .measure_remote_op( + self.tenant_id, + self.timeline_id, + RemoteOpFileKind::Layer, + RemoteOpKind::Download, + ) .await?; // Update the metadata for given layer file. 
The remote index file diff --git a/pageserver/src/storage_sync/delete.rs b/pageserver/src/storage_sync/delete.rs index 99b013f96b..fdf88eabb6 100644 --- a/pageserver/src/storage_sync/delete.rs +++ b/pageserver/src/storage_sync/delete.rs @@ -12,29 +12,26 @@ pub(super) async fn delete_layer( fail::fail_point!("before-delete-layer", |_| { anyhow::bail!("failpoint before-delete-layer") }); - async { - debug!( - "Deleting layer from remote storage: {:?}", - local_layer_path.display() - ); + debug!( + "Deleting layer from remote storage: {:?}", + local_layer_path.display() + ); - let storage_path = storage - .remote_object_id(local_layer_path) - .with_context(|| { - format!( - "Failed to get the layer storage path for local path '{}'", - local_layer_path.display() - ) - })?; - - // FIXME: If the deletion fails because the object already didn't exist, - // it would be good to just issue a warning but consider it success. - storage.delete(&storage_path).await.with_context(|| { + let storage_path = storage + .remote_object_id(local_layer_path) + .with_context(|| { format!( - "Failed to delete remote layer from storage at '{:?}'", - storage_path + "Failed to get the layer storage path for local path '{}'", + local_layer_path.display() ) - }) - } - .await + })?; + + // FIXME: If the deletion fails because the object already didn't exist, + // it would be good to just issue a warning but consider it success. 
+ storage.delete(&storage_path).await.with_context(|| { + format!( + "Failed to delete remote layer from storage at '{:?}'", + storage_path + ) + }) } diff --git a/pageserver/src/storage_sync/download.rs b/pageserver/src/storage_sync/download.rs index fcd3f7dbef..04368b6783 100644 --- a/pageserver/src/storage_sync/download.rs +++ b/pageserver/src/storage_sync/download.rs @@ -9,7 +9,6 @@ use tokio::io::AsyncWriteExt; use tracing::debug; use crate::config::PageServerConf; -use crate::metrics::{MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind}; use crate::storage_sync::index::LayerFileMetadata; use remote_storage::{DownloadError, GenericRemoteStorage}; use utils::crashsafe::path_with_suffix_extension; @@ -34,39 +33,6 @@ pub async fn download_layer_file<'a>( timeline_id: TimelineId, path: &'a RelativePath, layer_metadata: &'a LayerFileMetadata, -) -> anyhow::Result { - download_layer_file_guts(conf, storage, tenant_id, timeline_id, path, layer_metadata) - .measure_remote_op( - tenant_id, - timeline_id, - RemoteOpFileKind::Layer, - RemoteOpKind::Download, - ) - .await -} - -const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download"; - -pub fn is_temp_download_file(path: &Path) -> bool { - let extension = path.extension().map(|pname| { - pname - .to_str() - .expect("paths passed to this function must be valid Rust strings") - }); - match extension { - Some(TEMP_DOWNLOAD_EXTENSION) => true, - Some(_) => false, - None => false, - } -} - -async fn download_layer_file_guts<'a>( - conf: &'static PageServerConf, - storage: &'a GenericRemoteStorage, - tenant_id: TenantId, - timeline_id: TimelineId, - path: &'a RelativePath, - layer_metadata: &'a LayerFileMetadata, ) -> anyhow::Result { let timeline_path = conf.timeline_path(&timeline_id, &tenant_id); @@ -163,6 +129,22 @@ async fn download_layer_file_guts<'a>( Ok(bytes_amount) } + +const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download"; + +pub fn is_temp_download_file(path: &Path) -> bool { + let extension = 
path.extension().map(|pname| { + pname + .to_str() + .expect("paths passed to this function must be valid Rust strings") + }); + match extension { + Some(TEMP_DOWNLOAD_EXTENSION) => true, + Some(_) => false, + None => false, + } +} + /// List timelines of given tenant in remote storage pub async fn list_remote_timelines<'a>( storage: &'a GenericRemoteStorage, @@ -234,27 +216,26 @@ pub async fn download_index_part( tenant_id: TenantId, timeline_id: TimelineId, ) -> Result { - async { - let index_part_path = conf - .metadata_path(timeline_id, tenant_id) - .with_file_name(IndexPart::FILE_NAME); - let part_storage_path = storage - .remote_object_id(&index_part_path) - .with_context(|| { - format!( - "Failed to get the index part storage path for local path '{}'", - index_part_path.display() - ) - }) - .map_err(DownloadError::BadInput)?; + let index_part_path = conf + .metadata_path(timeline_id, tenant_id) + .with_file_name(IndexPart::FILE_NAME); + let part_storage_path = storage + .remote_object_id(&index_part_path) + .with_context(|| { + format!( + "Failed to get the index part storage path for local path '{}'", + index_part_path.display() + ) + }) + .map_err(DownloadError::BadInput)?; - let mut index_part_download = storage.download(&part_storage_path).await?; + let mut index_part_download = storage.download(&part_storage_path).await?; - let mut index_part_bytes = Vec::new(); - tokio::io::copy( - &mut index_part_download.download_stream, - &mut index_part_bytes, - ) + let mut index_part_bytes = Vec::new(); + tokio::io::copy( + &mut index_part_download.download_stream, + &mut index_part_bytes, + ) .await .with_context(|| { format!( @@ -264,22 +245,14 @@ pub async fn download_index_part( }) .map_err(DownloadError::Other)?; - let index_part: IndexPart = serde_json::from_slice(&index_part_bytes) - .with_context(|| { - format!( - "Failed to deserialize index part file into file '{}'", - index_part_path.display() - ) - }) - .map_err(DownloadError::Other)?; + let 
index_part: IndexPart = serde_json::from_slice(&index_part_bytes) + .with_context(|| { + format!( + "Failed to deserialize index part file into file '{}'", + index_part_path.display() + ) + }) + .map_err(DownloadError::Other)?; - Ok(index_part) - } - .measure_remote_op( - tenant_id, - timeline_id, - RemoteOpFileKind::Index, - RemoteOpKind::Download, - ) - .await + Ok(index_part) } diff --git a/pageserver/src/storage_sync/upload.rs b/pageserver/src/storage_sync/upload.rs index d3fd797b53..261a954fd9 100644 --- a/pageserver/src/storage_sync/upload.rs +++ b/pageserver/src/storage_sync/upload.rs @@ -22,27 +22,24 @@ pub(super) async fn upload_index_part<'a>( fail_point!("before-upload-index", |_| { bail!("failpoint before-upload-index") }); - async { - let index_part_bytes = serde_json::to_vec(&index_part) - .context("Failed to serialize index part file into bytes")?; - let index_part_size = index_part_bytes.len(); - let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes)); + let index_part_bytes = serde_json::to_vec(&index_part) + .context("Failed to serialize index part file into bytes")?; + let index_part_size = index_part_bytes.len(); + let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes)); - let index_part_path = conf - .metadata_path(timeline_id, tenant_id) - .with_file_name(IndexPart::FILE_NAME); - storage - .upload_storage_object( - Box::new(index_part_bytes), - index_part_size, - &index_part_path, - ) - .await - .with_context(|| { - format!("Failed to upload index part for '{tenant_id} / {timeline_id}'") - }) - } - .await + let index_part_path = conf + .metadata_path(timeline_id, tenant_id) + .with_file_name(IndexPart::FILE_NAME); + storage + .upload_storage_object( + Box::new(index_part_bytes), + index_part_size, + &index_part_path, + ) + .await + .with_context(|| { + format!("Failed to upload index part for '{tenant_id} / {timeline_id}'") + }) } /// Attempts to upload given layer files. 
@@ -57,54 +54,51 @@ pub(super) async fn upload_timeline_layer( fail_point!("before-upload-layer", |_| { bail!("failpoint before-upload-layer") }); - async { - let storage_path = storage.remote_object_id(source_path).with_context(|| { + let storage_path = storage.remote_object_id(source_path).with_context(|| { + format!( + "Failed to get the layer storage path for local path '{}'", + source_path.display() + ) + })?; + + let source_file = fs::File::open(&source_path).await.with_context(|| { + format!( + "Failed to open a source file for layer '{}'", + source_path.display() + ) + })?; + + let fs_size = source_file + .metadata() + .await + .with_context(|| { format!( - "Failed to get the layer storage path for local path '{}'", + "Failed to get the source file metadata for layer '{}'", source_path.display() ) - })?; + })? + .len(); - let source_file = fs::File::open(&source_path).await.with_context(|| { - format!( - "Failed to open a source file for layer '{}'", - source_path.display() - ) - })?; - - let fs_size = source_file - .metadata() - .await - .with_context(|| { - format!( - "Failed to get the source file metadata for layer '{}'", - source_path.display() - ) - })? 
- .len(); - - // FIXME: this looks bad - if let Some(metadata_size) = known_metadata.file_size() { - if metadata_size != fs_size { - bail!("File {source_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}"); - } - } else { - // this is a silly state we would like to avoid + // FIXME: this looks bad + if let Some(metadata_size) = known_metadata.file_size() { + if metadata_size != fs_size { + bail!("File {source_path:?} has its current FS size {fs_size} different from initially determined {metadata_size}"); } - - let fs_size = usize::try_from(fs_size).with_context(|| format!("File {source_path:?} size {fs_size} could not be converted to usize"))?; - - storage - .upload(Box::new(source_file), fs_size, &storage_path, None) - .await - .with_context(|| { - format!( - "Failed to upload a layer from local path '{}'", - source_path.display() - ) - })?; - - Ok(()) + } else { + // this is a silly state we would like to avoid } - .await + + let fs_size = usize::try_from(fs_size).with_context(|| format!("File {source_path:?} size {fs_size} could not be converted to usize"))?; + + storage + .upload(Box::new(source_file), fs_size, &storage_path, None) + .await + .with_context(|| { + format!( + "Failed to upload a layer from local path '{}'", + source_path.display() + ) + })?; + + Ok(()) }