Little cleanup around measure_remote_op calls

- Previously, the functions in download.rs did the measurement themselves,
  whereas for functions in delete.rs and upload.rs, it was the caller's
  responsibility. Move the measure_remote_op calls from download.rs to
  the callers, for consistency.
- Remove pointless async blocks in upload.rs and delete.rs. They would've
  been useful for inserting the measure_remote_op calls, but since the
  callers are responsible for measure_remote_op now, they're not needed.
- tiny cosmetic cleanup around imports
This commit is contained in:
Heikki Linnakangas
2022-11-25 17:03:33 +02:00
committed by Heikki Linnakangas
parent 6b4a28bf7f
commit 3eb85957df
4 changed files with 135 additions and 160 deletions

View File

@@ -234,10 +234,8 @@ mod download;
pub mod index;
mod upload;
// re-export this
pub use download::is_temp_download_file;
pub use download::list_remote_timelines;
use tracing::{info_span, Instrument};
// re-export these
pub use download::{is_temp_download_file, list_remote_timelines};
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
@@ -250,6 +248,7 @@ use anyhow::ensure;
use remote_storage::{DownloadError, GenericRemoteStorage};
use tokio::runtime::Runtime;
use tracing::{error, info, warn};
use tracing::{info_span, Instrument};
use utils::lsn::Lsn;
@@ -538,6 +537,12 @@ impl RemoteTimelineClient {
self.tenant_id,
self.timeline_id,
)
.measure_remote_op(
self.tenant_id,
self.timeline_id,
RemoteOpFileKind::Index,
RemoteOpKind::Download,
)
.await
}
@@ -559,6 +564,12 @@ impl RemoteTimelineClient {
path,
layer_metadata,
)
.measure_remote_op(
self.tenant_id,
self.timeline_id,
RemoteOpFileKind::Layer,
RemoteOpKind::Download,
)
.await?;
// Update the metadata for given layer file. The remote index file

View File

@@ -12,29 +12,26 @@ pub(super) async fn delete_layer(
fail::fail_point!("before-delete-layer", |_| {
anyhow::bail!("failpoint before-delete-layer")
});
async {
debug!(
"Deleting layer from remote storage: {:?}",
local_layer_path.display()
);
debug!(
"Deleting layer from remote storage: {:?}",
local_layer_path.display()
);
let storage_path = storage
.remote_object_id(local_layer_path)
.with_context(|| {
format!(
"Failed to get the layer storage path for local path '{}'",
local_layer_path.display()
)
})?;
// FIXME: If the deletion fails because the object already didn't exist,
// it would be good to just issue a warning but consider it success.
storage.delete(&storage_path).await.with_context(|| {
let storage_path = storage
.remote_object_id(local_layer_path)
.with_context(|| {
format!(
"Failed to delete remote layer from storage at '{:?}'",
storage_path
"Failed to get the layer storage path for local path '{}'",
local_layer_path.display()
)
})
}
.await
})?;
// FIXME: If the deletion fails because the object already didn't exist,
// it would be good to just issue a warning but consider it success.
storage.delete(&storage_path).await.with_context(|| {
format!(
"Failed to delete remote layer from storage at '{:?}'",
storage_path
)
})
}

View File

@@ -9,7 +9,6 @@ use tokio::io::AsyncWriteExt;
use tracing::debug;
use crate::config::PageServerConf;
use crate::metrics::{MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind};
use crate::storage_sync::index::LayerFileMetadata;
use remote_storage::{DownloadError, GenericRemoteStorage};
use utils::crashsafe::path_with_suffix_extension;
@@ -34,39 +33,6 @@ pub async fn download_layer_file<'a>(
timeline_id: TimelineId,
path: &'a RelativePath,
layer_metadata: &'a LayerFileMetadata,
) -> anyhow::Result<u64> {
download_layer_file_guts(conf, storage, tenant_id, timeline_id, path, layer_metadata)
.measure_remote_op(
tenant_id,
timeline_id,
RemoteOpFileKind::Layer,
RemoteOpKind::Download,
)
.await
}
const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
pub fn is_temp_download_file(path: &Path) -> bool {
let extension = path.extension().map(|pname| {
pname
.to_str()
.expect("paths passed to this function must be valid Rust strings")
});
match extension {
Some(TEMP_DOWNLOAD_EXTENSION) => true,
Some(_) => false,
None => false,
}
}
async fn download_layer_file_guts<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
tenant_id: TenantId,
timeline_id: TimelineId,
path: &'a RelativePath,
layer_metadata: &'a LayerFileMetadata,
) -> anyhow::Result<u64> {
let timeline_path = conf.timeline_path(&timeline_id, &tenant_id);
@@ -163,6 +129,22 @@ async fn download_layer_file_guts<'a>(
Ok(bytes_amount)
}
const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
pub fn is_temp_download_file(path: &Path) -> bool {
let extension = path.extension().map(|pname| {
pname
.to_str()
.expect("paths passed to this function must be valid Rust strings")
});
match extension {
Some(TEMP_DOWNLOAD_EXTENSION) => true,
Some(_) => false,
None => false,
}
}
/// List timelines of given tenant in remote storage
pub async fn list_remote_timelines<'a>(
storage: &'a GenericRemoteStorage,
@@ -234,27 +216,26 @@ pub async fn download_index_part(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<IndexPart, DownloadError> {
async {
let index_part_path = conf
.metadata_path(timeline_id, tenant_id)
.with_file_name(IndexPart::FILE_NAME);
let part_storage_path = storage
.remote_object_id(&index_part_path)
.with_context(|| {
format!(
"Failed to get the index part storage path for local path '{}'",
index_part_path.display()
)
})
.map_err(DownloadError::BadInput)?;
let index_part_path = conf
.metadata_path(timeline_id, tenant_id)
.with_file_name(IndexPart::FILE_NAME);
let part_storage_path = storage
.remote_object_id(&index_part_path)
.with_context(|| {
format!(
"Failed to get the index part storage path for local path '{}'",
index_part_path.display()
)
})
.map_err(DownloadError::BadInput)?;
let mut index_part_download = storage.download(&part_storage_path).await?;
let mut index_part_download = storage.download(&part_storage_path).await?;
let mut index_part_bytes = Vec::new();
tokio::io::copy(
&mut index_part_download.download_stream,
&mut index_part_bytes,
)
let mut index_part_bytes = Vec::new();
tokio::io::copy(
&mut index_part_download.download_stream,
&mut index_part_bytes,
)
.await
.with_context(|| {
format!(
@@ -264,22 +245,14 @@ pub async fn download_index_part(
})
.map_err(DownloadError::Other)?;
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
.with_context(|| {
format!(
"Failed to deserialize index part file into file '{}'",
index_part_path.display()
)
})
.map_err(DownloadError::Other)?;
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
.with_context(|| {
format!(
"Failed to deserialize index part file into file '{}'",
index_part_path.display()
)
})
.map_err(DownloadError::Other)?;
Ok(index_part)
}
.measure_remote_op(
tenant_id,
timeline_id,
RemoteOpFileKind::Index,
RemoteOpKind::Download,
)
.await
Ok(index_part)
}

View File

@@ -22,27 +22,24 @@ pub(super) async fn upload_index_part<'a>(
fail_point!("before-upload-index", |_| {
bail!("failpoint before-upload-index")
});
async {
let index_part_bytes = serde_json::to_vec(&index_part)
.context("Failed to serialize index part file into bytes")?;
let index_part_size = index_part_bytes.len();
let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));
let index_part_bytes = serde_json::to_vec(&index_part)
.context("Failed to serialize index part file into bytes")?;
let index_part_size = index_part_bytes.len();
let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));
let index_part_path = conf
.metadata_path(timeline_id, tenant_id)
.with_file_name(IndexPart::FILE_NAME);
storage
.upload_storage_object(
Box::new(index_part_bytes),
index_part_size,
&index_part_path,
)
.await
.with_context(|| {
format!("Failed to upload index part for '{tenant_id} / {timeline_id}'")
})
}
.await
let index_part_path = conf
.metadata_path(timeline_id, tenant_id)
.with_file_name(IndexPart::FILE_NAME);
storage
.upload_storage_object(
Box::new(index_part_bytes),
index_part_size,
&index_part_path,
)
.await
.with_context(|| {
format!("Failed to upload index part for '{tenant_id} / {timeline_id}'")
})
}
/// Attempts to upload given layer files.
@@ -57,54 +54,51 @@ pub(super) async fn upload_timeline_layer(
fail_point!("before-upload-layer", |_| {
bail!("failpoint before-upload-layer")
});
async {
let storage_path = storage.remote_object_id(source_path).with_context(|| {
let storage_path = storage.remote_object_id(source_path).with_context(|| {
format!(
"Failed to get the layer storage path for local path '{}'",
source_path.display()
)
})?;
let source_file = fs::File::open(&source_path).await.with_context(|| {
format!(
"Failed to open a source file for layer '{}'",
source_path.display()
)
})?;
let fs_size = source_file
.metadata()
.await
.with_context(|| {
format!(
"Failed to get the layer storage path for local path '{}'",
"Failed to get the source file metadata for layer '{}'",
source_path.display()
)
})?;
})?
.len();
let source_file = fs::File::open(&source_path).await.with_context(|| {
format!(
"Failed to open a source file for layer '{}'",
source_path.display()
)
})?;
let fs_size = source_file
.metadata()
.await
.with_context(|| {
format!(
"Failed to get the source file metadata for layer '{}'",
source_path.display()
)
})?
.len();
// FIXME: this looks bad
if let Some(metadata_size) = known_metadata.file_size() {
if metadata_size != fs_size {
bail!("File {source_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}");
}
} else {
// this is a silly state we would like to avoid
// FIXME: this looks bad
if let Some(metadata_size) = known_metadata.file_size() {
if metadata_size != fs_size {
bail!("File {source_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}");
}
let fs_size = usize::try_from(fs_size).with_context(|| format!("File {source_path:?} size {fs_size} could not be converted to usize"))?;
storage
.upload(Box::new(source_file), fs_size, &storage_path, None)
.await
.with_context(|| {
format!(
"Failed to upload a layer from local path '{}'",
source_path.display()
)
})?;
Ok(())
} else {
// this is a silly state we would like to avoid
}
.await
let fs_size = usize::try_from(fs_size).with_context(|| format!("File {source_path:?} size {fs_size} could not be converted to usize"))?;
storage
.upload(Box::new(source_file), fs_size, &storage_path, None)
.await
.with_context(|| {
format!(
"Failed to upload a layer from local path '{}'",
source_path.display()
)
})?;
Ok(())
}