Replicate S3 blob metadata in the remote storage

This commit is contained in:
Kirill Bulatov
2022-04-09 01:15:20 +03:00
committed by Kirill Bulatov
parent 0e9ee772af
commit 4f172e7612
4 changed files with 179 additions and 34 deletions


@@ -325,27 +325,35 @@ trait RemoteStorage: Send + Sync {
&self,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
to: &Self::StoragePath,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<()>;
/// Streams the remote storage entry contents into the given buffered writer.
/// Returns the metadata, if any was stored with the file previously.
async fn download(
&self,
from: &Self::StoragePath,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<()>;
) -> anyhow::Result<Option<StorageMetadata>>;
/// Streams a given byte range of the remote storage entry contents into the given buffered writer.
/// Returns the metadata, if any was stored with the file previously.
async fn download_range(
&self,
from: &Self::StoragePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<()>;
) -> anyhow::Result<Option<StorageMetadata>>;
async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()>;
}
/// An extra set of key-value pairs containing arbitrary metadata about the storage entry.
/// Immutable: it cannot be changed once the file is created.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageMetadata(HashMap<String, String>);
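To make the new contract concrete, here is a minimal caller-side sketch (the function and variable names are illustrative and not part of this commit; constructing StorageMetadata directly assumes the caller lives in the same module, as the tests below do):

use std::collections::HashMap;

// Hypothetical helper, for illustration only.
async fn metadata_roundtrip<S: RemoteStorage>(
    storage: &S,
    source: tokio::fs::File,
    remote_path: &S::StoragePath,
) -> anyhow::Result<()> {
    let metadata = StorageMetadata(HashMap::from([(
        "compression".to_string(),
        "zstd".to_string(),
    )]));
    // Passing `None` keeps the old, metadata-free behaviour.
    storage.upload(source, remote_path, Some(metadata)).await?;

    let mut buffer = std::io::Cursor::new(Vec::new());
    // Both download methods now hand back the stored metadata, if any.
    let downloaded = storage.download(remote_path, &mut buffer).await?;
    assert!(downloaded.is_some());
    Ok(())
}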
fn strip_path_prefix<'a>(prefix: &'a Path, path: &'a Path) -> anyhow::Result<&'a Path> {
if prefix == path {
anyhow::bail!(


@@ -5,7 +5,6 @@
//! volume is mounted to the local FS.
use std::{
ffi::OsString,
future::Future,
path::{Path, PathBuf},
pin::Pin,
@@ -18,7 +17,7 @@ use tokio::{
};
use tracing::*;
use super::{strip_path_prefix, RemoteStorage};
use super::{strip_path_prefix, RemoteStorage, StorageMetadata};
pub struct LocalFs {
pageserver_workdir: &'static Path,
@@ -54,6 +53,32 @@ impl LocalFs {
)
}
}
async fn read_storage_metadata(
&self,
file_path: &Path,
) -> anyhow::Result<Option<StorageMetadata>> {
let metadata_path = storage_metadata_path(&file_path);
if metadata_path.exists() && metadata_path.is_file() {
let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
format!(
"Failed to read metadata from the local storage at '{}'",
metadata_path.display()
)
})?;
serde_json::from_str(&metadata_string)
.with_context(|| {
format!(
"Failed to deserialize metadata from the local storage at '{}'",
metadata_path.display()
)
})
.map(|metadata| Some(StorageMetadata(metadata)))
} else {
Ok(None)
}
}
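For reference, the sidecar this helper reads is just the metadata's inner HashMap serialized as JSON by `upload` further down, so a blob stored with the two keys used in the tests at the bottom of this file would sit next to a metadata file containing (in some key order, since it is a HashMap):

{"one": "1", "two": "2"}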
}
#[async_trait::async_trait]
@@ -81,19 +106,14 @@ impl RemoteStorage for LocalFs {
&self,
mut from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
to: &Self::StoragePath,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<()> {
let target_file_path = self.resolve_in_storage(to)?;
create_target_directory(&target_file_path).await?;
// We need this dance with a sort-of-durable rename (without fsyncs)
// to prevent partial uploads: this was actually hit when a pageserver shutdown
// cancelled an upload and left a partial file on the filesystem.
let mut temp_extension = target_file_path
.extension()
.unwrap_or_default()
.to_os_string();
temp_extension.push(OsString::from(".temp"));
let temp_file_path = target_file_path.with_extension(temp_extension);
let temp_file_path = path_with_suffix_extension(&target_file_path, ".temp");
let mut destination = io::BufWriter::new(
fs::OpenOptions::new()
.write(true)
@@ -132,6 +152,23 @@ impl RemoteStorage for LocalFs {
target_file_path.display()
)
})?;
if let Some(storage_metadata) = metadata {
let storage_metadata_path = storage_metadata_path(&target_file_path);
fs::write(
&storage_metadata_path,
serde_json::to_string(&storage_metadata.0)
.context("Failed to serialize storage metadata as json")?,
)
.await
.with_context(|| {
format!(
"Failed to write metadata to the local storage at '{}'",
storage_metadata_path.display()
)
})?;
}
Ok(())
}
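The rename to the final path happens in lines elided from this hunk; stripped of error context, the pattern the comment above describes boils down to roughly this sketch (not the exact elided code):

// Write to `*.temp`, then rename: a cancelled upload leaves at worst a
// temp file behind, never a truncated file at the final path.
async fn write_then_rename(target: &std::path::Path, data: &[u8]) -> anyhow::Result<()> {
    let temp_path = path_with_suffix_extension(target, ".temp");
    tokio::fs::write(&temp_path, data).await?;
    tokio::fs::rename(&temp_path, target).await?;
    Ok(())
}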
@@ -139,7 +176,7 @@ impl RemoteStorage for LocalFs {
&self,
from: &Self::StoragePath,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<()> {
) -> anyhow::Result<Option<StorageMetadata>> {
let file_path = self.resolve_in_storage(from)?;
if file_path.exists() && file_path.is_file() {
@@ -162,7 +199,8 @@ impl RemoteStorage for LocalFs {
)
})?;
source.flush().await?;
Ok(())
self.read_storage_metadata(&file_path).await
} else {
bail!(
"File '{}' either does not exist or is not a file",
@@ -177,7 +215,7 @@ impl RemoteStorage for LocalFs {
start_inclusive: u64,
end_exclusive: Option<u64>,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<()> {
) -> anyhow::Result<Option<StorageMetadata>> {
if let Some(end_exclusive) = end_exclusive {
ensure!(
end_exclusive > start_inclusive,
@@ -186,7 +224,7 @@ impl RemoteStorage for LocalFs {
end_exclusive
);
if start_inclusive == end_exclusive.saturating_sub(1) {
return Ok(());
return Ok(None);
}
}
let file_path = self.resolve_in_storage(from)?;
@@ -220,7 +258,8 @@ impl RemoteStorage for LocalFs {
file_path.display()
)
})?;
Ok(())
self.read_storage_metadata(&file_path).await
} else {
bail!(
"File '{}' either does not exist or is not a file",
@@ -242,6 +281,17 @@ impl RemoteStorage for LocalFs {
}
}
fn path_with_suffix_extension(original_path: &Path, suffix: &str) -> PathBuf {
let mut extension_with_suffix = original_path.extension().unwrap_or_default().to_os_string();
extension_with_suffix.push(suffix);
original_path.with_extension(extension_with_suffix)
}
fn storage_metadata_path(original_path: &Path) -> PathBuf {
path_with_suffix_extension(original_path, ".metadata")
}
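As a concrete illustration (the path is made up): for a blob at timelines/some_file.zst, the temporary upload path becomes timelines/some_file.zst.temp and the metadata sidecar is timelines/some_file.zst.metadata. The suffix is pushed onto the existing extension rather than replacing it, so the original file name stays recoverable.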
fn get_all_files<'a, P>(
directory_path: P,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<PathBuf>>> + Send + Sync + 'a>>
@@ -451,7 +501,7 @@ mod fs_tests {
use super::*;
use crate::repository::repo_harness::{RepoHarness, TIMELINE_ID};
use std::io::Write;
use std::{collections::HashMap, io::Write};
use tempfile::tempdir;
#[tokio::test]
@@ -465,7 +515,7 @@ mod fs_tests {
)
.await?;
let target_path = PathBuf::from("/").join("somewhere").join("else");
match storage.upload(source, &target_path).await {
match storage.upload(source, &target_path, None).await {
Ok(()) => panic!("Should not allow storing files with wrong target path"),
Err(e) => {
let message = format!("{:?}", e);
@@ -475,14 +525,14 @@ mod fs_tests {
}
assert!(storage.list().await?.is_empty());
let target_path_1 = upload_dummy_file(&repo_harness, &storage, "upload_1").await?;
let target_path_1 = upload_dummy_file(&repo_harness, &storage, "upload_1", None).await?;
assert_eq!(
storage.list().await?,
vec![target_path_1.clone()],
"Should list a single file after first upload"
);
let target_path_2 = upload_dummy_file(&repo_harness, &storage, "upload_2").await?;
let target_path_2 = upload_dummy_file(&repo_harness, &storage, "upload_2", None).await?;
assert_eq!(
list_files_sorted(&storage).await?,
vec![target_path_1.clone(), target_path_2.clone()],
@@ -503,12 +553,16 @@ mod fs_tests {
let repo_harness = RepoHarness::create("download_file")?;
let storage = create_storage()?;
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name).await?;
let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name, None).await?;
let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
storage.download(&upload_target, &mut content_bytes).await?;
content_bytes.flush().await?;
let metadata = storage.download(&upload_target, &mut content_bytes).await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
content_bytes.flush().await?;
let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
assert_eq!(
dummy_contents(upload_name),
@@ -533,12 +587,16 @@ mod fs_tests {
let repo_harness = RepoHarness::create("download_file_range_positive")?;
let storage = create_storage()?;
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name).await?;
let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name, None).await?;
let mut full_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
storage
let metadata = storage
.download_range(&upload_target, 0, None, &mut full_range_bytes)
.await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
full_range_bytes.flush().await?;
assert_eq!(
dummy_contents(upload_name),
@@ -548,7 +606,7 @@ mod fs_tests {
let mut zero_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let same_byte = 1_000_000_000;
storage
let metadata = storage
.download_range(
&upload_target,
same_byte,
@@ -556,6 +614,10 @@ mod fs_tests {
&mut zero_range_bytes,
)
.await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
zero_range_bytes.flush().await?;
assert!(
zero_range_bytes.into_inner().into_inner().is_empty(),
@@ -566,7 +628,7 @@ mod fs_tests {
let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
storage
let metadata = storage
.download_range(
&upload_target,
0,
@@ -574,6 +636,11 @@ mod fs_tests {
&mut first_part_remote,
)
.await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
first_part_remote.flush().await?;
let first_part_remote = first_part_remote.into_inner().into_inner();
assert_eq!(
@@ -583,7 +650,7 @@ mod fs_tests {
);
let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
storage
let metadata = storage
.download_range(
&upload_target,
first_part_local.len() as u64,
@@ -591,6 +658,11 @@ mod fs_tests {
&mut second_part_remote,
)
.await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
second_part_remote.flush().await?;
let second_part_remote = second_part_remote.into_inner().into_inner();
assert_eq!(
@@ -607,7 +679,7 @@ mod fs_tests {
let repo_harness = RepoHarness::create("download_file_range_negative")?;
let storage = create_storage()?;
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name).await?;
let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name, None).await?;
let start = 10000;
let end = 234;
@@ -645,7 +717,7 @@ mod fs_tests {
let repo_harness = RepoHarness::create("delete_file")?;
let storage = create_storage()?;
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name).await?;
let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name, None).await?;
storage.delete(&upload_target).await?;
assert!(storage.list().await?.is_empty());
@@ -661,10 +733,69 @@ mod fs_tests {
Ok(())
}
#[tokio::test]
async fn file_with_metadata() -> anyhow::Result<()> {
let repo_harness = RepoHarness::create("file_with_metadata")?;
let storage = create_storage()?;
let upload_name = "upload_1";
let metadata = StorageMetadata(HashMap::from([
("one".to_string(), "1".to_string()),
("two".to_string(), "2".to_string()),
]));
let upload_target =
upload_dummy_file(&repo_harness, &storage, upload_name, Some(metadata.clone())).await?;
let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let full_download_metadata = storage.download(&upload_target, &mut content_bytes).await?;
content_bytes.flush().await?;
let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
assert_eq!(
dummy_contents(upload_name),
contents,
"We should upload and download the same contents"
);
assert_eq!(
full_download_metadata.as_ref(),
Some(&metadata),
"We should get the same metadata back for full download"
);
let uploaded_bytes = dummy_contents(upload_name).into_bytes();
let (first_part_local, _) = uploaded_bytes.split_at(3);
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let partial_download_metadata = storage
.download_range(
&upload_target,
0,
Some(first_part_local.len() as u64),
&mut first_part_remote,
)
.await?;
first_part_remote.flush().await?;
let first_part_remote = first_part_remote.into_inner().into_inner();
assert_eq!(
first_part_local,
first_part_remote.as_slice(),
"First part bytes should be returned when requested"
);
assert_eq!(
partial_download_metadata.as_ref(),
Some(&metadata),
"We should get the same metadata back for partial download"
);
Ok(())
}
async fn upload_dummy_file(
harness: &RepoHarness<'_>,
storage: &LocalFs,
name: &str,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<PathBuf> {
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let relative_timeline_path = timeline_path.strip_prefix(&harness.conf.workdir)?;
@@ -677,6 +808,7 @@ mod fs_tests {
)
.await?,
&storage_path,
metadata,
)
.await?;
Ok(storage_path)


@@ -24,6 +24,8 @@ use crate::{
remote_storage::{strip_path_prefix, RemoteStorage},
};
use super::StorageMetadata;
const S3_FILE_SEPARATOR: char = '/';
#[derive(Debug, Eq, PartialEq)]
@@ -179,12 +181,14 @@ impl RemoteStorage for S3Bucket {
&self,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
to: &Self::StoragePath,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<()> {
self.client
.put_object(PutObjectRequest {
body: Some(StreamingBody::new(ReaderStream::new(from))),
bucket: self.bucket_name.clone(),
key: to.key().to_owned(),
metadata: metadata.map(|m| m.0),
..PutObjectRequest::default()
})
.await?;
@@ -195,7 +199,7 @@ impl RemoteStorage for S3Bucket {
&self,
from: &Self::StoragePath,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<()> {
) -> anyhow::Result<Option<StorageMetadata>> {
let object_output = self
.client
.get_object(GetObjectRequest {
@@ -210,7 +214,7 @@ impl RemoteStorage for S3Bucket {
io::copy(&mut from, to).await?;
}
Ok(())
Ok(object_output.metadata.map(StorageMetadata))
}
async fn download_range(
@@ -219,7 +223,7 @@ impl RemoteStorage for S3Bucket {
start_inclusive: u64,
end_exclusive: Option<u64>,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<()> {
) -> anyhow::Result<Option<StorageMetadata>> {
// S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
// and needs both ends to be inclusive
let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
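Worked example: start_inclusive = 0 with end_exclusive = Some(10) becomes the inclusive pair 0..=9, i.e. an HTTP header of the form `Range: bytes=0-9`, while end_exclusive = None leaves the upper bound off (`bytes=0-`) so the object is read to its end.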
@@ -242,7 +246,7 @@ impl RemoteStorage for S3Bucket {
io::copy(&mut from, to).await?;
}
Ok(())
Ok(object_output.metadata.map(StorageMetadata))
}
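Unlike the local filesystem backend, S3 needs no sidecar file: user-defined metadata travels with the object itself (surfaced over HTTP as x-amz-meta-* headers), and rusoto exposes it on both PutObjectRequest and GetObjectOutput as an Option<HashMap<String, String>>, which is exactly the inner type of StorageMetadata; hence the thin .map(StorageMetadata) and .map(|m| m.0) conversions in this file.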
async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()> {


@@ -201,6 +201,7 @@ async fn try_upload_checkpoint<
.upload(
archive_streamer,
&remote_storage.storage_path(&timeline_dir.join(&archive_name))?,
None,
)
.await
},